source/blender/blenlib/intern/task_iterator.c
(Diff shown in unified form: lines prefixed '-' are removed, '+' are added; reviewer comments appear inline.)

[... 73 lines not shown ...]
 typedef struct TaskParallelRangeState {
   void *initial_tls_memory; /* Pointer to actual user-defined 'tls' data. */
   size_t tls_data_size;     /* Size of that data. */
   void *flatten_tls_storage; /* 'tls' copies of initial_tls_memory for each running task. */
   /* Number of 'tls' copies in the array, i.e. number of worker threads. */
   size_t num_elements_in_tls_storage;
   /* Function called from calling thread once whole range have been processed. */
-  TaskParallelFinalizeFunc func_finalize;
+  TaskParallelReduceFunc func_reduce;
+  /* Function called from calling thread to cleanup data created during execution. */
+  TaskParallelFreeFunc func_free;
   /* Current value of the iterator, shared between all threads (atomically updated). */
   int iter_value;
   int iter_chunk_num; /* Amount of iterations to process in a single step. */
 } TaskParallelRangeState;
 /* Stores all the parallel tasks for a single pool. */
 typedef struct TaskParallelRangePool {
[... 159 lines not shown ...]
 static void parallel_range_single_thread(TaskParallelRangePool *range_pool)
   for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
        state = state->next) {
     const int start = state->start;
     const int stop = state->stop;
     void *userdata = state->userdata_shared;
     TaskParallelRangeFunc func = state->func;
     void *initial_tls_memory = state->initial_tls_memory;
     const size_t tls_data_size = state->tls_data_size;
-    void *flatten_tls_storage = NULL;
     const bool use_tls_data = (tls_data_size != 0) && (initial_tls_memory != NULL);
-    if (use_tls_data) {
-      flatten_tls_storage = MALLOCA(tls_data_size);
-      memcpy(flatten_tls_storage, initial_tls_memory, tls_data_size);
-    }
     TaskParallelTLS tls = {
         .thread_id = 0,
-        .userdata_chunk = flatten_tls_storage,
+        .userdata_chunk = initial_tls_memory,
     };
     for (int i = start; i < stop; i++) {
       func(userdata, i, &tls);
     }
-    if (state->func_finalize != NULL) {
-      state->func_finalize(userdata, flatten_tls_storage);
+    if (use_tls_data && state->func_free != NULL) {
+      state->func_free(userdata, initial_tls_memory);
     }
mont29 (inline): I don’t think that that set of changes is valid? If I follow the code properly with this you:
*…

brecht (inline, not done):
> Directly use given TLS data/memory into worker callback, which is never done in threaded case…
The new mechanism is to reduce everything into the user-given data. For the single-threaded case, there is nothing to reduce and it's fastest to not make a copy of it.
The reduce functions should have no side effects, so that they can be run on any thread by TBB and not be bottlenecked by the main thread. Users of these callbacks were changed to work like this.
The assumption here is that func_free is used to free temporary working memory allocated by func. Any memory allocated to store the result of computations is expected to be freed by func_reduce and the caller. This is indeed somewhat weak, though not sure what the better way to do it would be besides better naming and comments.

jbakker (author, done): Comments have been added to the structs and where func_free is invoked.

mont29 (inline, not done): Ok… Not really thrilled by getting so specific and restrictive here, but I guess this is required by future switch to TBB. But at the very least, those specifics should be very clearly documented in the API! Right now there is no mention about any of those restrictions.
-    MALLOCA_FREE(flatten_tls_storage, tls_data_size);
   }
 }
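To illustrate the contract brecht describes above, here is a minimal caller-side sketch. It is not part of this changeset: the struct and callback names are invented, and the reduce callback signature (userdata, join chunk, worker chunk) is assumed from how func_reduce is invoked in this diff, so treat it as a usage sketch rather than committed API.

    /* Illustrative only: assumes BLI_task.h exposes TaskParallelSettings with the
     * userdata_chunk / userdata_chunk_size / func_reduce / func_free fields used in this diff. */
    #include "BLI_task.h"
    #include "BLI_utildefines.h"

    typedef struct SumChunk {
      double sum; /* Per-worker partial result. */
    } SumChunk;

    static void sum_range_cb(void *__restrict userdata,
                             const int i,
                             const TaskParallelTLS *__restrict tls)
    {
      const double *values = userdata;
      SumChunk *chunk = tls->userdata_chunk;
      chunk->sum += values[i]; /* Only touches this worker's own copy of the chunk. */
    }

    static void sum_reduce_cb(const void *__restrict UNUSED(userdata),
                              void *__restrict chunk_join,
                              void *__restrict chunk)
    {
      /* No side effects besides folding the worker's partial sum into the caller-given
       * chunk, so it can safely be run from any thread. */
      ((SumChunk *)chunk_join)->sum += ((SumChunk *)chunk)->sum;
    }

    static double sum_values(double *values, const int num_values)
    {
      SumChunk chunk = {0.0};
      TaskParallelSettings settings;
      BLI_parallel_range_settings_defaults(&settings);
      settings.userdata_chunk = &chunk;
      settings.userdata_chunk_size = sizeof(chunk);
      settings.func_reduce = sum_reduce_cb;
      /* func_free left NULL: the chunk holds no temporary allocations made by the worker. */
      BLI_task_parallel_range(0, num_values, values, sum_range_cb, &settings);
      return chunk.sum;
    }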
 /**
  * This function allows to parallelized for loops in a similar way to OpenMP's
  * 'parallel for' statement.
  *
  * See public API doc of ParallelRangeSettings for description of all settings.
[... 14 lines not shown ...]
   TaskParallelRangeState state = {
       .next = NULL,
       .start = start,
       .stop = stop,
       .userdata_shared = userdata,
       .func = func,
       .iter_value = start,
       .initial_tls_memory = settings->userdata_chunk,
       .tls_data_size = settings->userdata_chunk_size,
-      .func_finalize = settings->func_finalize,
+      .func_free = settings->func_free,
   };
   TaskParallelRangePool range_pool = {
       .pool = NULL, .parallel_range_states = &state, .current_state = NULL, .settings = settings};
   int i, num_threads, num_tasks;
   void *tls_data = settings->userdata_chunk;
   const size_t tls_data_size = settings->userdata_chunk_size;
   if (tls_data_size != 0) {
[... 47 lines not shown ...]
   for (i = 0; i < num_tasks; i++) {
     /* Use this pool's pre-allocated tasks. */
     BLI_task_pool_push_from_thread(
         task_pool, parallel_range_func, POINTER_FROM_INT(i), false, NULL, thread_id);
   }
   BLI_task_pool_work_and_wait(task_pool);
   BLI_task_pool_free(task_pool);
   if (use_tls_data) {
mont29 (inline, done): This one is missing the && (settings->func_reduce != NULL || settings->func_free != NULL) extra check ;)
-    if (settings->func_finalize != NULL) {
     for (i = 0; i < num_tasks; i++) {
       void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
-      settings->func_finalize(userdata, userdata_chunk_local);
+      if (settings->func_reduce) {
+        settings->func_reduce(userdata, tls_data, userdata_chunk_local);
+      }
+      if (settings->func_free) {
+        settings->func_free(userdata, userdata_chunk_local);
+      }
       }
     }
-    MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks);
   }
+  MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks);
mont29 (inline, done): That one should be protected by the if (use_tls_data)!
 }
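For clarity, a sketch of this cleanup block with both of the remarks above applied (the missing NULL check and the protected MALLOCA_FREE) might look as follows. It is illustrative only, not the code in this revision, and uses the same locals of BLI_task_parallel_range as the surrounding diff:

    /* Sketch only: addresses both review remarks above. */
    if (use_tls_data) {
      if (settings->func_reduce != NULL || settings->func_free != NULL) {
        for (i = 0; i < num_tasks; i++) {
          void *userdata_chunk_local = (char *)flatten_tls_storage + (tls_data_size * (size_t)i);
          if (settings->func_reduce != NULL) {
            settings->func_reduce(userdata, tls_data, userdata_chunk_local);
          }
          if (settings->func_free != NULL) {
            settings->func_free(userdata, userdata_chunk_local);
          }
        }
      }
      /* Freeing the TLS copies only makes sense when they were allocated. */
      MALLOCA_FREE(flatten_tls_storage, tls_data_size * (size_t)num_tasks);
    }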
 /**
  * Initialize a task pool to parallelize several for loops at the same time.
  *
  * See public API doc of ParallelRangeSettings for description of all settings.
  * Note that loop-specific settings (like 'tls' data or finalize function) must be left NULL here.
mont29 (inline, done): 'reduce function', not 'finalize function'
  * Only settings controlling how iteration is parallelized must be defined, as those will affect
  * all loops added to that pool.
  */
 TaskParallelRangePool *BLI_task_parallel_range_pool_init(const TaskParallelSettings *settings)
 {
   TaskParallelRangePool *range_pool = MEM_callocN(sizeof(*range_pool), __func__);
   BLI_assert(settings->userdata_chunk == NULL);
-  BLI_assert(settings->func_finalize == NULL);
+  BLI_assert(settings->func_reduce == NULL);
+  BLI_assert(settings->func_free == NULL);
   range_pool->settings = MEM_mallocN(sizeof(*range_pool->settings), __func__);
   *range_pool->settings = *settings;
   return range_pool;
 }
 /**
  * Add a loop task to the pool. It does not execute it at all.
[... 22 lines not shown ...]
 void BLI_task_parallel_range_pool_push(TaskParallelRangePool *range_pool,
   TaskParallelRangeState *state = MEM_callocN(sizeof(*state), __func__);
   state->start = start;
   state->stop = stop;
   state->userdata_shared = userdata;
   state->func = func;
   state->iter_value = start;
   state->initial_tls_memory = settings->userdata_chunk;
   state->tls_data_size = settings->userdata_chunk_size;
-  state->func_finalize = settings->func_finalize;
+  state->func_reduce = settings->func_reduce;
+  state->func_free = settings->func_free;
   state->next = range_pool->parallel_range_states;
   range_pool->parallel_range_states = state;
 }
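Based on the doc comments above, a hedged sketch of pool usage could look like the following. The exact names of the run and free entry points and the argument order of the push function are assumptions from context (they are not fully visible in this excerpt), and the callbacks reuse the hypothetical SumChunk helpers from the earlier sum sketch:

    /* Sketch only: pool-level settings carry no TLS chunk and no reduce/free callbacks
     * (asserted in pool_init); each pushed loop brings its own. */
    static void sum_two_arrays(double *values_a, const int len_a,
                               double *values_b, const int len_b,
                               double *r_sum_a, double *r_sum_b)
    {
      SumChunk chunk_a = {0.0}, chunk_b = {0.0};

      TaskParallelSettings pool_settings;
      BLI_parallel_range_settings_defaults(&pool_settings);
      TaskParallelRangePool *range_pool = BLI_task_parallel_range_pool_init(&pool_settings);

      TaskParallelSettings loop_settings = pool_settings;
      loop_settings.userdata_chunk = &chunk_a;
      loop_settings.userdata_chunk_size = sizeof(chunk_a);
      loop_settings.func_reduce = sum_reduce_cb;
      BLI_task_parallel_range_pool_push(range_pool, 0, len_a, values_a, sum_range_cb, &loop_settings);

      /* Second loop: same reduce callback, its own result chunk. */
      loop_settings.userdata_chunk = &chunk_b;
      loop_settings.userdata_chunk_size = sizeof(chunk_b);
      BLI_task_parallel_range_pool_push(range_pool, 0, len_b, values_b, sum_range_cb, &loop_settings);

      /* Entry point names below are assumed from the surrounding doc comments. */
      BLI_task_parallel_range_pool_work_and_wait(range_pool);
      BLI_task_parallel_range_pool_free(range_pool);

      *r_sum_a = chunk_a.sum;
      *r_sum_b = chunk_b.sum;
    }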
-static void parallel_range_func_finalize(TaskPool *__restrict pool,
+static void parallel_range_func_reduce(TaskPool *__restrict pool,
mont29 (inline, done): would keep finalize name here, this one also handles freeing, not only reducing.
                                        void *v_state,
                                        int UNUSED(thread_id))
 {
   TaskParallelRangePool *__restrict range_pool = BLI_task_pool_userdata(pool);
   TaskParallelRangeState *state = v_state;
   for (int i = 0; i < range_pool->num_tasks; i++) {
     void *tls_data = (char *)state->flatten_tls_storage + (state->tls_data_size * (size_t)i);
-    state->func_finalize(state->userdata_shared, tls_data);
+    if (state->func_reduce != NULL) {
+      state->func_reduce(state->userdata_shared, state->initial_tls_memory, tls_data);
+    }
+    if (state->func_free != NULL) {
+      state->func_free(state->userdata_shared, tls_data);
+    }
   }
 }
 /**
  * Run all tasks pushed to the range_pool.
  *
  * Note that the range pool is re-usable (you may push new tasks into it and call this function
  * again).
[... 69 lines not shown ...]
   for (TaskParallelRangeState *state = range_pool->parallel_range_states; state != NULL;
     const size_t userdata_chunk_size = state->tls_data_size;
     void *userdata_chunk_array = state->flatten_tls_storage;
     UNUSED_VARS_NDEBUG(userdata_chunk_array);
     if (userdata_chunk_size == 0) {
       BLI_assert(userdata_chunk_array == NULL);
       continue;
     }
-    if (state->func_finalize != NULL) {
+    if (state->func_reduce != NULL || state->func_free != NULL) {
       BLI_task_pool_push_from_thread(
-          task_pool, parallel_range_func_finalize, state, false, NULL, thread_id);
+          task_pool, parallel_range_func_reduce, state, false, NULL, thread_id);
     }
   }
   BLI_task_pool_work_and_wait(task_pool);
   BLI_task_pool_free(task_pool);
   range_pool->pool = NULL;
   /* Cleanup all tasks. */
[... 127 lines not shown ...]
   if (use_userdata_chunk) {
     memcpy(userdata_chunk_local, userdata_chunk, userdata_chunk_size);
   }
   /* Also marking it as non-threaded for the iterator callback. */
   state->iter_shared.spin_lock = NULL;
   parallel_iterator_func_do(state, userdata_chunk, 0);
-  if (use_userdata_chunk) {
-    if (settings->func_finalize != NULL) {
-      settings->func_finalize(state->userdata, userdata_chunk_local);
-    }
-    MALLOCA_FREE(userdata_chunk_local, userdata_chunk_size);
+  if (use_userdata_chunk && settings->func_free != NULL) {
+    settings->func_free(state->userdata, userdata_chunk_local);
   }
mont29 (inline, done): Same issues as above in parallel_range_single_thread()

jbakker (author, done): Comments have been added in the structs and where func_free is invoked
 }
 static void task_parallel_iterator_do(const TaskParallelSettings *settings,
                                       TaskParallelIteratorState *state)
 {
   TaskScheduler *task_scheduler = BLI_task_scheduler_get();
   const int num_threads = BLI_task_scheduler_num_threads(task_scheduler);
[... 41 lines not shown ...]
   for (size_t i = 0; i < num_tasks; i++) {
     /* Use this pool's pre-allocated tasks. */
     BLI_task_pool_push_from_thread(
         task_pool, parallel_iterator_func, userdata_chunk_local, false, NULL, thread_id);
   }
   BLI_task_pool_work_and_wait(task_pool);
   BLI_task_pool_free(task_pool);
-  if (use_userdata_chunk) {
-    if (settings->func_finalize != NULL) {
+  if (use_userdata_chunk && (settings->func_reduce != NULL || settings->func_free != NULL)) {
     for (size_t i = 0; i < num_tasks; i++) {
       userdata_chunk_local = (char *)userdata_chunk_array + (userdata_chunk_size * i);
-      settings->func_finalize(state->userdata, userdata_chunk_local);
+      if (settings->func_reduce != NULL) {
+        settings->func_reduce(state->userdata, userdata_chunk, userdata_chunk_local);
+      }
+      if (settings->func_free != NULL) {
+        settings->func_free(state->userdata, userdata_chunk_local);
+      }
       }
     }
     MALLOCA_FREE(userdata_chunk_array, userdata_chunk_size * num_tasks);
   }
   BLI_spin_end(&spin_lock);
   state->iter_shared.spin_lock = NULL;
 }
[... 170 lines not shown ...]