Changeset View
Standalone View
source/blender/blenlib/intern/task.c
| Show First 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | typedef struct TaskMemPoolStats { | ||||
| /* Number of discarded memory due to pool saturation, */ | /* Number of discarded memory due to pool saturation, */ | ||||
| int num_discard; | int num_discard; | ||||
| } TaskMemPoolStats; | } TaskMemPoolStats; | ||||
| #endif | #endif | ||||
| struct TaskPool { | struct TaskPool { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| volatile size_t num; | size_t num; | ||||
| volatile size_t done; | size_t done; | ||||
| size_t num_threads; | size_t num_threads; | ||||
| size_t currently_running_tasks; | size_t currently_running_tasks; | ||||
| ThreadMutex num_mutex; | |||||
| ThreadCondition num_cond; | |||||
| void *userdata; | void *userdata; | ||||
| ThreadMutex user_mutex; | ThreadMutex user_mutex; | ||||
| volatile bool do_cancel; | volatile bool do_cancel; | ||||
| /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | ||||
| * has to use its special background fallback thread in case we are in | * has to use its special background fallback thread in case we are in | ||||
| Show All 22 Lines | struct TaskScheduler { | ||||
| struct TaskThread *task_threads; | struct TaskThread *task_threads; | ||||
| TaskMemPool *task_mempool; | TaskMemPool *task_mempool; | ||||
| int num_threads; | int num_threads; | ||||
| bool background_thread_only; | bool background_thread_only; | ||||
| ListBase queue; | ListBase queue; | ||||
| ThreadMutex queue_mutex; | ThreadMutex queue_mutex; | ||||
| ThreadCondition queue_cond; | ThreadCondition queue_cond; | ||||
| SpinLock queue_spin; | |||||
| volatile bool do_exit; | volatile bool do_exit; | ||||
| }; | }; | ||||
| typedef struct TaskThread { | typedef struct TaskThread { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| int id; | int id; | ||||
| } TaskThread; | } TaskThread; | ||||
| ▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | |||||
| #endif | #endif | ||||
| } | } | ||||
| } | } | ||||
| /* Task Scheduler */ | /* Task Scheduler */ | ||||
| static void task_pool_num_decrease(TaskPool *pool, size_t done) | static void task_pool_num_decrease(TaskPool *pool, size_t done) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| BLI_assert(pool->num >= done); | BLI_assert(pool->num >= done); | ||||
| TaskScheduler *scheduler = pool->scheduler; | |||||
| pool->num -= done; | /* TODO(sergey): Can we perhaps avoid some memory barriers here? | ||||
| * For example. if we never have pools with limited number of threads? | |||||
| */ | |||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); | atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); | ||||
mont29: Think we could get rid of `done` counter at least... It’s only used for stats report in one… | |||||
| pool->done += done; | atomic_add_and_fetch_z(&pool->done, done); | ||||
| const size_t num = atomic_sub_and_fetch_z(&pool->num, done); | |||||
| if (pool->num == 0) | /* WARNING! Do not use pool after this point, may have been already freed by concurrent thread. */ | ||||
| BLI_condition_notify_all(&pool->num_cond); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | /* Notify all worker threads so they can wake up on exit. */ | ||||
| if (num == 0) { | |||||
Not Done Inline ActionsYou could use result from atomic_sub_and_fetch_z(&pool->num, done); here, would avoid this piece of code potentially being called multiple times? mont29: You could use result from `atomic_sub_and_fetch_z(&pool->num, done);` here, would avoid this… | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| BLI_condition_notify_all(&scheduler->queue_cond); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| } | |||||
| } | } | ||||
Not Done Inline ActionsEeeeh… comment is quite not accurate here? If I’m correct, this notification is *only* needed to ensure BLI_task_pool_work_and_wait() is not blocked? I.e. not the worker threads (which shall remain sleeping in this case, in fact), but the main 'calling' thread. mont29: Eeeeh… comment is quite not accurate here? If I’m correct, this notification is *only* needed… | |||||
Not Done Inline ActionsOn exit we join to every worker thread, so need to make sure they do not wait for anything. sergey: On exit we join to every worker thread, so need to make sure they do not wait for anything. | |||||
| static void task_pool_num_increase(TaskPool *pool) | static void task_pool_num_increase(TaskPool *pool) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | atomic_add_and_fetch_z(&pool->num, 1); | ||||
| pool->num++; | /* TODO(sergey): Can we avoid mutex lock completely? | ||||
| BLI_condition_notify_all(&pool->num_cond); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| } | |||||
| static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | |||||
| { | |||||
| bool found_task = false; | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| while (!scheduler->queue.first && !scheduler->do_exit) | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| do { | |||||
| Task *current_task; | |||||
| /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | |||||
| * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | |||||
| * added to the queue), but it is wrong. | |||||
| * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | |||||
| * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | |||||
| * reaches this point... | |||||
| * See http://stackoverflow.com/questions/8594591 | |||||
| * | * | ||||
| * So we only abort here if do_exit is set. | * One of the ideas could be to do it from a spin-lock, but then "sleep" | ||||
| * logic in the worker threads becomes non-atomic. | |||||
| * | |||||
| * Other idea here could be to only do notification if there's any thread | |||||
| * sent to sleep. But how to implement this in an atomic manner? | |||||
| */ | */ | ||||
| if (scheduler->do_exit) { | BLI_mutex_lock(&pool->scheduler->queue_mutex); | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | /* NOTE: Even tho it's only single task added here we notify all threads. | ||||
| return false; | * The reason for that is because there might be much more tasks coming | ||||
| * right after this one, so waking up all threads earlier seems to give | |||||
| * a bit less of threading overhead. | |||||
| */ | |||||
| BLI_condition_notify_all(&pool->scheduler->queue_cond); | |||||
| BLI_mutex_unlock(&pool->scheduler->queue_mutex); | |||||
| } | } | ||||
| for (current_task = scheduler->queue.first; | static bool tash_scheduler_task_pop(TaskScheduler *scheduler, Task **task) | ||||
| { | |||||
| bool task_found = false; | |||||
| /* NOTE: We almost always do single iteration here, so spin time is | |||||
| * most of the time is really low. | |||||
| */ | |||||
| BLI_spin_lock(&scheduler->queue_spin); | |||||
| for (Task *current_task = scheduler->queue.first; | |||||
| current_task != NULL; | current_task != NULL; | ||||
| current_task = current_task->next) | current_task = current_task->next) | ||||
| { | { | ||||
| TaskPool *pool = current_task->pool; | TaskPool *pool = current_task->pool; | ||||
| if (scheduler->background_thread_only && !pool->run_in_background) { | if (scheduler->background_thread_only && !pool->run_in_background) { | ||||
| continue; | continue; | ||||
| } | } | ||||
| if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || | if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || | ||||
| pool->num_threads == 0) | pool->num_threads == 0) | ||||
| { | { | ||||
| *task = current_task; | *task = current_task; | ||||
| found_task = true; | task_found = true; | ||||
| BLI_remlink(&scheduler->queue, *task); | BLI_remlink(&scheduler->queue, *task); | ||||
| break; | break; | ||||
| } | } | ||||
| else { | else { | ||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | ||||
| } | } | ||||
| } | } | ||||
| if (!found_task) | BLI_spin_unlock(&scheduler->queue_spin); | ||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | return task_found; | ||||
| } while (!found_task); | } | ||||
Not Done Inline ActionsI would try the second idea. Adding an atomic counter of waiting threads, increased just before locking and awaiting on condition, and decreased just after condition return and unlocking, looks like a very good way to always know whether we have sleeping threads and hence should signal condition. This could also be used in task_pool_num_decrease() above. I don’t think atomicity is an issue here anymore, see my comment about it in main message. mont29: I would try the second idea.
Adding an atomic counter of waiting threads, increased just… | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | ||||
| { | |||||
| bool task_found = false; | |||||
| do { | |||||
| /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | |||||
| * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | |||||
| * added to the queue), but it is wrong. | |||||
| * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | |||||
| * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | |||||
| * reaches this point... | |||||
| * See http://stackoverflow.com/questions/8594591 | |||||
| * | |||||
| * So we only abort here if do_exit is set. | |||||
| */ | |||||
| if (scheduler->do_exit) { | |||||
| return false; | |||||
| } | |||||
| task_found = tash_scheduler_task_pop(scheduler, task); | |||||
| if (!task_found) { | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* We check for tasks again from inside mutex lock so we don't miss | |||||
| * any possible notification which happens after spin-lock section | |||||
| * but before mutex-lock section. | |||||
| */ | |||||
| task_found = tash_scheduler_task_pop(scheduler, task); | |||||
| if (!task_found) { | |||||
| /* Send thread to sleep for until new task arrives. */ | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| } | |||||
| } while (!task_found); | |||||
| return true; | return true; | ||||
| } | } | ||||
| static void *task_scheduler_thread_run(void *thread_p) | static void *task_scheduler_thread_run(void *thread_p) | ||||
| { | { | ||||
| TaskThread *thread = (TaskThread *) thread_p; | TaskThread *thread = (TaskThread *) thread_p; | ||||
| TaskScheduler *scheduler = thread->scheduler; | TaskScheduler *scheduler = thread->scheduler; | ||||
| int thread_id = thread->id; | int thread_id = thread->id; | ||||
| Show All 22 Lines | TaskScheduler *BLI_task_scheduler_create(int num_threads) | ||||
| /* multiple places can use this task scheduler, sharing the same | /* multiple places can use this task scheduler, sharing the same | ||||
| * threads, so we keep track of the number of users. */ | * threads, so we keep track of the number of users. */ | ||||
| scheduler->do_exit = false; | scheduler->do_exit = false; | ||||
| BLI_listbase_clear(&scheduler->queue); | BLI_listbase_clear(&scheduler->queue); | ||||
| BLI_mutex_init(&scheduler->queue_mutex); | BLI_mutex_init(&scheduler->queue_mutex); | ||||
| BLI_condition_init(&scheduler->queue_cond); | BLI_condition_init(&scheduler->queue_cond); | ||||
| BLI_spin_init(&scheduler->queue_spin); | |||||
| if (num_threads == 0) { | if (num_threads == 0) { | ||||
| /* automatic number of threads will be main thread + num cores */ | /* automatic number of threads will be main thread + num cores */ | ||||
| num_threads = BLI_system_thread_count(); | num_threads = BLI_system_thread_count(); | ||||
| } | } | ||||
| /* main thread will also work, so we count it too */ | /* main thread will also work, so we count it too */ | ||||
| num_threads -= 1; | num_threads -= 1; | ||||
| ▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | void BLI_task_scheduler_free(TaskScheduler *scheduler) | ||||
| for (task = scheduler->queue.first; task; task = task->next) { | for (task = scheduler->queue.first; task; task = task->next) { | ||||
| task_data_free(task, 0); | task_data_free(task, 0); | ||||
| } | } | ||||
| BLI_freelistN(&scheduler->queue); | BLI_freelistN(&scheduler->queue); | ||||
| /* delete mutex/condition */ | /* delete mutex/condition */ | ||||
| BLI_mutex_end(&scheduler->queue_mutex); | BLI_mutex_end(&scheduler->queue_mutex); | ||||
| BLI_condition_end(&scheduler->queue_cond); | BLI_condition_end(&scheduler->queue_cond); | ||||
| BLI_spin_end(&scheduler->queue_spin); | |||||
| MEM_freeN(scheduler); | MEM_freeN(scheduler); | ||||
| } | } | ||||
| int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | ||||
| { | { | ||||
| return scheduler->num_threads + 1; | return scheduler->num_threads + 1; | ||||
| } | } | ||||
| static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | ||||
| { | { | ||||
| task_pool_num_increase(task->pool); | TaskPool *pool = task->pool; | ||||
| BLI_spin_lock(&scheduler->queue_spin); | |||||
| /* add task to queue */ | if (priority == TASK_PRIORITY_HIGH) { | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| if (priority == TASK_PRIORITY_HIGH) | |||||
| BLI_addhead(&scheduler->queue, task); | BLI_addhead(&scheduler->queue, task); | ||||
| else | } | ||||
| else { | |||||
| BLI_addtail(&scheduler->queue, task); | BLI_addtail(&scheduler->queue, task); | ||||
| } | |||||
| BLI_spin_unlock(&scheduler->queue_spin); | |||||
| BLI_condition_notify_one(&scheduler->queue_cond); | /* WARNING! Do not use task after this point, may have been already processed and freed by concurrent thread. */ | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| task_pool_num_increase(pool); | |||||
| } | } | ||||
| static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | ||||
| { | { | ||||
| Task *task, *nexttask; | Task *task, *nexttask; | ||||
| size_t done = 0; | size_t done = 0; | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* free all tasks from this pool from the queue */ | /* free all tasks from this pool from the queue */ | ||||
| BLI_spin_lock(&scheduler->queue_spin); | |||||
| for (task = scheduler->queue.first; task; task = nexttask) { | for (task = scheduler->queue.first; task; task = nexttask) { | ||||
| nexttask = task->next; | nexttask = task->next; | ||||
| if (task->pool == pool) { | if (task->pool == pool) { | ||||
| task_data_free(task, 0); | task_data_free(task, 0); | ||||
| BLI_freelinkN(&scheduler->queue, task); | BLI_freelinkN(&scheduler->queue, task); | ||||
| done++; | done++; | ||||
| } | } | ||||
| } | } | ||||
| BLI_spin_unlock(&scheduler->queue_spin); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| /* notify done */ | /* notify done */ | ||||
| task_pool_num_decrease(pool, done); | task_pool_num_decrease(pool, done); | ||||
| } | } | ||||
| /* Task Pool */ | /* Task Pool */ | ||||
| static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | ||||
| Show All 15 Lines | #endif | ||||
| pool->scheduler = scheduler; | pool->scheduler = scheduler; | ||||
| pool->num = 0; | pool->num = 0; | ||||
| pool->done = 0; | pool->done = 0; | ||||
| pool->num_threads = 0; | pool->num_threads = 0; | ||||
| pool->currently_running_tasks = 0; | pool->currently_running_tasks = 0; | ||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| pool->run_in_background = is_background; | pool->run_in_background = is_background; | ||||
| BLI_mutex_init(&pool->num_mutex); | |||||
| BLI_condition_init(&pool->num_cond); | |||||
| pool->userdata = userdata; | pool->userdata = userdata; | ||||
| BLI_mutex_init(&pool->user_mutex); | BLI_mutex_init(&pool->user_mutex); | ||||
| if (BLI_thread_is_main()) { | if (BLI_thread_is_main()) { | ||||
| pool->task_mempool = scheduler->task_mempool; | pool->task_mempool = scheduler->task_mempool; | ||||
| } | } | ||||
| else { | else { | ||||
| pool->task_mempool = &pool->task_mempool_local; | pool->task_mempool = &pool->task_mempool_local; | ||||
| ▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | |||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, true); | return task_pool_create_ex(scheduler, userdata, true); | ||||
| } | } | ||||
| void BLI_task_pool_free(TaskPool *pool) | void BLI_task_pool_free(TaskPool *pool) | ||||
| { | { | ||||
| BLI_task_pool_stop(pool); | BLI_task_pool_stop(pool); | ||||
| BLI_mutex_end(&pool->num_mutex); | |||||
| BLI_condition_end(&pool->num_cond); | |||||
| BLI_mutex_end(&pool->user_mutex); | BLI_mutex_end(&pool->user_mutex); | ||||
| /* Free local memory pool, those pointers are lost forever. */ | /* Free local memory pool, those pointers are lost forever. */ | ||||
| if (pool->task_mempool == &pool->task_mempool_local) { | if (pool->task_mempool == &pool->task_mempool_local) { | ||||
| for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { | for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { | ||||
| MEM_freeN(pool->task_mempool_local.tasks[i]); | MEM_freeN(pool->task_mempool_local.tasks[i]); | ||||
| } | } | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, | ||||
| void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | ||||
| { | { | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | ||||
| } | } | ||||
| void BLI_task_pool_work_and_wait(TaskPool *pool) | void BLI_task_pool_work_and_wait(TaskPool *pool) | ||||
| { | { | ||||
| TaskScheduler *scheduler = pool->scheduler; | TaskScheduler *scheduler = pool->scheduler; | ||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| while (pool->num != 0) { | while (pool->num != 0) { | ||||
| Task *task, *work_task = NULL; | Task *work_task = NULL; | ||||
| bool found_task = false; | bool found_task = false; | ||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* find task from this pool. if we get a task from another pool, | /* find task from this pool. if we get a task from another pool, | ||||
| * we can get into deadlock */ | * we can get into deadlock */ | ||||
| if (pool->num_threads == 0 || | if (pool->num_threads == 0 || | ||||
| pool->currently_running_tasks < pool->num_threads) | pool->currently_running_tasks < pool->num_threads) | ||||
| { | { | ||||
| for (task = scheduler->queue.first; task; task = task->next) { | BLI_spin_lock(&scheduler->queue_spin); | ||||
| for (Task *task = scheduler->queue.first; task; task = task->next) { | |||||
| if (task->pool == pool) { | if (task->pool == pool) { | ||||
| work_task = task; | work_task = task; | ||||
| found_task = true; | found_task = true; | ||||
| BLI_remlink(&scheduler->queue, task); | BLI_remlink(&scheduler->queue, task); | ||||
| break; | break; | ||||
| } | } | ||||
| } | } | ||||
| BLI_spin_unlock(&scheduler->queue_spin); | |||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| /* if found task, do it, otherwise wait until other tasks are done */ | /* if found task, do it, otherwise wait until other tasks are done */ | ||||
| if (found_task) { | if (found_task) { | ||||
| /* run task */ | /* run task */ | ||||
| atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); | atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); | ||||
| work_task->run(pool, work_task->taskdata, 0); | work_task->run(pool, work_task->taskdata, 0); | ||||
| /* delete task */ | /* delete task */ | ||||
| task_free(pool, task, 0); | task_free(pool, work_task, 0); | ||||
| /* notify pool task was done */ | /* notify pool task was done */ | ||||
| task_pool_num_decrease(pool, 1); | task_pool_num_decrease(pool, 1); | ||||
| } | } | ||||
| else { | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
Not Done Inline ActionsYou can as well put the queue_mutex lock/unlock inside the if imho? pool->num is not protected by that mutex, so it’s not preventing us from race conditions anyway... mont29: You can as well put the `queue_mutex` lock/unlock inside the if imho? `pool->num` is not… | |||||
Not Done Inline ActionsYes, but if it's inside of mutex lock then it's possible to have num decreased before entering mutex-lock here which will dead-lock us. So guess instead we need to guard pool->num here. Tried doing nanosleep here, but that kills any speed benefits. sergey: Yes, but if it's inside of mutex lock then it's possible to have num decreased before entering… | |||||
| if (pool->num != 0) { | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| } | |||||
| BLI_mutex_lock(&pool->num_mutex); | if (pool->num == 0) { | ||||
| if (pool->num == 0) | |||||
| break; | break; | ||||
| if (!found_task) | |||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | |||||
| } | } | ||||
| } | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| } | } | ||||
| int BLI_pool_get_num_threads(TaskPool *pool) | int BLI_pool_get_num_threads(TaskPool *pool) | ||||
| { | { | ||||
| if (pool->num_threads != 0) { | if (pool->num_threads != 0) { | ||||
| return pool->num_threads; | return pool->num_threads; | ||||
| } | } | ||||
| else { | else { | ||||
| return BLI_task_scheduler_num_threads(pool->scheduler); | return BLI_task_scheduler_num_threads(pool->scheduler); | ||||
| } | } | ||||
| } | } | ||||
| void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) | void BLI_pool_set_num_threads(TaskPool *pool, int num_threads) | ||||
| { | { | ||||
| /* NOTE: Don't try to modify threads while tasks are running! */ | /* NOTE: Don't try to modify threads while tasks are running! */ | ||||
| pool->num_threads = num_threads; | pool->num_threads = num_threads; | ||||
| } | } | ||||
| void BLI_task_pool_cancel(TaskPool *pool) | void BLI_task_pool_cancel(TaskPool *pool) | ||||
| { | { | ||||
| TaskScheduler *scheduler = pool->scheduler; | |||||
| pool->do_cancel = true; | pool->do_cancel = true; | ||||
| task_scheduler_clear(pool->scheduler, pool); | task_scheduler_clear(scheduler, pool); | ||||
| /* wait until all entries are cleared */ | /* wait until all entries are cleared. */ | ||||
| BLI_mutex_lock(&pool->num_mutex); | BLI_mutex_lock(&scheduler->queue_mutex); | ||||
| while (pool->num) | while (pool->num) | ||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | ||||
| BLI_mutex_unlock(&pool->num_mutex); | BLI_mutex_unlock(&scheduler->queue_mutex); | ||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| } | } | ||||
| void BLI_task_pool_stop(TaskPool *pool) | void BLI_task_pool_stop(TaskPool *pool) | ||||
| { | { | ||||
| task_scheduler_clear(pool->scheduler, pool); | task_scheduler_clear(pool->scheduler, pool); | ||||
| ▲ Show 20 Lines • Show All 407 Lines • Show Last 20 Lines | |||||
Think we could get rid of done counter at least... It’s only used for stats report in one place afaict, and this can be achieved by other ways (like checking pool->num, not as accurate but would work with high numbers of tasks, or just add your own 'done' counter in your tasks' code if you really need it)?
About limited number of threads, am not sure… I wonder why we do need this? Seems to only be used in one place, OGL render?