Changeset View
Standalone View
source/blender/blenlib/intern/task.c
| Show First 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | |||||
| /* Types */ | /* Types */ | ||||
| /* Number of per-thread pre-allocated tasks. | /* Number of per-thread pre-allocated tasks. | ||||
| * | * | ||||
| * For more details see description of TaskMemPool. | * For more details see description of TaskMemPool. | ||||
| */ | */ | ||||
| #define MEMPOOL_SIZE 256 | #define MEMPOOL_SIZE 256 | ||||
| /* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */ | |||||
| #define NANOSLEEP_MAX_SPINNING 200 /* Number of failed attempt to get a task before going to condition waiting. */ | |||||
| #define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nano-seconds). */ | |||||
| typedef struct Task { | typedef struct Task { | ||||
| struct Task *next, *prev; | struct Task *next, *prev; | ||||
| TaskRunFunction run; | TaskRunFunction run; | ||||
| void *taskdata; | void *taskdata; | ||||
| bool free_taskdata; | bool free_taskdata; | ||||
| TaskFreeFunction freedata; | TaskFreeFunction freedata; | ||||
| TaskPool *pool; | TaskPool *pool; | ||||
| ▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | typedef struct TaskMemPoolStats { | ||||
| /* Number of discarded memory due to pool saturation, */ | /* Number of discarded memory due to pool saturation, */ | ||||
| int num_discard; | int num_discard; | ||||
| } TaskMemPoolStats; | } TaskMemPoolStats; | ||||
| #endif | #endif | ||||
| struct TaskPool { | struct TaskPool { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| volatile size_t num; | size_t num; | ||||
| size_t num_threads; | size_t num_threads; | ||||
| size_t currently_running_tasks; | size_t currently_running_tasks; | ||||
| ThreadMutex num_mutex; | |||||
| ThreadCondition num_cond; | |||||
| void *userdata; | void *userdata; | ||||
| ThreadMutex user_mutex; | ThreadMutex user_mutex; | ||||
| volatile bool do_cancel; | volatile bool do_cancel; | ||||
| /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | ||||
| * has to use its special background fallback thread in case we are in | * has to use its special background fallback thread in case we are in | ||||
| Show All 20 Lines | |||||
| struct TaskScheduler { | struct TaskScheduler { | ||||
| pthread_t *threads; | pthread_t *threads; | ||||
| struct TaskThread *task_threads; | struct TaskThread *task_threads; | ||||
| TaskMemPool *task_mempool; | TaskMemPool *task_mempool; | ||||
| int num_threads; | int num_threads; | ||||
| bool background_thread_only; | bool background_thread_only; | ||||
| ListBase queue; | ListBase queue; | ||||
| ThreadMutex queue_mutex; | // size_t num_queued; | ||||
| ThreadCondition queue_cond; | SpinLock queue_spinlock; | ||||
| ThreadMutex workers_mutex; | |||||
| ThreadCondition workers_condition; | |||||
| size_t num_workers_sleeping; | |||||
sergey: Do we really need num_queued? Can we instead test `queue.head != NULL` ? | |||||
Not Done Inline ActionsNot sure… queue.head is protected by spinlock, so not sure reading it outside of spinlock would be safe? On the other hand, we would mainly risk false non-NULL result I guess… Will give it a try. mont29: Not sure… queue.head is protected by spinlock, so not sure reading it outside of spinlock would… | |||||
Done Inline ActionsWould use num_ prefix here. sergey: Would use `num_` prefix here. | |||||
| volatile bool do_exit; | uint8_t do_exit; | ||||
| }; | }; | ||||
| typedef struct TaskThread { | typedef struct TaskThread { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| int id; | int id; | ||||
| } TaskThread; | } TaskThread; | ||||
| /* Helper */ | /* Helper */ | ||||
| ▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | |||||
| #ifdef DEBUG_STATS | #ifdef DEBUG_STATS | ||||
| pool->mempool_stats[thread_id].num_discard++; | pool->mempool_stats[thread_id].num_discard++; | ||||
| #endif | #endif | ||||
| } | } | ||||
| } | } | ||||
| /* Task Scheduler */ | /* Task Scheduler */ | ||||
| static void task_pool_num_decrease(TaskPool *pool, size_t done) | BLI_INLINE void task_pool_num_decrease(TaskPool *pool, size_t done) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| BLI_assert(pool->num >= done); | BLI_assert(pool->num >= done); | ||||
| TaskScheduler *scheduler = pool->scheduler; | |||||
| pool->num -= done; | const size_t num = atomic_sub_and_fetch_z(&pool->num, done); | ||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); | |||||
| if (pool->num == 0) | /* WARNING! do not use pool anymore, it might be already freed by concurrent thread! */ | ||||
| BLI_condition_notify_all(&pool->num_cond); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | /* This is needed for several things: | ||||
| * - Wake up all sleeping threads on exit, before we join them. | |||||
| * - Wake up 'main' thread itself in case it called BLI_task_pool_work_and_wait() and ended up sleeping there. | |||||
| * - Wake up 'main' thread itself in case it called BLI_task_pool_cancel() and ended up sleeping there. */ | |||||
| if (num == 0 && scheduler->num_workers_sleeping != 0) { | |||||
| BLI_mutex_lock(&scheduler->workers_mutex); | |||||
| BLI_condition_notify_all(&scheduler->workers_condition); | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| } | |||||
| } | } | ||||
| static void task_pool_num_increase(TaskPool *pool) | BLI_INLINE void task_pool_num_increase(TaskPool *pool) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | atomic_add_and_fetch_z(&pool->num, 1); | ||||
| pool->num++; | |||||
| BLI_condition_notify_all(&pool->num_cond); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | if (pool->scheduler->num_workers_sleeping != 0) { | ||||
| BLI_mutex_lock(&pool->scheduler->workers_mutex); | |||||
| /* NOTE: Even tho it's only single task added here we notify all threads. | |||||
| * The reason for that is because there might be much more tasks coming | |||||
| * right after this one, so waking up all threads earlier seems to give | |||||
| * a bit less of threading overhead. | |||||
| */ | |||||
| BLI_condition_notify_all(&pool->scheduler->workers_condition); | |||||
| BLI_mutex_unlock(&pool->scheduler->workers_mutex); | |||||
| } | |||||
| } | } | ||||
| static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | BLI_INLINE bool task_find( | ||||
| TaskScheduler * restrict scheduler, Task ** restrict task, TaskPool * restrict pool, const bool is_main) | |||||
| { | { | ||||
| Task *current_task; | |||||
| bool found_task = false; | bool found_task = false; | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| while (!scheduler->queue.first && !scheduler->do_exit) | /* This check allows us to completely avoid a spinlock in case the queue is reported empty. | ||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | * There is a possibility of race condition here (check being done after task has been added to queue, | ||||
| * and before counter is increased), but this should not be an issue in practice, quite unlikely and | |||||
| * would just delay a bit that thread going back to work. */ | |||||
| // if (scheduler->num_queued != 0) { | |||||
Not Done Inline ActionsThink we should get rid of per-pool num threads limit and use global scheduler thread instead. Would require changes in some areas (different scenes can use different threads number..) But maybe we can reconsider something there for 2.8 project while we are allowing to break some compatibility. Perhaps not highest priority tho. sergey: Think we should get rid of per-pool num threads limit and use global scheduler thread instead. | |||||
Not Done Inline ActionsI do not think it costs us much right now, so would indeed leave it for later. mont29: I do not think it costs us much right now, so would indeed leave it for later. | |||||
| if (scheduler->queue.first != NULL) { | |||||
| /* NOTE: We almost always do single iteration here, so spin time is most of the time is really low. */ | |||||
| BLI_spin_lock(&scheduler->queue_spinlock); | |||||
| for (current_task = scheduler->queue.first; | |||||
| current_task != NULL; | |||||
| current_task = current_task->next) | |||||
| { | |||||
| TaskPool *current_pool = current_task->pool; | |||||
| if (!ELEM(pool, NULL, current_pool)) { | |||||
| continue; | |||||
| } | |||||
| do { | if (!is_main && scheduler->background_thread_only && !current_pool->run_in_background) { | ||||
| Task *current_task; | continue; | ||||
| } | |||||
| if (atomic_add_and_fetch_z(¤t_pool->currently_running_tasks, 1) <= current_pool->num_threads || | |||||
Done Inline ActionsConsider using restricted pointers. Even tho it's an inlined function compilers are not always smart enough to detect that there's no pointer alias. sergey: Consider using restricted pointers. Even tho it's an inlined function compilers are not always… | |||||
| is_main || current_pool->num_threads == 0) | |||||
| { | |||||
| *task = current_task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, *task); | |||||
| // atomic_sub_and_fetch_z(&scheduler->num_queued, 1); | |||||
| break; | |||||
| } | |||||
| else { | |||||
| atomic_sub_and_fetch_z(¤t_pool->currently_running_tasks, 1); | |||||
| } | |||||
| } | |||||
| BLI_spin_unlock(&scheduler->queue_spinlock); | |||||
| } | |||||
| return found_task; | |||||
| } | |||||
| BLI_INLINE bool task_wait(TaskScheduler * restrict scheduler, int * restrict loop_count) | |||||
| { | |||||
| /* If we have iterated NANOSLEEP_MAX_SPINNING times without finding a task, go into real sleep. */ | |||||
| if (++(*loop_count) > NANOSLEEP_MAX_SPINNING) { | |||||
| BLI_mutex_lock(&scheduler->workers_mutex); | |||||
| /* Check again inside the mutex, bad race condition is possible here (though unlikely), | |||||
| * leading to undying thread... */ | |||||
| if (scheduler->do_exit) { | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| return true; | |||||
| } | |||||
| /* Even though this is read outside of mutex lock, there is no real need to use atomic ops here, | |||||
| * changing the value inside mutex should be enough to ensure safety. */ | |||||
| scheduler->num_workers_sleeping++; | |||||
| BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex); | |||||
| scheduler->num_workers_sleeping--; | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| } | |||||
| else { | |||||
| nanosleep(NANOSLEEP_DURATION, NULL); | |||||
| } | |||||
| return false; | |||||
| } | |||||
| BLI_INLINE bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | |||||
| { | |||||
| bool found_task = false; | |||||
| int loop_count = 0; | |||||
| do { | |||||
| /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | ||||
| * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | ||||
| * added to the queue), but it is wrong. | * added to the queue), but it is wrong. | ||||
| * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | ||||
| * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | ||||
| * reaches this point... | * reaches this point... | ||||
| * See http://stackoverflow.com/questions/8594591 | * See http://stackoverflow.com/questions/8594591 | ||||
| * | * | ||||
| * So we only abort here if do_exit is set. | * So we only abort here if do_exit is set. | ||||
| */ | */ | ||||
| if (scheduler->do_exit) { | if (scheduler->do_exit) { | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| return false; | return false; | ||||
| } | } | ||||
| for (current_task = scheduler->queue.first; | if (!(found_task = task_find(scheduler, task, NULL, false))) { | ||||
| current_task != NULL; | if (task_wait(scheduler, &loop_count)) { | ||||
| current_task = current_task->next) | return false; | ||||
| { | |||||
| TaskPool *pool = current_task->pool; | |||||
| if (scheduler->background_thread_only && !pool->run_in_background) { | |||||
| continue; | |||||
| } | |||||
| if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || | |||||
| pool->num_threads == 0) | |||||
| { | |||||
| *task = current_task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, *task); | |||||
| break; | |||||
| } | |||||
| else { | |||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | |||||
| } | } | ||||
| } | } | ||||
| if (!found_task) | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } while (!found_task); | } while (!found_task); | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| return true; | return true; | ||||
| } | } | ||||
| static void *task_scheduler_thread_run(void *thread_p) | static void *task_scheduler_thread_run(void *thread_p) | ||||
| { | { | ||||
| TaskThread *thread = (TaskThread *) thread_p; | TaskThread *thread = (TaskThread *) thread_p; | ||||
| TaskScheduler *scheduler = thread->scheduler; | TaskScheduler *scheduler = thread->scheduler; | ||||
| int thread_id = thread->id; | int thread_id = thread->id; | ||||
| Task *task; | Task *task; | ||||
| /* keep popping off tasks */ | /* keep popping off tasks */ | ||||
| while (task_scheduler_thread_wait_pop(scheduler, &task)) { | while (task_scheduler_thread_wait_pop(scheduler, &task)) { | ||||
| TaskPool *pool = task->pool; | TaskPool *pool = task->pool; | ||||
| /* run task */ | /* run task */ | ||||
| task->run(pool, task->taskdata, thread_id); | task->run(pool, task->taskdata, thread_id); | ||||
| /* delete task */ | /* delete task */ | ||||
| task_free(pool, task, thread_id); | task_free(pool, task, thread_id); | ||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | |||||
| /* notify pool task was done */ | /* notify pool task was done */ | ||||
| task_pool_num_decrease(pool, 1); | task_pool_num_decrease(pool, 1); | ||||
| } | } | ||||
| return NULL; | return NULL; | ||||
| } | } | ||||
| TaskScheduler *BLI_task_scheduler_create(int num_threads) | TaskScheduler *BLI_task_scheduler_create(int num_threads) | ||||
| { | { | ||||
| TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); | TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); | ||||
| /* multiple places can use this task scheduler, sharing the same | /* multiple places can use this task scheduler, sharing the same | ||||
| * threads, so we keep track of the number of users. */ | * threads, so we keep track of the number of users. */ | ||||
| scheduler->do_exit = false; | scheduler->do_exit = 0; | ||||
| BLI_listbase_clear(&scheduler->queue); | BLI_listbase_clear(&scheduler->queue); | ||||
| BLI_mutex_init(&scheduler->queue_mutex); | BLI_spin_init(&scheduler->queue_spinlock); | ||||
| BLI_condition_init(&scheduler->queue_cond); | |||||
| BLI_mutex_init(&scheduler->workers_mutex); | |||||
| BLI_condition_init(&scheduler->workers_condition); | |||||
| if (num_threads == 0) { | if (num_threads == 0) { | ||||
| /* automatic number of threads will be main thread + num cores */ | /* automatic number of threads will be main thread + num cores */ | ||||
| num_threads = BLI_system_thread_count(); | num_threads = BLI_system_thread_count(); | ||||
| } | } | ||||
| /* main thread will also work, so we count it too */ | /* main thread will also work, so we count it too */ | ||||
| num_threads -= 1; | num_threads -= 1; | ||||
| Show All 29 Lines | TaskScheduler *BLI_task_scheduler_create(int num_threads) | ||||
| return scheduler; | return scheduler; | ||||
| } | } | ||||
| void BLI_task_scheduler_free(TaskScheduler *scheduler) | void BLI_task_scheduler_free(TaskScheduler *scheduler) | ||||
| { | { | ||||
| Task *task; | Task *task; | ||||
| /* stop all waiting threads */ | /* stop all waiting threads */ | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | atomic_fetch_and_or_uint8(&scheduler->do_exit, 1); | ||||
| scheduler->do_exit = true; | if (scheduler->num_workers_sleeping != 0) { | ||||
| BLI_condition_notify_all(&scheduler->queue_cond); | BLI_mutex_lock(&scheduler->workers_mutex); | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | BLI_condition_notify_all(&scheduler->workers_condition); | ||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| } | |||||
| /* delete threads */ | /* delete threads */ | ||||
| if (scheduler->threads) { | if (scheduler->threads) { | ||||
| int i; | int i; | ||||
| for (i = 0; i < scheduler->num_threads; i++) { | for (i = 0; i < scheduler->num_threads; i++) { | ||||
| if (pthread_join(scheduler->threads[i], NULL) != 0) | if (pthread_join(scheduler->threads[i], NULL) != 0) | ||||
| fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); | fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); | ||||
| Show All 19 Lines | void BLI_task_scheduler_free(TaskScheduler *scheduler) | ||||
| /* delete leftover tasks */ | /* delete leftover tasks */ | ||||
| for (task = scheduler->queue.first; task; task = task->next) { | for (task = scheduler->queue.first; task; task = task->next) { | ||||
| task_data_free(task, 0); | task_data_free(task, 0); | ||||
| } | } | ||||
| BLI_freelistN(&scheduler->queue); | BLI_freelistN(&scheduler->queue); | ||||
| /* delete mutex/condition */ | /* delete mutex/condition */ | ||||
| BLI_mutex_end(&scheduler->queue_mutex); | BLI_spin_end(&scheduler->queue_spinlock); | ||||
| BLI_condition_end(&scheduler->queue_cond); | |||||
| BLI_mutex_end(&scheduler->workers_mutex); | |||||
| BLI_condition_end(&scheduler->workers_condition); | |||||
| MEM_freeN(scheduler); | MEM_freeN(scheduler); | ||||
| } | } | ||||
| int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | ||||
| { | { | ||||
| return scheduler->num_threads + 1; | return scheduler->num_threads + 1; | ||||
| } | } | ||||
| static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | BLI_INLINE void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | ||||
| { | { | ||||
| task_pool_num_increase(task->pool); | TaskPool *pool = task->pool; | ||||
| /* add task to queue */ | /* add task to queue */ | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | BLI_spin_lock(&scheduler->queue_spinlock); | ||||
| if (priority == TASK_PRIORITY_HIGH) | if (priority == TASK_PRIORITY_HIGH) | ||||
| BLI_addhead(&scheduler->queue, task); | BLI_addhead(&scheduler->queue, task); | ||||
| else | else | ||||
| BLI_addtail(&scheduler->queue, task); | BLI_addtail(&scheduler->queue, task); | ||||
| BLI_condition_notify_one(&scheduler->queue_cond); | BLI_spin_unlock(&scheduler->queue_spinlock); | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| /* WARNING! do not use task anymore, it might be already processed and freed by concurrent thread! */ | |||||
| task_pool_num_increase(pool); | |||||
| // atomic_add_and_fetch_z(&scheduler->num_queued, 1); | |||||
| } | } | ||||
| static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | ||||
| { | { | ||||
| Task *task, *nexttask; | Task *task, *nexttask; | ||||
| size_t done = 0; | size_t done = 0; | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | BLI_spin_lock(&scheduler->queue_spinlock); | ||||
| /* free all tasks from this pool from the queue */ | /* free all tasks from this pool from the queue */ | ||||
| for (task = scheduler->queue.first; task; task = nexttask) { | for (task = scheduler->queue.first; task; task = nexttask) { | ||||
| nexttask = task->next; | nexttask = task->next; | ||||
| if (task->pool == pool) { | if (task->pool == pool) { | ||||
| task_data_free(task, 0); | task_data_free(task, 0); | ||||
| BLI_freelinkN(&scheduler->queue, task); | BLI_freelinkN(&scheduler->queue, task); | ||||
| done++; | done++; | ||||
| } | } | ||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | BLI_spin_unlock(&scheduler->queue_spinlock); | ||||
| // atomic_sub_and_fetch_z(&scheduler->num_queued, done); | |||||
| /* notify done */ | /* notify done */ | ||||
| task_pool_num_decrease(pool, done); | task_pool_num_decrease(pool, done); | ||||
| } | } | ||||
| /* Task Pool */ | /* Task Pool */ | ||||
| static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | ||||
| Show All 14 Lines | #endif | ||||
| pool->scheduler = scheduler; | pool->scheduler = scheduler; | ||||
| pool->num = 0; | pool->num = 0; | ||||
| pool->num_threads = 0; | pool->num_threads = 0; | ||||
| pool->currently_running_tasks = 0; | pool->currently_running_tasks = 0; | ||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| pool->run_in_background = is_background; | pool->run_in_background = is_background; | ||||
| BLI_mutex_init(&pool->num_mutex); | |||||
| BLI_condition_init(&pool->num_cond); | |||||
| pool->userdata = userdata; | pool->userdata = userdata; | ||||
| BLI_mutex_init(&pool->user_mutex); | BLI_mutex_init(&pool->user_mutex); | ||||
| if (BLI_thread_is_main()) { | if (BLI_thread_is_main()) { | ||||
| pool->task_mempool = scheduler->task_mempool; | pool->task_mempool = scheduler->task_mempool; | ||||
| } | } | ||||
| else { | else { | ||||
| pool->task_mempool = &pool->task_mempool_local; | pool->task_mempool = &pool->task_mempool_local; | ||||
| ▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | |||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, true); | return task_pool_create_ex(scheduler, userdata, true); | ||||
| } | } | ||||
| void BLI_task_pool_free(TaskPool *pool) | void BLI_task_pool_free(TaskPool *pool) | ||||
| { | { | ||||
| BLI_task_pool_stop(pool); | BLI_task_pool_stop(pool); | ||||
| BLI_mutex_end(&pool->num_mutex); | |||||
| BLI_condition_end(&pool->num_cond); | |||||
| BLI_mutex_end(&pool->user_mutex); | BLI_mutex_end(&pool->user_mutex); | ||||
| /* Free local memory pool, those pointers are lost forever. */ | /* Free local memory pool, those pointers are lost forever. */ | ||||
| if (pool->task_mempool == &pool->task_mempool_local) { | if (pool->task_mempool == &pool->task_mempool_local) { | ||||
| for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { | for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { | ||||
| MEM_freeN(pool->task_mempool_local.tasks[i]); | MEM_freeN(pool->task_mempool_local.tasks[i]); | ||||
| } | } | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, | ||||
| void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | ||||
| { | { | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | ||||
| } | } | ||||
| void BLI_task_pool_work_and_wait(TaskPool *pool) | void BLI_task_pool_work_and_wait(TaskPool *pool) | ||||
| { | { | ||||
| TaskScheduler *scheduler = pool->scheduler; | TaskScheduler *scheduler = pool->scheduler; | ||||
| int loop_count = 0; | |||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| while (pool->num != 0) { | while (pool->num != 0) { | ||||
| Task *task, *work_task = NULL; | Task *task; | ||||
| bool found_task = false; | bool found_task = false; | ||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* find task from this pool. if we get a task from another pool, | /* find task from this pool. if we get a task from another pool, | ||||
| * we can get into deadlock */ | * we can get into deadlock */ | ||||
| if (pool->num_threads == 0 || | found_task = task_find(scheduler, &task, pool, true); | ||||
| pool->currently_running_tasks < pool->num_threads) | |||||
| { | |||||
| for (task = scheduler->queue.first; task; task = task->next) { | |||||
| if (task->pool == pool) { | |||||
| work_task = task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, task); | |||||
| break; | |||||
| } | |||||
| } | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| /* if found task, do it, otherwise wait until other tasks are done */ | /* if found task, do it, otherwise wait until other tasks are done */ | ||||
| if (found_task) { | if (found_task) { | ||||
| /* run task */ | /* run task */ | ||||
| atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); | task->run(pool, task->taskdata, 0); | ||||
| work_task->run(pool, work_task->taskdata, 0); | |||||
| /* delete task */ | /* delete task */ | ||||
| task_free(pool, task, 0); | task_free(pool, task, 0); | ||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | |||||
| /* notify pool task was done */ | /* notify pool task was done */ | ||||
| task_pool_num_decrease(pool, 1); | task_pool_num_decrease(pool, 1); | ||||
| } | |||||
| BLI_mutex_lock(&pool->num_mutex); | /* Reset the 'failed' counter to zero. */ | ||||
| if (pool->num == 0) | loop_count = 0; | ||||
| } | |||||
| else if (pool->num == 0) { | |||||
| break; | break; | ||||
| if (!found_task) | |||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | |||||
| } | } | ||||
| else { | |||||
| BLI_mutex_unlock(&pool->num_mutex); | task_wait(scheduler, &loop_count); /* We do not care about return value here. */ | ||||
| } | |||||
| } | |||||
| } | } | ||||
| void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) | void BLI_pool_set_num_threads(TaskPool *pool, size_t num_threads) | ||||
| { | { | ||||
| /* NOTE: Don't try to modify threads while tasks are running! */ | /* NOTE: Don't try to modify threads while tasks are running! */ | ||||
| pool->num_threads = num_threads; | pool->num_threads = num_threads; | ||||
| } | } | ||||
| void BLI_task_pool_cancel(TaskPool *pool) | void BLI_task_pool_cancel(TaskPool *pool) | ||||
| { | { | ||||
| pool->do_cancel = true; | pool->do_cancel = true; | ||||
| task_scheduler_clear(pool->scheduler, pool); | task_scheduler_clear(pool->scheduler, pool); | ||||
| /* wait until all entries are cleared */ | /* wait until all entries are cleared */ | ||||
| BLI_mutex_lock(&pool->num_mutex); | while (pool->num) { | ||||
| while (pool->num) | /* No real point in spinning here... */ | ||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | BLI_mutex_lock(&pool->scheduler->workers_mutex); | ||||
| BLI_mutex_unlock(&pool->num_mutex); | pool->scheduler->num_workers_sleeping++; | ||||
| BLI_condition_wait(&pool->scheduler->workers_condition, &pool->scheduler->workers_mutex); | |||||
| pool->scheduler->num_workers_sleeping--; | |||||
| BLI_mutex_unlock(&pool->scheduler->workers_mutex); | |||||
| } | |||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| } | } | ||||
| void BLI_task_pool_stop(TaskPool *pool) | void BLI_task_pool_stop(TaskPool *pool) | ||||
| { | { | ||||
| task_scheduler_clear(pool->scheduler, pool); | task_scheduler_clear(pool->scheduler, pool); | ||||
| Show All 9 Lines | |||||
| { | { | ||||
| return pool->userdata; | return pool->userdata; | ||||
| } | } | ||||
| ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ||||
| { | { | ||||
| return &pool->user_mutex; | return &pool->user_mutex; | ||||
| } | } | ||||
| /* Parallel range routines */ | /* Parallel range routines */ | ||||
Not Done Inline ActionsJust ditch it out. Good code MUST NOT make decisions based on neither current thread index, nor on the amount of tasks done, not on anything else coming from the scheduler specific behavior. sergey: Just ditch it out.
Good code MUST NOT make decisions based on neither current thread index… | |||||
Not Done Inline ActionsWelll… This is only used for info/stats purpose, where it avoids caller to implement own counting of done work… mont29: Welll… This is only used for info/stats purpose, where it avoids caller to implement own… | |||||
| /** | /** | ||||
| * | * | ||||
| * Main functions: | * Main functions: | ||||
| * - #BLI_task_parallel_range | * - #BLI_task_parallel_range | ||||
| * - #BLI_task_parallel_listbase (#ListBase - double linked list) | * - #BLI_task_parallel_listbase (#ListBase - double linked list) | ||||
| * | * | ||||
| * TODO: | * TODO: | ||||
| ▲ Show 20 Lines • Show All 375 Lines • Show Last 20 Lines | |||||
Do we really need num_queued? Can we instead test queue.head != NULL ?