Changeset View
Changeset View
Standalone View
Standalone View
source/blender/blenlib/intern/task_pool.cc
| Show All 11 Lines | |||||
| * You should have received a copy of the GNU General Public License | * You should have received a copy of the GNU General Public License | ||||
| * along with this program; if not, write to the Free Software Foundation, | * along with this program; if not, write to the Free Software Foundation, | ||||
| * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | ||||
| */ | */ | ||||
| /** \file | /** \file | ||||
| * \ingroup bli | * \ingroup bli | ||||
| * | * | ||||
| * A generic task system which can be used for any task based subsystem. | * Task pool to run tasks in parallel. | ||||
| */ | */ | ||||
| #include <stdlib.h> | #include <stdlib.h> | ||||
brecht: Add this to fix build without TBB:
```
#include <memory>
#include <utility>
```
| |||||
| #include "MEM_guardedalloc.h" | #include "MEM_guardedalloc.h" | ||||
| #include "DNA_listBase.h" | #include "DNA_listBase.h" | ||||
| #include "BLI_listbase.h" | |||||
| #include "BLI_math.h" | #include "BLI_math.h" | ||||
| #include "BLI_mempool.h" | #include "BLI_mempool.h" | ||||
| #include "BLI_task.h" | #include "BLI_task.h" | ||||
| #include "BLI_threads.h" | #include "BLI_threads.h" | ||||
| #include "atomic_ops.h" | #ifdef WITH_TBB | ||||
| /* Quiet top level deprecation message, unrelated to API usage here. */ | |||||
| /* Define this to enable some detailed statistic print. */ | # define TBB_SUPPRESS_DEPRECATED_MESSAGES 1 | ||||
| #undef DEBUG_STATS | # include <tbb/tbb.h> | ||||
| /* Types */ | |||||
| /* Number of per-thread pre-allocated tasks. | |||||
| * | |||||
| * For more details see description of TaskMemPool. | |||||
| */ | |||||
| #define MEMPOOL_SIZE 256 | |||||
| /* Number of tasks which are pushed directly to local thread queue. | |||||
| * | |||||
| * This allows thread to fetch next task without locking the whole queue. | |||||
| */ | |||||
| #define LOCAL_QUEUE_SIZE 1 | |||||
| /* Number of tasks which are allowed to be scheduled in a delayed manner. | |||||
| * | |||||
| * This allows to use less locks per graph node children schedule. More details | |||||
| * could be found at TaskThreadLocalStorage::do_delayed_push. | |||||
| */ | |||||
| #define DELAYED_QUEUE_SIZE 4096 | |||||
| #ifndef NDEBUG | |||||
| # define ASSERT_THREAD_ID(scheduler, thread_id) \ | |||||
| do { \ | |||||
| if (!BLI_thread_is_main()) { \ | |||||
| TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); \ | |||||
| if (thread == NULL) { \ | |||||
| BLI_assert(thread_id == 0); \ | |||||
| } \ | |||||
| else { \ | |||||
| BLI_assert(thread_id == thread->id); \ | |||||
| } \ | |||||
| } \ | |||||
| else { \ | |||||
| BLI_assert(thread_id == 0); \ | |||||
| } \ | |||||
| } while (false) | |||||
| #else | |||||
| # define ASSERT_THREAD_ID(scheduler, thread_id) | |||||
| #endif | #endif | ||||
| typedef struct Task { | /* Task | ||||
| struct Task *next, *prev; | * | ||||
| * Unit of work to execute. This is a C++ class to work with TBB. */ | |||||
| class Task { | |||||
| public: | |||||
| TaskPool *pool; | |||||
| TaskRunFunction run; | TaskRunFunction run; | ||||
| void *taskdata; | void *taskdata; | ||||
| bool free_taskdata; | bool free_taskdata; | ||||
| TaskFreeFunction freedata; | TaskFreeFunction freedata; | ||||
| TaskPool *pool; | |||||
| } Task; | |||||
| /* This is a per-thread storage of pre-allocated tasks. | |||||
| * | |||||
| * The idea behind this is simple: reduce amount of malloc() calls when pushing | |||||
| * new task to the pool. This is done by keeping memory from the tasks which | |||||
| * were finished already, so instead of freeing that memory we put it to the | |||||
| * pool for the later re-use. | |||||
| * | |||||
| * The tricky part here is to avoid any inter-thread synchronization, hence no | |||||
| * lock must exist around this pool. The pool will become an owner of the pointer | |||||
| * from freed task, and only corresponding thread will be able to use this pool | |||||
| * (no memory stealing and such). | |||||
| * | |||||
| * This leads to the following use of the pool: | |||||
| * | |||||
| * - task_push() should provide proper thread ID from which the task is being | |||||
| * pushed from. | |||||
| * | |||||
| * - Task allocation function which check corresponding memory pool and if there | |||||
| * is any memory in there it'll mark memory as re-used, remove it from the pool | |||||
| * and use that memory for the new task. | |||||
| * | |||||
| * At this moment task queue owns the memory. | |||||
| * | |||||
| * - When task is done and task_free() is called the memory will be put to the | |||||
| * pool which corresponds to a thread which handled the task. | |||||
| */ | |||||
| typedef struct TaskMemPool { | |||||
| /* Number of pre-allocated tasks in the pool. */ | |||||
| int num_tasks; | |||||
| /* Pre-allocated task memory pointers. */ | |||||
| Task *tasks[MEMPOOL_SIZE]; | |||||
| } TaskMemPool; | |||||
| #ifdef DEBUG_STATS | |||||
| typedef struct TaskMemPoolStats { | |||||
| /* Number of allocations. */ | |||||
| int num_alloc; | |||||
| /* Number of avoided allocations (pointer was re-used from the pool). */ | |||||
| int num_reuse; | |||||
| /* Number of discarded memory due to pool saturation, */ | |||||
| int num_discard; | |||||
| } TaskMemPoolStats; | |||||
| #endif | |||||
| typedef struct TaskThreadLocalStorage { | |||||
| /* Memory pool for faster task allocation. | |||||
| * The idea is to re-use memory of finished/discarded tasks by this thread. | |||||
| */ | |||||
| TaskMemPool task_mempool; | |||||
| /* Local queue keeps thread alive by keeping small amount of tasks ready | |||||
| * to be picked up without causing global thread locks for synchronization. | |||||
| */ | |||||
| int num_local_queue; | |||||
| Task *local_queue[LOCAL_QUEUE_SIZE]; | |||||
| /* Thread can be marked for delayed tasks push. This is helpful when it's | |||||
| * know that lots of subsequent task pushed will happen from the same thread | |||||
| * without "interrupting" for task execution. | |||||
| * | |||||
| * We try to accumulate as much tasks as possible in a local queue without | |||||
| * any locks first, and then we push all of them into a scheduler's queue | |||||
| * from within a single mutex lock. | |||||
| */ | |||||
| bool do_delayed_push; | |||||
| int num_delayed_queue; | |||||
| Task *delayed_queue[DELAYED_QUEUE_SIZE]; | |||||
| } TaskThreadLocalStorage; | |||||
| struct TaskPool { | |||||
| TaskScheduler *scheduler; | |||||
| volatile size_t num; | |||||
| ThreadMutex num_mutex; | |||||
| ThreadCondition num_cond; | |||||
| void *userdata; | |||||
| ThreadMutex user_mutex; | |||||
| volatile bool do_cancel; | Task(TaskPool *pool, | ||||
| volatile bool do_work; | TaskRunFunction run, | ||||
| void *taskdata, | |||||
| volatile bool is_suspended; | bool free_taskdata, | ||||
| bool start_suspended; | TaskFreeFunction freedata) | ||||
| ListBase suspended_queue; | : pool(pool), run(run), taskdata(taskdata), free_taskdata(free_taskdata), freedata(freedata) | ||||
| size_t num_suspended; | |||||
| TaskPriority priority; | |||||
| /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | |||||
| * has to use its special background fallback thread in case we are in | |||||
| * single-threaded situation. | |||||
| */ | |||||
| bool run_in_background; | |||||
| /* This is a task scheduler's ID of a thread at which pool was constructed. | |||||
| * It will be used to access task TLS. | |||||
| */ | |||||
| int thread_id; | |||||
| /* For the pools which are created from non-main thread which is not a | |||||
| * scheduler worker thread we can't re-use any of scheduler's threads TLS | |||||
| * and have to use our own one. | |||||
| */ | |||||
| bool use_local_tls; | |||||
| TaskThreadLocalStorage local_tls; | |||||
| #ifndef NDEBUG | |||||
| pthread_t creator_thread_id; | |||||
| #endif | |||||
| #ifdef DEBUG_STATS | |||||
| TaskMemPoolStats *mempool_stats; | |||||
| #endif | |||||
| }; | |||||
| struct TaskScheduler { | |||||
| pthread_t *threads; | |||||
| struct TaskThread *task_threads; | |||||
| int num_threads; | |||||
| bool background_thread_only; | |||||
| ListBase queue; | |||||
| ThreadMutex queue_mutex; | |||||
| ThreadCondition queue_cond; | |||||
| ThreadMutex startup_mutex; | |||||
| ThreadCondition startup_cond; | |||||
| volatile int num_thread_started; | |||||
| volatile bool do_exit; | |||||
| /* NOTE: In pthread's TLS we store the whole TaskThread structure. */ | |||||
| pthread_key_t tls_id_key; | |||||
| }; | |||||
| typedef struct TaskThread { | |||||
| TaskScheduler *scheduler; | |||||
| int id; | |||||
| TaskThreadLocalStorage tls; | |||||
| } TaskThread; | |||||
| /* Helper */ | |||||
| BLI_INLINE void task_data_free(Task *task, const int thread_id) | |||||
| { | |||||
| if (task->free_taskdata) { | |||||
| if (task->freedata) { | |||||
| task->freedata(task->pool, task->taskdata, thread_id); | |||||
| } | |||||
| else { | |||||
| MEM_freeN(task->taskdata); | |||||
| } | |||||
| } | |||||
| } | |||||
| BLI_INLINE void initialize_task_tls(TaskThreadLocalStorage *tls) | |||||
| { | { | ||||
| memset(tls, 0, sizeof(TaskThreadLocalStorage)); | |||||
| } | } | ||||
| BLI_INLINE TaskThreadLocalStorage *get_task_tls(TaskPool *pool, const int thread_id) | ~Task() | ||||
| { | { | ||||
| TaskScheduler *scheduler = pool->scheduler; | if (free_taskdata) { | ||||
| BLI_assert(thread_id >= 0); | if (freedata) { | ||||
| BLI_assert(thread_id <= scheduler->num_threads); | freedata(pool, taskdata); | ||||
| if (pool->use_local_tls && thread_id == 0) { | |||||
| BLI_assert(pool->thread_id == 0); | |||||
| BLI_assert(!BLI_thread_is_main()); | |||||
| BLI_assert(pthread_equal(pthread_self(), pool->creator_thread_id)); | |||||
| return &pool->local_tls; | |||||
| } | } | ||||
| if (thread_id == 0) { | else { | ||||
| BLI_assert(BLI_thread_is_main()); | MEM_freeN(taskdata); | ||||
| return &scheduler->task_threads[pool->thread_id].tls; | |||||
| } | |||||
| return &scheduler->task_threads[thread_id].tls; | |||||
| } | } | ||||
| BLI_INLINE void free_task_tls(TaskThreadLocalStorage *tls) | |||||
| { | |||||
| TaskMemPool *task_mempool = &tls->task_mempool; | |||||
| for (int i = 0; i < task_mempool->num_tasks; i++) { | |||||
| MEM_freeN(task_mempool->tasks[i]); | |||||
| } | } | ||||
| } | } | ||||
| static Task *task_alloc(TaskPool *pool, const int thread_id) | /* Move constructor. */ | ||||
| Task(Task &&other) | |||||
| : pool(other.pool), | |||||
| run(other.run), | |||||
| taskdata(other.taskdata), | |||||
| free_taskdata(other.free_taskdata), | |||||
| freedata(other.freedata) | |||||
| { | { | ||||
| BLI_assert(thread_id <= pool->scheduler->num_threads); | other.pool = NULL; | ||||
| if (thread_id != -1) { | other.run = NULL; | ||||
| BLI_assert(thread_id >= 0); | other.taskdata = NULL; | ||||
| BLI_assert(thread_id <= pool->scheduler->num_threads); | other.free_taskdata = false; | ||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); | other.freedata = NULL; | ||||
| TaskMemPool *task_mempool = &tls->task_mempool; | |||||
| /* Try to re-use task memory from a thread local storage. */ | |||||
| if (task_mempool->num_tasks > 0) { | |||||
| --task_mempool->num_tasks; | |||||
| /* Success! We've just avoided task allocation. */ | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats[thread_id].num_reuse++; | |||||
| #endif | |||||
| return task_mempool->tasks[task_mempool->num_tasks]; | |||||
| } | |||||
| /* We are doomed to allocate new task data. */ | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats[thread_id].num_alloc++; | |||||
| #endif | |||||
| } | |||||
| return (Task *)MEM_mallocN(sizeof(Task), "New task"); | |||||
| } | } | ||||
| static void task_free(TaskPool *pool, Task *task, const int thread_id) | /* Execute task. */ | ||||
| void operator()() const | |||||
| { | { | ||||
| task_data_free(task, thread_id); | run(pool, taskdata); | ||||
| BLI_assert(thread_id >= 0); | |||||
| BLI_assert(thread_id <= pool->scheduler->num_threads); | |||||
| if (thread_id == 0) { | |||||
| BLI_assert(pool->use_local_tls || BLI_thread_is_main()); | |||||
| } | |||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); | |||||
| TaskMemPool *task_mempool = &tls->task_mempool; | |||||
| if (task_mempool->num_tasks < MEMPOOL_SIZE - 1) { | |||||
| /* Successfully allowed the task to be re-used later. */ | |||||
| task_mempool->tasks[task_mempool->num_tasks] = task; | |||||
| ++task_mempool->num_tasks; | |||||
| } | |||||
| else { | |||||
| /* Local storage saturated, no other way than just discard | |||||
| * the memory. | |||||
| * | |||||
| * TODO(sergey): We can perhaps store such pointer in a global | |||||
| * scheduler pool, maybe it'll be faster than discarding and | |||||
| * allocating again. | |||||
| */ | |||||
| MEM_freeN(task); | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats[thread_id].num_discard++; | |||||
| #endif | |||||
| } | } | ||||
| } | |||||
| /* Task Scheduler */ | |||||
| static void task_pool_num_decrease(TaskPool *pool, size_t done) | /* For performance, ensure we never copy the task and only move it. */ | ||||
| { | Task(const Task &other) = delete; | ||||
| BLI_mutex_lock(&pool->num_mutex); | Task &operator=(const Task &other) = delete; | ||||
| Task &operator=(Task &&other) = delete; | |||||
| BLI_assert(pool->num >= done); | }; | ||||
| pool->num -= done; | /* TBB Task Group. | ||||
| * | |||||
| * Subclass since there seems to be no other way to set priority. */ | |||||
| if (pool->num == 0) { | #ifdef WITH_TBB | ||||
| BLI_condition_notify_all(&pool->num_cond); | class TBBTaskGroup : public tbb::task_group { | ||||
| public: | |||||
| TBBTaskGroup(TaskPriority priority) | |||||
| { | |||||
| switch (priority) { | |||||
| case TASK_PRIORITY_LOW: | |||||
| my_context.set_priority(tbb::priority_low); | |||||
| break; | |||||
| case TASK_PRIORITY_HIGH: | |||||
| my_context.set_priority(tbb::priority_normal); | |||||
| break; | |||||
| } | } | ||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| } | } | ||||
| static void task_pool_num_increase(TaskPool *pool, size_t new_num) | ~TBBTaskGroup() | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| pool->num += new_num; | |||||
| BLI_condition_notify_all(&pool->num_cond); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| } | } | ||||
| }; | |||||
| #endif | |||||
| static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | /* Task Pool */ | ||||
| { | |||||
| bool found_task = false; | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| while (!scheduler->queue.first && !scheduler->do_exit) { | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } | |||||
| do { | typedef enum TaskPoolType { | ||||
| Task *current_task; | TASK_POOL_TBB, | ||||
| TASK_POOL_TBB_SUSPENDED, | |||||
| TASK_POOL_NO_THREADS, | |||||
| TASK_POOL_BACKGROUND, | |||||
| TASK_POOL_BACKGROUND_SERIAL, | |||||
| } TaskPoolType; | |||||
| /* Assuming we can only have a void queue in 'exit' case here seems logical | struct TaskPool { | ||||
| * (we should only be here after our worker thread has been woken up from a | TaskPoolType type; | ||||
| * condition_wait(), which only happens after a new task was added to the queue), | bool use_threads; | ||||
| * but it is wrong. | |||||
| * Waiting on condition may wake up the thread even if condition is not signaled | |||||
| * (spurious wake-ups), and some race condition may also empty the queue **after** | |||||
| * condition has been signaled, but **before** awoken thread reaches this point... | |||||
| * See http://stackoverflow.com/questions/8594591 | |||||
| * | |||||
| * So we only abort here if do_exit is set. | |||||
| */ | |||||
| if (scheduler->do_exit) { | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| return false; | |||||
| } | |||||
| for (current_task = (Task *)scheduler->queue.first; current_task != NULL; | ThreadMutex user_mutex; | ||||
| current_task = current_task->next) { | void *userdata; | ||||
| TaskPool *pool = current_task->pool; | |||||
| if (scheduler->background_thread_only && !pool->run_in_background) { | /* TBB task pool. */ | ||||
| continue; | #ifdef WITH_TBB | ||||
| } | TBBTaskGroup tbb_group; | ||||
| #endif | |||||
| volatile bool is_suspended; | |||||
| BLI_mempool *suspended_mempool; | |||||
| *task = current_task; | /* Background task pool. */ | ||||
| found_task = true; | ListBase background_threads; | ||||
| BLI_remlink(&scheduler->queue, *task); | ThreadQueue *background_queue; | ||||
| break; | volatile bool background_is_canceling; | ||||
| } | }; | ||||
| if (!found_task) { | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } | |||||
| } while (!found_task); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | /* TBB Task Pool. | ||||
| * | |||||
| * Task pool using the TBB scheduler for tasks. When building without TBB | |||||
| * support or running Blender with -t 1, this reverts to single threaded. | |||||
| * | |||||
| * Tasks may be suspended until in all are created, to make it possible to | |||||
| * initialize data structures and create tasks in a single pass. */ | |||||
| return true; | static void tbb_task_pool_create(TaskPool *pool, TaskPriority priority) | ||||
| { | |||||
| if (pool->type == TASK_POOL_TBB_SUSPENDED) { | |||||
| pool->is_suspended = true; | |||||
| pool->suspended_mempool = BLI_mempool_create(sizeof(Task), 512, 512, BLI_MEMPOOL_ALLOW_ITER); | |||||
| } | } | ||||
| BLI_INLINE void handle_local_queue(TaskThreadLocalStorage *tls, const int thread_id) | #ifdef WITH_TBB | ||||
| { | if (pool->use_threads) { | ||||
| BLI_assert(!tls->do_delayed_push); | new (&pool->tbb_group) TBBTaskGroup(priority); | ||||
| while (tls->num_local_queue > 0) { | |||||
| /* We pop task from queue before handling it so handler of the task can | |||||
| * push next job to the local queue. | |||||
| */ | |||||
| tls->num_local_queue--; | |||||
| Task *local_task = tls->local_queue[tls->num_local_queue]; | |||||
| /* TODO(sergey): Double-check work_and_wait() doesn't handle other's | |||||
| * pool tasks. | |||||
| */ | |||||
| TaskPool *local_pool = local_task->pool; | |||||
| local_task->run(local_pool, local_task->taskdata, thread_id); | |||||
| task_free(local_pool, local_task, thread_id); | |||||
| } | } | ||||
| BLI_assert(!tls->do_delayed_push); | #endif | ||||
| } | } | ||||
| static void *task_scheduler_thread_run(void *thread_p) | static void tbb_task_pool_run(TaskPool *pool, Task &&task) | ||||
| { | { | ||||
| TaskThread *thread = (TaskThread *)thread_p; | if (pool->is_suspended) { | ||||
| TaskThreadLocalStorage *tls = &thread->tls; | /* Suspended task that will be executed in work_and_wait(). */ | ||||
| TaskScheduler *scheduler = thread->scheduler; | Task *task_mem = (Task *)BLI_mempool_alloc(pool->suspended_mempool); | ||||
| int thread_id = thread->id; | new (task_mem) Task(std::move(task)); | ||||
| Task *task; | #ifdef __GNUC__ | ||||
| /* Work around apparent compiler bug where task is not properly copied | |||||
| pthread_setspecific(scheduler->tls_id_key, thread); | * to task_mem. This appears unrelated to the use of placement new or | ||||
| * move semantics, happens even writing to a plain C struct. Rather the | |||||
| /* signal the main thread when all threads have started */ | * call into TBB seems to have some indirect effect. */ | ||||
| BLI_mutex_lock(&scheduler->startup_mutex); | std::atomic_thread_fence(std::memory_order_release); | ||||
| scheduler->num_thread_started++; | #endif | ||||
| if (scheduler->num_thread_started == scheduler->num_threads) { | |||||
| BLI_condition_notify_one(&scheduler->startup_cond); | |||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->startup_mutex); | #ifdef WITH_TBB | ||||
| else if (pool->use_threads) { | |||||
| /* keep popping off tasks */ | /* Execute in TBB task group. */ | ||||
| while (task_scheduler_thread_wait_pop(scheduler, &task)) { | pool->tbb_group.run(std::move(task)); | ||||
| TaskPool *pool = task->pool; | } | ||||
| #endif | |||||
| /* run task */ | else { | ||||
| BLI_assert(!tls->do_delayed_push); | /* Execute immediately. */ | ||||
| task->run(pool, task->taskdata, thread_id); | task(); | ||||
| BLI_assert(!tls->do_delayed_push); | |||||
| /* delete task */ | |||||
| task_free(pool, task, thread_id); | |||||
| /* Handle all tasks from local queue. */ | |||||
| handle_local_queue(tls, thread_id); | |||||
| /* notify pool task was done */ | |||||
| task_pool_num_decrease(pool, 1); | |||||
| } | } | ||||
| return NULL; | |||||
| } | } | ||||
| TaskScheduler *BLI_task_scheduler_create(int num_threads) | static void tbb_task_pool_work_and_wait(TaskPool *pool) | ||||
| { | { | ||||
| TaskScheduler *scheduler = (TaskScheduler *)MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); | /* Start any suspended task now. */ | ||||
| if (pool->suspended_mempool) { | |||||
| /* multiple places can use this task scheduler, sharing the same | pool->is_suspended = false; | ||||
| * threads, so we keep track of the number of users. */ | |||||
| scheduler->do_exit = false; | |||||
| BLI_listbase_clear(&scheduler->queue); | |||||
| BLI_mutex_init(&scheduler->queue_mutex); | |||||
| BLI_condition_init(&scheduler->queue_cond); | |||||
| BLI_mutex_init(&scheduler->startup_mutex); | BLI_mempool_iter iter; | ||||
| BLI_condition_init(&scheduler->startup_cond); | BLI_mempool_iternew(pool->suspended_mempool, &iter); | ||||
| scheduler->num_thread_started = 0; | while (Task *task = (Task *)BLI_mempool_iterstep(&iter)) { | ||||
| tbb_task_pool_run(pool, std::move(*task)); | |||||
| if (num_threads == 0) { | |||||
| /* automatic number of threads will be main thread + num cores */ | |||||
| num_threads = BLI_system_thread_count(); | |||||
| } | } | ||||
| /* main thread will also work, so we count it too */ | BLI_mempool_clear(pool->suspended_mempool); | ||||
| num_threads -= 1; | |||||
| /* Add background-only thread if needed. */ | |||||
| if (num_threads == 0) { | |||||
| scheduler->background_thread_only = true; | |||||
| num_threads = 1; | |||||
| } | } | ||||
| scheduler->task_threads = (TaskThread *)MEM_mallocN(sizeof(TaskThread) * (num_threads + 1), | #ifdef WITH_TBB | ||||
| "TaskScheduler task threads"); | if (pool->use_threads) { | ||||
| /* This is called wait(), but internally it can actually do work. This | |||||
| /* Initialize TLS for main thread. */ | * matters because we don't want recursive usage of task pools to run | ||||
| initialize_task_tls(&scheduler->task_threads[0].tls); | * out of threads and get stuck. */ | ||||
| pool->tbb_group.wait(); | |||||
| pthread_key_create(&scheduler->tls_id_key, NULL); | |||||
| /* launch threads that will be waiting for work */ | |||||
| if (num_threads > 0) { | |||||
| int i; | |||||
| scheduler->num_threads = num_threads; | |||||
| scheduler->threads = (pthread_t *)MEM_callocN(sizeof(pthread_t) * num_threads, | |||||
| "TaskScheduler threads"); | |||||
| for (i = 0; i < num_threads; i++) { | |||||
| TaskThread *thread = &scheduler->task_threads[i + 1]; | |||||
| thread->scheduler = scheduler; | |||||
| thread->id = i + 1; | |||||
| initialize_task_tls(&thread->tls); | |||||
| if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { | |||||
| fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); | |||||
| } | |||||
| } | } | ||||
| #endif | |||||
| } | } | ||||
| /* Wait for all worker threads to start before returning to caller to prevent the case where | static void tbb_task_pool_cancel(TaskPool *pool) | ||||
| * threads are still starting and pthread_join is called, which causes a deadlock on pthreads4w. | { | ||||
| */ | #ifdef WITH_TBB | ||||
| BLI_mutex_lock(&scheduler->startup_mutex); | if (pool->use_threads) { | ||||
| /* NOTE: Use loop here to avoid false-positive everything-is-ready caused by spontaneous thread | pool->tbb_group.cancel(); | ||||
| * wake up. */ | pool->tbb_group.wait(); | ||||
| while (scheduler->num_thread_started != num_threads) { | |||||
| BLI_condition_wait(&scheduler->startup_cond, &scheduler->startup_mutex); | |||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->startup_mutex); | #endif | ||||
| return scheduler; | |||||
| } | } | ||||
| void BLI_task_scheduler_free(TaskScheduler *scheduler) | static bool tbb_task_pool_canceled(TaskPool *pool) | ||||
| { | { | ||||
| Task *task; | #ifdef WITH_TBB | ||||
| if (pool->use_threads) { | |||||
| /* stop all waiting threads */ | return pool->tbb_group.is_canceling(); | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| scheduler->do_exit = true; | |||||
| BLI_condition_notify_all(&scheduler->queue_cond); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| pthread_key_delete(scheduler->tls_id_key); | |||||
| /* delete threads */ | |||||
| if (scheduler->threads) { | |||||
| int i; | |||||
| for (i = 0; i < scheduler->num_threads; i++) { | |||||
| if (pthread_join(scheduler->threads[i], NULL) != 0) { | |||||
| fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); | |||||
| } | |||||
| } | } | ||||
| #endif | |||||
| MEM_freeN(scheduler->threads); | return false; | ||||
| } | } | ||||
| /* Delete task thread data */ | static void tbb_task_pool_free(TaskPool *pool) | ||||
| if (scheduler->task_threads) { | { | ||||
| for (int i = 0; i < scheduler->num_threads + 1; i++) { | #ifdef WITH_TBB | ||||
| TaskThreadLocalStorage *tls = &scheduler->task_threads[i].tls; | if (pool->use_threads) { | ||||
| free_task_tls(tls); | pool->tbb_group.~TBBTaskGroup(); | ||||
| } | } | ||||
| #endif | |||||
| MEM_freeN(scheduler->task_threads); | if (pool->suspended_mempool) { | ||||
| BLI_mempool_destroy(pool->suspended_mempool); | |||||
| } | } | ||||
| /* delete leftover tasks */ | |||||
| for (task = (Task *)scheduler->queue.first; task; task = task->next) { | |||||
| task_data_free(task, 0); | |||||
| } | } | ||||
| BLI_freelistN(&scheduler->queue); | |||||
| /* delete mutex/condition */ | |||||
| BLI_mutex_end(&scheduler->queue_mutex); | |||||
| BLI_condition_end(&scheduler->queue_cond); | |||||
| BLI_mutex_end(&scheduler->startup_mutex); | |||||
| BLI_condition_end(&scheduler->startup_cond); | |||||
| MEM_freeN(scheduler); | /* Background Task Pool. | ||||
| } | * | ||||
| * Fallback for running background tasks when building without TBB. */ | |||||
| int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | static void *background_task_run(void *userdata) | ||||
| { | { | ||||
| return scheduler->num_threads + 1; | TaskPool *pool = (TaskPool *)userdata; | ||||
| while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) { | |||||
| (*task)(); | |||||
| task->~Task(); | |||||
| MEM_freeN(task); | |||||
| } | |||||
| return NULL; | |||||
| } | } | ||||
| static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | static void background_task_pool_create(TaskPool *pool) | ||||
| { | { | ||||
| task_pool_num_increase(task->pool, 1); | pool->background_queue = BLI_thread_queue_init(); | ||||
| BLI_threadpool_init(&pool->background_threads, background_task_run, 1); | |||||
| /* add task to queue */ | BLI_threadpool_insert(&pool->background_threads, pool); | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| if (priority == TASK_PRIORITY_HIGH) { | |||||
| BLI_addhead(&scheduler->queue, task); | |||||
| } | |||||
| else { | |||||
| BLI_addtail(&scheduler->queue, task); | |||||
| } | } | ||||
| BLI_condition_notify_one(&scheduler->queue_cond); | static void background_task_pool_run(TaskPool *pool, Task &&task) | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | { | ||||
| Task *task_mem = (Task *)MEM_mallocN(sizeof(Task), __func__); | |||||
| new (task_mem) Task(std::move(task)); | |||||
| BLI_thread_queue_push(pool->background_queue, task_mem); | |||||
| } | } | ||||
| static void task_scheduler_push_all(TaskScheduler *scheduler, | static void background_task_pool_work_and_wait(TaskPool *pool) | ||||
| TaskPool *pool, | |||||
| Task **tasks, | |||||
| int num_tasks) | |||||
| { | { | ||||
| if (num_tasks == 0) { | /* Signal background thread to stop waiting for new tasks if none are | ||||
| return; | * left, and wait for tasks and thread to finish. */ | ||||
| BLI_thread_queue_nowait(pool->background_queue); | |||||
| BLI_thread_queue_wait_finish(pool->background_queue); | |||||
| BLI_threadpool_remove(&pool->background_threads, pool); | |||||
| } | } | ||||
| task_pool_num_increase(pool, num_tasks); | static void background_task_pool_cancel(TaskPool *pool) | ||||
| { | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | pool->background_is_canceling = true; | ||||
| for (int i = 0; i < num_tasks; i++) { | /* Remove tasks not yet started by background thread. */ | ||||
| BLI_addhead(&scheduler->queue, tasks[i]); | BLI_thread_queue_nowait(pool->background_queue); | ||||
| while (Task *task = (Task *)BLI_thread_queue_pop(pool->background_queue)) { | |||||
| task->~Task(); | |||||
| MEM_freeN(task); | |||||
| } | } | ||||
| BLI_condition_notify_all(&scheduler->queue_cond); | /* Let background thread finish or cancel task it is working on. */ | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | BLI_threadpool_remove(&pool->background_threads, pool); | ||||
| pool->background_is_canceling = false; | |||||
| } | } | ||||
| static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | static bool background_task_pool_canceled(TaskPool *pool) | ||||
| { | { | ||||
| Task *task, *nexttask; | return pool->background_is_canceling; | ||||
| size_t done = 0; | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* free all tasks from this pool from the queue */ | |||||
| for (task = (Task *)scheduler->queue.first; task; task = nexttask) { | |||||
| nexttask = task->next; | |||||
| if (task->pool == pool) { | |||||
| task_data_free(task, pool->thread_id); | |||||
| BLI_freelinkN(&scheduler->queue, task); | |||||
| done++; | |||||
| } | |||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | static void background_task_pool_free(TaskPool *pool) | ||||
| { | |||||
| background_task_pool_work_and_wait(pool); | |||||
| /* notify done */ | BLI_threadpool_end(&pool->background_threads); | ||||
| task_pool_num_decrease(pool, done); | BLI_thread_queue_free(pool->background_queue); | ||||
| } | } | ||||
| /* Task Pool */ | /* Task Pool */ | ||||
| static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, | static TaskPool *task_pool_create_ex(void *userdata, TaskPoolType type, TaskPriority priority) | ||||
| void *userdata, | { | ||||
| const bool is_background, | /* Ensure malloc will go fine from threads, | ||||
| const bool is_suspended, | * | ||||
| TaskPriority priority) | * This is needed because we could be in main thread here | ||||
| { | * and malloc could be non-thread safe at this point because | ||||
| TaskPool *pool = (TaskPool *)MEM_mallocN(sizeof(TaskPool), "TaskPool"); | * no other jobs are running. | ||||
| */ | |||||
| #ifndef NDEBUG | BLI_threaded_malloc_begin(); | ||||
| /* Assert we do not try to create a background pool from some parent task - | |||||
| * those only work OK from main thread. */ | |||||
| if (is_background) { | |||||
| const pthread_t thread_id = pthread_self(); | |||||
| int i = scheduler->num_threads; | |||||
| while (i--) { | const bool use_threads = BLI_task_scheduler_num_threads() > 1 && type != TASK_POOL_NO_THREADS; | ||||
| BLI_assert(!pthread_equal(scheduler->threads[i], thread_id)); | |||||
| } | /* Background task pool uses regular TBB scheduling if available. Only when | ||||
| * building without TBB or running with -t 1 do we need to ensure these tasks | |||||
| * do not block the main thread. */ | |||||
| if (type == TASK_POOL_BACKGROUND && use_threads) { | |||||
| type = TASK_POOL_TBB; | |||||
| } | } | ||||
| #endif | |||||
| pool->scheduler = scheduler; | /* Allocate task pool. */ | ||||
| pool->num = 0; | TaskPool *pool = (TaskPool *)MEM_callocN(sizeof(TaskPool), "TaskPool"); | ||||
| pool->do_cancel = false; | |||||
| pool->do_work = false; | |||||
| pool->is_suspended = is_suspended; | |||||
| pool->start_suspended = is_suspended; | |||||
| pool->num_suspended = 0; | |||||
| pool->suspended_queue.first = pool->suspended_queue.last = NULL; | |||||
| pool->priority = priority; | |||||
| pool->run_in_background = is_background; | |||||
| pool->use_local_tls = false; | |||||
| BLI_mutex_init(&pool->num_mutex); | pool->type = type; | ||||
| BLI_condition_init(&pool->num_cond); | pool->use_threads = use_threads; | ||||
| pool->userdata = userdata; | pool->userdata = userdata; | ||||
| BLI_mutex_init(&pool->user_mutex); | BLI_mutex_init(&pool->user_mutex); | ||||
| if (BLI_thread_is_main()) { | switch (type) { | ||||
| pool->thread_id = 0; | case TASK_POOL_TBB: | ||||
| } | case TASK_POOL_TBB_SUSPENDED: | ||||
| else { | case TASK_POOL_NO_THREADS: | ||||
| TaskThread *thread = (TaskThread *)pthread_getspecific(scheduler->tls_id_key); | tbb_task_pool_create(pool, priority); | ||||
| if (thread == NULL) { | break; | ||||
| /* NOTE: Task pool is created from non-main thread which is not | case TASK_POOL_BACKGROUND: | ||||
| * managed by the task scheduler. We identify ourselves as thread ID | case TASK_POOL_BACKGROUND_SERIAL: | ||||
| * 0 but we do not use scheduler's TLS storage and use our own | background_task_pool_create(pool); | ||||
| * instead to avoid any possible threading conflicts. | break; | ||||
| */ | |||||
| pool->thread_id = 0; | |||||
| pool->use_local_tls = true; | |||||
| #ifndef NDEBUG | |||||
| pool->creator_thread_id = pthread_self(); | |||||
| #endif | |||||
| initialize_task_tls(&pool->local_tls); | |||||
| } | |||||
| else { | |||||
| pool->thread_id = thread->id; | |||||
| } | |||||
| } | } | ||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats = (TaskMemPoolStats *)MEM_callocN( | |||||
| sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1), "per-taskpool mempool stats"); | |||||
| #endif | |||||
| /* Ensure malloc will go fine from threads, | |||||
| * | |||||
| * This is needed because we could be in main thread here | |||||
| * and malloc could be non-thread safe at this point because | |||||
| * no other jobs are running. | |||||
| */ | |||||
| BLI_threaded_malloc_begin(); | |||||
| return pool; | return pool; | ||||
| } | } | ||||
| /** | /** | ||||
| * Create a normal task pool. Tasks will be executed as soon as they are added. | * Create a normal task pool. Tasks will be executed as soon as they are added. | ||||
| */ | */ | ||||
| TaskPool *BLI_task_pool_create(TaskScheduler *scheduler, void *userdata, TaskPriority priority) | TaskPool *BLI_task_pool_create(void *userdata, TaskPriority priority) | ||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, false, false, priority); | return task_pool_create_ex(userdata, TASK_POOL_TBB, priority); | ||||
| } | } | ||||
| /** | /** | ||||
| * Create a background task pool. | * Create a background task pool. | ||||
| * In multi-threaded context, there is no differences with #BLI_task_pool_create(), | * In multi-threaded context, there is no differences with #BLI_task_pool_legacy_create(), | ||||
| * but in single-threaded case it is ensured to have at least one worker thread to run on | * but in single-threaded case it is ensured to have at least one worker thread to run on | ||||
| * (i.e. you don't have to call #BLI_task_pool_work_and_wait | * (i.e. you don't have to call #BLI_task_pool_legacy_work_and_wait | ||||
| * on it to be sure it will be processed). | * on it to be sure it will be processed). | ||||
| * | * | ||||
| * \note Background pools are non-recursive | * \note Background pools are non-recursive | ||||
| * (that is, you should not create other background pools in tasks assigned to a background pool, | * (that is, you should not create other background pools in tasks assigned to a background pool, | ||||
| * they could end never being executed, since the 'fallback' background thread is already | * they could end never being executed, since the 'fallback' background thread is already | ||||
| * busy with parent task in single-threaded context). | * busy with parent task in single-threaded context). | ||||
| */ | */ | ||||
| TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, | TaskPool *BLI_task_pool_create_background(void *userdata, TaskPriority priority) | ||||
| void *userdata, | |||||
| TaskPriority priority) | |||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, true, false, priority); | return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND, priority); | ||||
| } | } | ||||
| /** | /** | ||||
| * Similar to BLI_task_pool_create() but does not schedule any tasks for execution | * Similar to BLI_task_pool_legacy_create() but does not schedule any tasks for execution | ||||
| * for until BLI_task_pool_work_and_wait() is called. This helps reducing threading | * for until BLI_task_pool_legacy_work_and_wait() is called. This helps reducing threading | ||||
| * overhead when pushing huge amount of small initial tasks from the main thread. | * overhead when pushing huge amount of small initial tasks from the main thread. | ||||
| */ | */ | ||||
| TaskPool *BLI_task_pool_create_suspended(TaskScheduler *scheduler, | TaskPool *BLI_task_pool_create_suspended(void *userdata, TaskPriority priority) | ||||
| void *userdata, | |||||
| TaskPriority priority) | |||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, false, true, priority); | return task_pool_create_ex(userdata, TASK_POOL_TBB_SUSPENDED, priority); | ||||
| } | } | ||||
| void BLI_task_pool_free(TaskPool *pool) | /** | ||||
| * Single threaded task pool that executes pushed task immediately, for | |||||
| * debugging purposes. | |||||
| */ | |||||
| TaskPool *BLI_task_pool_create_no_threads(void *userdata) | |||||
| { | { | ||||
| BLI_task_pool_cancel(pool); | return task_pool_create_ex(userdata, TASK_POOL_NO_THREADS, TASK_PRIORITY_HIGH); | ||||
| } | |||||
| BLI_mutex_end(&pool->num_mutex); | |||||
| BLI_condition_end(&pool->num_cond); | |||||
| BLI_mutex_end(&pool->user_mutex); | |||||
| #ifdef DEBUG_STATS | /** | ||||
| printf("Thread ID Allocated Reused Discarded\n"); | * Task pool that executeds one task after the other, possibly on different threads | ||||
| for (int i = 0; i < pool->scheduler->num_threads + 1; i++) { | * but never in parallel. | ||||
| printf("%02d %05d %05d %05d\n", | */ | ||||
| i, | TaskPool *BLI_task_pool_create_background_serial(void *userdata, TaskPriority priority) | ||||
| pool->mempool_stats[i].num_alloc, | { | ||||
| pool->mempool_stats[i].num_reuse, | return task_pool_create_ex(userdata, TASK_POOL_BACKGROUND_SERIAL, priority); | ||||
| pool->mempool_stats[i].num_discard); | |||||
| } | } | ||||
| MEM_freeN(pool->mempool_stats); | |||||
| #endif | |||||
| if (pool->use_local_tls) { | void BLI_task_pool_free(TaskPool *pool) | ||||
| free_task_tls(&pool->local_tls); | { | ||||
| switch (pool->type) { | |||||
| case TASK_POOL_TBB: | |||||
| case TASK_POOL_TBB_SUSPENDED: | |||||
| case TASK_POOL_NO_THREADS: | |||||
| tbb_task_pool_free(pool); | |||||
| break; | |||||
| case TASK_POOL_BACKGROUND: | |||||
| case TASK_POOL_BACKGROUND_SERIAL: | |||||
| background_task_pool_free(pool); | |||||
| break; | |||||
| } | } | ||||
| BLI_mutex_end(&pool->user_mutex); | |||||
| MEM_freeN(pool); | MEM_freeN(pool); | ||||
| BLI_threaded_malloc_end(); | BLI_threaded_malloc_end(); | ||||
| } | } | ||||
| BLI_INLINE bool task_can_use_local_queues(TaskPool *pool, int thread_id) | |||||
| { | |||||
| return (thread_id != -1 && (thread_id != pool->thread_id || pool->do_work)); | |||||
| } | |||||
| static void task_pool_push(TaskPool *pool, | |||||
| TaskRunFunction run, | |||||
| void *taskdata, | |||||
| bool free_taskdata, | |||||
| TaskFreeFunction freedata, | |||||
| int thread_id) | |||||
| { | |||||
| /* Allocate task and fill it's properties. */ | |||||
| Task *task = task_alloc(pool, thread_id); | |||||
| task->run = run; | |||||
| task->taskdata = taskdata; | |||||
| task->free_taskdata = free_taskdata; | |||||
| task->freedata = freedata; | |||||
| task->pool = pool; | |||||
| /* For suspended pools we put everything yo a global queue first | |||||
| * and exit as soon as possible. | |||||
| * | |||||
| * This tasks will be moved to actual execution when pool is | |||||
| * activated by work_and_wait(). | |||||
| */ | |||||
| if (pool->is_suspended) { | |||||
| BLI_addhead(&pool->suspended_queue, task); | |||||
| atomic_fetch_and_add_z(&pool->num_suspended, 1); | |||||
| return; | |||||
| } | |||||
| /* Populate to any local queue first, this is cheapest push ever. */ | |||||
| if (task_can_use_local_queues(pool, thread_id)) { | |||||
| ASSERT_THREAD_ID(pool->scheduler, thread_id); | |||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); | |||||
| /* Try to push to a local execution queue. | |||||
| * These tasks will be picked up next. | |||||
| */ | |||||
| if (tls->num_local_queue < LOCAL_QUEUE_SIZE) { | |||||
| tls->local_queue[tls->num_local_queue] = task; | |||||
| tls->num_local_queue++; | |||||
| return; | |||||
| } | |||||
| /* If we are in the delayed tasks push mode, we push tasks to a | |||||
| * temporary local queue first without any locks, and then move them | |||||
| * to global execution queue with a single lock. | |||||
| */ | |||||
| if (tls->do_delayed_push && tls->num_delayed_queue < DELAYED_QUEUE_SIZE) { | |||||
| tls->delayed_queue[tls->num_delayed_queue] = task; | |||||
| tls->num_delayed_queue++; | |||||
| return; | |||||
| } | |||||
| } | |||||
| /* Do push to a global execution pool, slowest possible method, | |||||
| * causes quite reasonable amount of threading overhead. | |||||
| */ | |||||
| task_scheduler_push(pool->scheduler, task, pool->priority); | |||||
| } | |||||
| void BLI_task_pool_push(TaskPool *pool, | void BLI_task_pool_push(TaskPool *pool, | ||||
| TaskRunFunction run, | TaskRunFunction run, | ||||
| void *taskdata, | void *taskdata, | ||||
| bool free_taskdata, | bool free_taskdata, | ||||
| TaskFreeFunction freedata) | TaskFreeFunction freedata) | ||||
| { | { | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, freedata, -1); | Task task(pool, run, taskdata, free_taskdata, freedata); | ||||
| } | |||||
| void BLI_task_pool_push_from_thread(TaskPool *pool, | switch (pool->type) { | ||||
| TaskRunFunction run, | case TASK_POOL_TBB: | ||||
| void *taskdata, | case TASK_POOL_TBB_SUSPENDED: | ||||
| bool free_taskdata, | case TASK_POOL_NO_THREADS: | ||||
| TaskFreeFunction freedata, | tbb_task_pool_run(pool, std::move(task)); | ||||
| int thread_id) | break; | ||||
| { | case TASK_POOL_BACKGROUND: | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, freedata, thread_id); | case TASK_POOL_BACKGROUND_SERIAL: | ||||
| background_task_pool_run(pool, std::move(task)); | |||||
| break; | |||||
| } | |||||
| } | } | ||||
| void BLI_task_pool_work_and_wait(TaskPool *pool) | void BLI_task_pool_work_and_wait(TaskPool *pool) | ||||
| { | { | ||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, pool->thread_id); | switch (pool->type) { | ||||
| TaskScheduler *scheduler = pool->scheduler; | case TASK_POOL_TBB: | ||||
| case TASK_POOL_TBB_SUSPENDED: | |||||
| if (atomic_fetch_and_and_uint8((uint8_t *)&pool->is_suspended, 0)) { | case TASK_POOL_NO_THREADS: | ||||
| if (pool->num_suspended) { | tbb_task_pool_work_and_wait(pool); | ||||
| task_pool_num_increase(pool, pool->num_suspended); | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| BLI_movelisttolist(&scheduler->queue, &pool->suspended_queue); | |||||
| BLI_condition_notify_all(&scheduler->queue_cond); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| pool->num_suspended = 0; | |||||
| } | |||||
| } | |||||
| pool->do_work = true; | |||||
| ASSERT_THREAD_ID(pool->scheduler, pool->thread_id); | |||||
| handle_local_queue(tls, pool->thread_id); | |||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| while (pool->num != 0) { | |||||
| Task *task, *work_task = NULL; | |||||
| bool found_task = false; | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* find task from this pool. if we get a task from another pool, | |||||
| * we can get into deadlock */ | |||||
| for (task = (Task *)scheduler->queue.first; task; task = task->next) { | |||||
| if (task->pool == pool) { | |||||
| work_task = task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, task); | |||||
| break; | break; | ||||
| } | case TASK_POOL_BACKGROUND: | ||||
| } | case TASK_POOL_BACKGROUND_SERIAL: | ||||
| background_task_pool_work_and_wait(pool); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| /* if found task, do it, otherwise wait until other tasks are done */ | |||||
| if (found_task) { | |||||
| /* run task */ | |||||
| BLI_assert(!tls->do_delayed_push); | |||||
| work_task->run(pool, work_task->taskdata, pool->thread_id); | |||||
| BLI_assert(!tls->do_delayed_push); | |||||
| /* delete task */ | |||||
| task_free(pool, task, pool->thread_id); | |||||
| /* Handle all tasks from local queue. */ | |||||
| handle_local_queue(tls, pool->thread_id); | |||||
| /* notify pool task was done */ | |||||
| task_pool_num_decrease(pool, 1); | |||||
| } | |||||
| BLI_mutex_lock(&pool->num_mutex); | |||||
| if (pool->num == 0) { | |||||
| break; | break; | ||||
| } | } | ||||
| if (!found_task) { | |||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | |||||
| } | |||||
| } | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| BLI_assert(tls->num_local_queue == 0); | |||||
| } | |||||
| void BLI_task_pool_work_wait_and_reset(TaskPool *pool) | |||||
| { | |||||
| BLI_task_pool_work_and_wait(pool); | |||||
| pool->do_work = false; | |||||
| pool->is_suspended = pool->start_suspended; | |||||
| } | } | ||||
| void BLI_task_pool_cancel(TaskPool *pool) | void BLI_task_pool_cancel(TaskPool *pool) | ||||
| { | { | ||||
| pool->do_cancel = true; | switch (pool->type) { | ||||
| case TASK_POOL_TBB: | |||||
| task_scheduler_clear(pool->scheduler, pool); | case TASK_POOL_TBB_SUSPENDED: | ||||
| case TASK_POOL_NO_THREADS: | |||||
| /* wait until all entries are cleared */ | tbb_task_pool_cancel(pool); | ||||
| BLI_mutex_lock(&pool->num_mutex); | break; | ||||
| while (pool->num) { | case TASK_POOL_BACKGROUND: | ||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | case TASK_POOL_BACKGROUND_SERIAL: | ||||
| background_task_pool_cancel(pool); | |||||
| break; | |||||
| } | } | ||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| pool->do_cancel = false; | |||||
| } | } | ||||
| bool BLI_task_pool_canceled(TaskPool *pool) | bool BLI_task_pool_canceled(TaskPool *pool) | ||||
| { | { | ||||
| return pool->do_cancel; | switch (pool->type) { | ||||
| case TASK_POOL_TBB: | |||||
| case TASK_POOL_TBB_SUSPENDED: | |||||
| case TASK_POOL_NO_THREADS: | |||||
| return tbb_task_pool_canceled(pool); | |||||
| case TASK_POOL_BACKGROUND: | |||||
| case TASK_POOL_BACKGROUND_SERIAL: | |||||
| return background_task_pool_canceled(pool); | |||||
| } | |||||
| BLI_assert("BLI_task_pool_canceled: Control flow should not come here!"); | |||||
| return false; | |||||
| } | } | ||||
| void *BLI_task_pool_user_data(TaskPool *pool) | void *BLI_task_pool_user_data(TaskPool *pool) | ||||
| { | { | ||||
| return pool->userdata; | return pool->userdata; | ||||
| } | } | ||||
| ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ||||
| { | { | ||||
| return &pool->user_mutex; | return &pool->user_mutex; | ||||
| } | } | ||||
| No newline at end of file | |||||
| int BLI_task_pool_creator_thread_id(TaskPool *pool) | |||||
| { | |||||
| return pool->thread_id; | |||||
| } | |||||
| void BLI_task_pool_delayed_push_begin(TaskPool *pool, int thread_id) | |||||
| { | |||||
| if (task_can_use_local_queues(pool, thread_id)) { | |||||
| ASSERT_THREAD_ID(pool->scheduler, thread_id); | |||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); | |||||
| tls->do_delayed_push = true; | |||||
| } | |||||
| } | |||||
| void BLI_task_pool_delayed_push_end(TaskPool *pool, int thread_id) | |||||
| { | |||||
| if (task_can_use_local_queues(pool, thread_id)) { | |||||
| ASSERT_THREAD_ID(pool->scheduler, thread_id); | |||||
| TaskThreadLocalStorage *tls = get_task_tls(pool, thread_id); | |||||
| BLI_assert(tls->do_delayed_push); | |||||
| task_scheduler_push_all(pool->scheduler, pool, tls->delayed_queue, tls->num_delayed_queue); | |||||
| tls->do_delayed_push = false; | |||||
| tls->num_delayed_queue = 0; | |||||
| } | |||||
| } | |||||
Add this to fix build without TBB: