Changeset View
Standalone View
source/blender/blenlib/intern/task_iterator.c
| Show First 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | typedef struct TaskParallelRangeState { | ||||
| void *initial_tls_memory; /* Pointer to actual user-defined 'tls' data. */ | void *initial_tls_memory; /* Pointer to actual user-defined 'tls' data. */ | ||||
| size_t tls_data_size; /* Size of that data. */ | size_t tls_data_size; /* Size of that data. */ | ||||
| void *flatten_tls_storage; /* 'tls' copies of initial_tls_memory for each running task. */ | void *flatten_tls_storage; /* 'tls' copies of initial_tls_memory for each running task. */ | ||||
| /* Number of 'tls' copies in the array, i.e. number of worker threads. */ | /* Number of 'tls' copies in the array, i.e. number of worker threads. */ | ||||
| size_t num_elements_in_tls_storage; | size_t num_elements_in_tls_storage; | ||||
| /* Function called from calling thread once whole range has been processed. */ | /* Function called from calling thread once whole range has been processed. */ | ||||
| TaskParallelFinalizeFunc func_finalize; | TaskParallelFreeFunc func_free; | ||||
| /* Current value of the iterator, shared between all threads (atomically updated). */ | /* Current value of the iterator, shared between all threads (atomically updated). */ | ||||
| int iter_value; | int iter_value; | ||||
| int iter_chunk_num; /* Amount of iterations to process in a single step. */ | int iter_chunk_num; /* Amount of iterations to process in a single step. */ | ||||
| } TaskParallelRangeState; | } TaskParallelRangeState; | ||||
| /* Stores all the parallel tasks for a single pool. */ | /* Stores all the parallel tasks for a single pool. */ | ||||
| typedef struct TaskParallelRangePool { | typedef struct TaskParallelRangePool { | ||||
| ▲ Show 20 Lines • Show All 159 Lines • ▼ Show 20 Lines | static void parallel_range_single_thread(TaskParallelRangePool *range_pool) | ||||
| for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL; | for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL; | ||||
| state = state->next) { | state = state->next) { | ||||
| const int start = state->start; | const int start = state->start; | ||||
| const int stop = state->stop; | const int stop = state->stop; | ||||
| void *userdata = state->userdata_shared; | void *userdata = state->userdata_shared; | ||||
| TaskParallelRangeFunc func = state->func; | TaskParallelRangeFunc func = state->func; | ||||
| void *initial_tls_memory = state->initial_tls_memory; | void *initial_tls_memory = state->initial_tls_memory; | ||||
| const size_t tls_data_size = state->tls_data_size; | const size_t tls_data_size = state->tls_data_size; | ||||
| void *flatten_tls_storage = NULL; | |||||
| const bool use_tls_data = (tls_data_size != 0) && (initial_tls_memory != NULL); | const bool use_tls_data = (tls_data_size != 0) && (initial_tls_memory != NULL); | ||||
| if (use_tls_data) { | |||||
| flatten_tls_storage = MALLOCA(tls_data_size); | |||||
| memcpy(flatten_tls_storage, initial_tls_memory, tls_data_size); | |||||
| } | |||||
| TaskParallelTLS tls = { | TaskParallelTLS tls = { | ||||
| .thread_id = 0, | .thread_id = 0, | ||||
| .userdata_chunk = flatten_tls_storage, | .userdata_chunk = initial_tls_memory, | ||||
| }; | }; | ||||
| for (int i = start; i < stop; i++) { | for (int i = start; i < stop; i++) { | ||||
| func(userdata, i, &tls); | func(userdata, i, &tls); | ||||
| } | } | ||||
| if (state->func_finalize != NULL) { | if (use_tls_data && state->func_free != NULL) { | ||||
| state->func_finalize(userdata, flatten_tls_storage); | state->func_free(userdata, initial_tls_memory); | ||||
| } | } | ||||
| MALLOCA_FREE(flatten_tls_storage, tls_data_size); | |||||
| } | } | ||||
mont29: I don’t think that set of changes is valid? If I follow the code properly with this you:
*… | |||||
Not Done Inline Actions
The new mechanism is to reduce everything into the user-given data. For the single-threaded case, there is nothing to reduce and it's fastest to not make a copy of it.
The reduce functions should have no side effects, so that they can be run on any thread by TBB and not be bottlenecked by the main thread. Users of these callbacks were changed to work like this.
The assumption here is that func_free is used to free temporary working memory allocated by func. Any memory allocated to store the result of computations is expected to be freed by func_reduce and the caller. This is indeed somewhat weak, though not sure what the better way to do it would be besides better naming and comments. brecht: > Directly use given TLS data/memory into worker callback, which is never done in threaded case… | |||||
Done Inline ActionsComments have been added to the structs and where func_free is invoked. jbakker: Comments have been added to the structs and where func_free is invoked. | |||||
Not Done Inline ActionsOk… Not really thrilled by getting so specific and restrictive here, but I guess this is required by future switch to TBB. But at the very least, those specifics should be very clearly documented in the API! Right now there is no mention about any of those restrictions. mont29: Ok… Not really thrilled by getting so specific and restrictive here, but I guess this is… | |||||
| } | } | ||||
| /** | /** | ||||
| * This function allows parallelizing for loops in a similar way to OpenMP's | * This function allows parallelizing for loops in a similar way to OpenMP's | ||||
| * 'parallel for' statement. | * 'parallel for' statement. | ||||
| * | * | ||||
| * See public API doc of ParallelRangeSettings for description of all settings. | * See public API doc of ParallelRangeSettings for description of all settings. | ||||
| */ | */ | ||||
| Show All 13 Lines | TaskParallelRangeState state = { | ||||
| .next = NULL, | .next = NULL, | ||||
| .start = start, | .start = start, | ||||
| .stop = stop, | .stop = stop, | ||||
| .userdata_shared = userdata, | .userdata_shared = userdata, | ||||
| .func = func, | .func = func, | ||||
| .iter_value = start, | .iter_value = start, | ||||
| .initial_tls_memory = settings->userdata_chunk, | .initial_tls_memory = settings->userdata_chunk, | ||||
| .tls_data_size = settings->userdata_chunk_size, | .tls_data_size = settings->userdata_chunk_size, | ||||
| .func_finalize = settings->func_finalize, | .func_free = settings->func_free, | ||||
| }; | }; | ||||
| TaskParallelRangePool range_pool = { | TaskParallelRangePool range_pool = { | ||||
| .pool = NULL, .parallel_range_states = &state, .current_state = NULL, .settings = settings}; | .pool = NULL, .parallel_range_states = &state, .current_state = NULL, .settings = settings}; | ||||
| int i, num_threads, num_tasks; | int i, num_threads, num_tasks; | ||||
| void *tls_data = settings->userdata_chunk; | void *tls_data = settings->userdata_chunk; | ||||
| const size_t tls_data_size = settings->userdata_chunk_size; | const size_t tls_data_size = settings->userdata_chunk_size; | ||||
| if (tls_data_size != 0) { | if (tls_data_size != 0) { | ||||
| ▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | for (i = 0; i < num_tasks; i++) { | ||||
| /* Use this pool's pre-allocated tasks. */ | /* Use this pool's pre-allocated tasks. */ | ||||
| BLI_task_pool_push_from_thread( | BLI_task_pool_push_from_thread( | ||||
| task_pool, parallel_range_func, POINTER_FROM_INT(i), false, NULL, thread_id); | task_pool, parallel_range_func, POINTER_FROM_INT(i), false, NULL, thread_id); | ||||
| } | } | ||||
| BLI_task_pool_work_and_wait(task_pool); | BLI_task_pool_work_and_wait(task_pool); | ||||
| BLI_task_pool_free(task_pool); | BLI_task_pool_free(task_pool); | ||||
| if (use_tls_data) { | if (use_tls_data) { | ||||
| if (settings->func_finalize != NULL) { | |||||
| for (i = 0; i < num_tasks; i++) { | for (i = 0; i < num_tasks; i++) { | ||||
Done Inline ActionsThis one is missing the && (settings->func_reduce != NULL || settings->func_free != NULL) extra check ;) mont29: This one is missing the `&& (settings->func_reduce != NULL || settings->func_free != NULL)`… | |||||
| void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i); | void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i); | ||||
| settings->func_finalize(userdata, userdata_chunk_local); | if (settings->func_reduce) { | ||||
| settings->func_reduce(userdata, tls_data, userdata_chunk_local); | |||||
| } | } | ||||
| if (settings->func_free) { | |||||
| settings->func_free(userdata, userdata_chunk_local); | |||||
| } | } | ||||
| MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks); | |||||
| } | } | ||||
| } | } | ||||
| MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks); | |||||
Done Inline ActionsThat one should be protected by the if (use_tls_data)! mont29: That one should be protected by the `if (use_tls_data)`! | |||||
| } | |||||
| #ifdef TASK_RANGE_POOL | |||||
| /** | /** | ||||
| * Initialize a task pool to parallelize several for loops at the same time. | * Initialize a task pool to parallelize several for loops at the same time. | ||||
| * | * | ||||
| * See public API doc of ParallelRangeSettings for description of all settings. | * See public API doc of ParallelRangeSettings for description of all settings. | ||||
| * Note that loop-specific settings (like 'tls' data or finalize function) must be left NULL here. | * Note that loop-specific settings (like 'tls' data or finalize function) must be left NULL here. | ||||
Done Inline Actions'reduce function', not 'finalize function' mont29: 'reduce function', not 'finalize function' | |||||
| * Only settings controlling how iteration is parallelized must be defined, as those will affect | * Only settings controlling how iteration is parallelized must be defined, as those will affect | ||||
| * all loops added to that pool. | * all loops added to that pool. | ||||
| */ | */ | ||||
| TaskParallelRangePool *BLI_task_parallel_range_pool_init(const TaskParallelSettings *settings) | TaskParallelRangePool *BLI_task_parallel_range_pool_init(const TaskParallelSettings *settings) | ||||
| { | { | ||||
| TaskParallelRangePool *range_pool = MEM_callocN(sizeof(*range_pool), __func__); | TaskParallelRangePool *range_pool = MEM_callocN(sizeof(*range_pool), __func__); | ||||
| BLI_assert(settings->userdata_chunk == NULL); | BLI_assert(settings->userdata_chunk == NULL); | ||||
| Show All 39 Lines | void BLI_task_parallel_range_pool_push(TaskParallelRangePool *range_pool, | ||||
| state->func_finalize = settings->func_finalize; | state->func_finalize = settings->func_finalize; | ||||
| state->next = range_pool->parallel_range_states; | state->next = range_pool->parallel_range_states; | ||||
| range_pool->parallel_range_states = state; | range_pool->parallel_range_states = state; | ||||
| } | } | ||||
| static void parallel_range_func_finalize(TaskPool *__restrict pool, | static void parallel_range_func_finalize(TaskPool *__restrict pool, | ||||
| void *v_state, | void *v_state, | ||||
| int UNUSED(thread_id)) | int UNUSED(thread_id)) | ||||
Done Inline Actionswould keep finalize name here, this one also handles freeing, not only reducing. mont29: would keep `finalize` name here, this one also handles freeing, not only reducing. | |||||
| { | { | ||||
| TaskParallelRangePool *__restrict range_pool = BLI_task_pool_userdata(pool); | TaskParallelRangePool *__restrict range_pool = BLI_task_pool_userdata(pool); | ||||
| TaskParallelRangeState *state = v_state; | TaskParallelRangeState *state = v_state; | ||||
| for (int i = 0; i < range_pool->num_tasks; i++) { | for (int i = 0; i < range_pool->num_tasks; i++) { | ||||
| void *tls_data = (char *)state->flatten_tls_storage + (state->tls_data_size * (size_t)i); | void *tls_data = (char *)state->flatten_tls_storage + (state->tls_data_size * (size_t)i); | ||||
| state->func_finalize(state->userdata_shared, tls_data); | state->func_finalize(state->userdata_shared, tls_data); | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | void BLI_task_parallel_range_pool_free(TaskParallelRangePool *range_pool) | ||||
| for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL; | for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL; | ||||
| state = state_next) { | state = state_next) { | ||||
| state_next = state->next; | state_next = state->next; | ||||
| MEM_freeN(state); | MEM_freeN(state); | ||||
| } | } | ||||
| MEM_freeN(range_pool->settings); | MEM_freeN(range_pool->settings); | ||||
| MEM_freeN(range_pool); | MEM_freeN(range_pool); | ||||
| } | } | ||||
| #endif | |||||
| typedef struct TaskParallelIteratorState { | typedef struct TaskParallelIteratorState { | ||||
| void *userdata; | void *userdata; | ||||
| TaskParallelIteratorIterFunc iter_func; | TaskParallelIteratorIterFunc iter_func; | ||||
| TaskParallelIteratorFunc func; | TaskParallelIteratorFunc func; | ||||
| /* *** Data used to 'acquire' chunks of items from the iterator. *** */ | /* *** Data used to 'acquire' chunks of items from the iterator. *** */ | ||||
| /* Common data also passed to the generator callback. */ | /* Common data also passed to the generator callback. */ | ||||
| ▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | if (use_userdata_chunk) { | ||||
| memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); | memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size); | ||||
| } | } | ||||
| /* Also marking it as non-threaded for the iterator callback. */ | /* Also marking it as non-threaded for the iterator callback. */ | ||||
| state->iter_shared.spin_lock = NULL; | state->iter_shared.spin_lock = NULL; | ||||
| parallel_iterator_func_do(state, userdata_chunk, 0); | parallel_iterator_func_do(state, userdata_chunk, 0); | ||||
| if (use_userdata_chunk) { | if (use_userdata_chunk && settings->func_free != NULL) { | ||||
| if (settings->func_finalize != NULL) { | settings->func_free(state->userdata, userdata_chunk_local); | ||||
| settings->func_finalize(state->userdata, userdata_chunk_local); | |||||
| } | |||||
| MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size); | |||||
| } | } | ||||
Done Inline ActionsSame issues as above in parallel_range_single_thread() mont29: Same issues as above in `parallel_range_single_thread()` | |||||
Done Inline ActionsComments have been added in the structs and where func_free is invoked jbakker: Comments have been added in the structs and where func_free is invoked | |||||
| } | } | ||||
| static void task_parallel_iterator_do(const TaskParallelSettings *settings, | static void task_parallel_iterator_do(const TaskParallelSettings *settings, | ||||
| TaskParallelIteratorState *state) | TaskParallelIteratorState *state) | ||||
| { | { | ||||
| TaskScheduler *task_scheduler = BLI_task_scheduler_get(); | TaskScheduler *task_scheduler = BLI_task_scheduler_get(); | ||||
| const int num_threads = BLI_task_scheduler_num_threads(task_scheduler); | const int num_threads = BLI_task_scheduler_num_threads(task_scheduler); | ||||
| ▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | for (size_t i = 0; i < num_tasks; i++) { | ||||
| /* Use this pool's pre-allocated tasks. */ | /* Use this pool's pre-allocated tasks. */ | ||||
| BLI_task_pool_push_from_thread( | BLI_task_pool_push_from_thread( | ||||
| task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL, thread_id); | task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL, thread_id); | ||||
| } | } | ||||
| BLI_task_pool_work_and_wait(task_pool); | BLI_task_pool_work_and_wait(task_pool); | ||||
| BLI_task_pool_free(task_pool); | BLI_task_pool_free(task_pool); | ||||
| if (use_userdata_chunk) { | if (use_userdata_chunk && (settings->func_reduce != NULL || settings->func_free != NULL)) { | ||||
| if (settings->func_finalize != NULL) { | |||||
| for (size_t i = 0; i < num_tasks; i++) { | for (size_t i = 0; i < num_tasks; i++) { | ||||
| userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i); | userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i); | ||||
| settings->func_finalize(state->userdata, userdata_chunk_local); | if (settings->func_reduce != NULL) { | ||||
| settings->func_reduce(state->userdata, userdata_chunk, userdata_chunk_local); | |||||
| } | |||||
| if (settings->func_free != NULL) { | |||||
| settings->func_free(state->userdata, userdata_chunk_local); | |||||
| } | } | ||||
| } | } | ||||
| MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks); | MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks); | ||||
| } | } | ||||
| BLI_spin_end(&spin_lock); | BLI_spin_end(&spin_lock); | ||||
| state->iter_shared.spin_lock = NULL; | state->iter_shared.spin_lock = NULL; | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 170 Lines • Show Last 20 Lines | |||||
I don’t think that set of changes is valid? If I follow the code properly with this you: