Changeset View
Changeset View
Standalone View
Standalone View
source/blender/blenlib/intern/task.c
| Show All 18 Lines | |||||
| */ | */ | ||||
| /** \file blender/blenlib/intern/task.c | /** \file blender/blenlib/intern/task.c | ||||
| * \ingroup bli | * \ingroup bli | ||||
| * | * | ||||
| * A generic task system which can be used for any task based subsystem. | * A generic task system which can be used for any task based subsystem. | ||||
| */ | */ | ||||
| /* The main idea of this atempt is to get a fully lock-free task management, we only keep locks and conditions | |||||
| * to handle long-term sleeping of idle threads. | |||||
| * | |||||
| * To achieve this, we keep all task management contained into a single thread, the manager, which handles: | |||||
| * - allocation of tasks as needed. | |||||
| * - moving tasks between 'free' and 'todo' lists. | |||||
| * - all other additional handling (like pool->num inc/dec, etc.). | |||||
| * | |||||
| * Workers (and main) threads are only allowed to 'acquire' a task (which then become private to that thread), | |||||
| * work on it and 'release' it. | |||||
| * | |||||
| * To achieve this, the core member is Task->owner, which is used both as 'status' and 'thread ownership', and is | |||||
| * changed with atomic cas operations (such that we can be sure only one thread, worker or manager, has ownership | |||||
| * of the task at a given time). | |||||
| * | |||||
| * This is not working very well currently, blender starts and can load some files, but others | |||||
| * (complex with rigged chars e.g.) will trigger segfaults and such during depsgraph evaluation, often because a pool | |||||
| * is being used after freeing, as if we were suffering from re-ordering... Not sure why currently, | |||||
| * atomic ops should generate memory barriers and hence ensure coherence of data here. :/ | |||||
| */ | |||||
| #include <stdlib.h> | #include <stdlib.h> | ||||
| #include "MEM_guardedalloc.h" | #include "MEM_guardedalloc.h" | ||||
| #include "DNA_listBase.h" | #include "DNA_listBase.h" | ||||
| #include "BLI_listbase.h" | #include "BLI_listbase.h" | ||||
| #include "BLI_math.h" | #include "BLI_math.h" | ||||
| #include "BLI_task.h" | #include "BLI_task.h" | ||||
| #include "BLI_threads.h" | #include "BLI_threads.h" | ||||
| #include "atomic_ops.h" | #include "atomic_ops.h" | ||||
| /* Define this to enable some detailed statistic print. */ | #define DEBUG_PRINTF //printf | ||||
| #undef DEBUG_STATS | |||||
| /* Types */ | |||||
| /* Number of per-thread pre-allocated tasks. | /* Number of pre-allocated tasks per schedulers. | ||||
| * | |||||
| * For more details see description of TaskMemPool. | |||||
| */ | */ | ||||
| #define MEMPOOL_SIZE 256 | #define MEMPOOL_SIZE 256 | ||||
| /* Parameters controlling how much we spin in nanosleeps before switching to real condition-controlled sleeping. */ | |||||
| #define NANOSLEEP_MAX_SPINNING 200 /* Number of failed attempt to get a task before going to condition waiting. */ | |||||
| #define NANOSLEEP_MAX_SPINNING_MANAGER 1000 /* Number of void spinning in manager before going to condition waiting. */ | |||||
| #define NANOSLEEP_DURATION (const struct timespec[]){{0, 200L}} /* Nanosleep duration (in nano-seconds). */ | |||||
| #define TASK_MANAGED UINT32_MAX /* Task is owned by manager thread. */ | |||||
| #define TASK_FREE TASK_MANAGED - 1 /* Task is free to be used (acquired) by a new pool. */ | |||||
| #define TASK_TODO TASK_MANAGED - 2 /* Task is free to be processed by any worker. */ | |||||
| #define TASK_DONE TASK_MANAGED - 3 /* Task has been processed and may be recycled by manager thread. */ | |||||
| #define MANAGER_DO (1 << 31) | |||||
| /* Types */ | |||||
| typedef struct Task { | typedef struct Task { | ||||
| struct Task *next, *prev; | struct Task *next; | ||||
| uint32_t owner; | |||||
| int priority; | |||||
| TaskRunFunction run; | TaskRunFunction run; | ||||
| void *taskdata; | void *taskdata; | ||||
| bool free_taskdata; | bool free_taskdata; | ||||
| TaskFreeFunction freedata; | TaskFreeFunction freedata; | ||||
| TaskPool *pool; | TaskPool *pool; | ||||
| } Task; | } Task; | ||||
| /* This is a per-thread storage of pre-allocated tasks. | |||||
| * | |||||
| * The idea behind this is simple: reduce amount of malloc() calls when pushing | |||||
| * new task to the pool. This is done by keeping memory from the tasks which | |||||
| * were finished already, so instead of freeing that memory we put it to the | |||||
| * pool for the later re-use. | |||||
| * | |||||
| * The tricky part here is to avoid any inter-thread synchronization, hence no | |||||
| * lock must exist around this pool. The pool will become an owner of the pointer | |||||
| * from freed task, and only corresponding thread will be able to use this pool | |||||
| * (no memory stealing and such). | |||||
| * | |||||
| * This leads to the following use of the pool: | |||||
| * | |||||
| * - task_push() should provide proper thread ID from which the task is being | |||||
| * pushed from. | |||||
| * | |||||
| * - Task allocation function which check corresponding memory pool and if there | |||||
| * is any memory in there it'll mark memory as re-used, remove it from the pool | |||||
| * and use that memory for the new task. | |||||
| * | |||||
| * At this moment task queue owns the memory. | |||||
| * | |||||
| * - When task is done and task_free() is called the memory will be put to the | |||||
| * pool which corresponds to a thread which handled the task. | |||||
| */ | |||||
| typedef struct TaskMemPool { | |||||
| /* Number of pre-allocated tasks in the pool. */ | |||||
| int num_tasks; | |||||
| /* Pre-allocated task memory pointers. */ | |||||
| Task *tasks[MEMPOOL_SIZE]; | |||||
| } TaskMemPool; | |||||
| #ifdef DEBUG_STATS | |||||
| typedef struct TaskMemPoolStats { | |||||
| /* Number of allocations. */ | |||||
| int num_alloc; | |||||
| /* Number of avoided allocations (pointer was re-used from the pool). */ | |||||
| int num_reuse; | |||||
| /* Number of discarded memory due to pool saturation, */ | |||||
| int num_discard; | |||||
| } TaskMemPoolStats; | |||||
| #endif | |||||
| struct TaskPool { | struct TaskPool { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| volatile size_t num; | bool is_new; | ||||
| volatile size_t done; | size_t num; | ||||
| size_t num_threads; | size_t num_threads; | ||||
| size_t currently_running_tasks; | size_t currently_running_tasks; | ||||
| ThreadMutex num_mutex; | |||||
| ThreadCondition num_cond; | |||||
| void *userdata; | void *userdata; | ||||
| ThreadMutex user_mutex; | ThreadMutex user_mutex; | ||||
| volatile bool do_cancel; | bool do_cancel; | ||||
| uint8_t do_clear_tasks; | |||||
| /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | /* If set, this pool may never be work_and_wait'ed, which means TaskScheduler | ||||
| * has to use its special background fallback thread in case we are in | * has to use its special background fallback thread in case we are in | ||||
| * single-threaded situation. | * single-threaded situation. | ||||
| */ | */ | ||||
| bool run_in_background; | bool run_in_background; | ||||
| /* This pool is used for caching task pointers for thread id 0. | |||||
| * This could either point to a global scheduler's task_mempool[0] if the | |||||
| * pool is handled form the main thread or point to task_mempool_local | |||||
| * otherwise. | |||||
| * | |||||
| * This way we solve possible threading conflicts accessing same global | |||||
| * memory pool from multiple threads from which wait_work() is called. | |||||
| */ | |||||
| TaskMemPool *task_mempool; | |||||
| TaskMemPool task_mempool_local; | |||||
| #ifdef DEBUG_STATS | |||||
| TaskMemPoolStats *mempool_stats; | |||||
| #endif | |||||
| }; | }; | ||||
| struct TaskScheduler { | struct TaskScheduler { | ||||
| pthread_t *threads; | pthread_t *threads; | ||||
| struct TaskThread *task_threads; | struct TaskThread *task_threads; | ||||
| TaskMemPool *task_mempool; | |||||
| int num_threads; | int num_threads; | ||||
| bool background_thread_only; | bool background_thread_only; | ||||
| ListBase queue; | pthread_t manager; | ||||
| ThreadMutex queue_mutex; | uint32_t num_manage; /* Counter of tasks needing management, with also a generic bit flag... */ | ||||
| ThreadCondition queue_cond; | Task todo_tasks; /* Single linked list. */ | ||||
| Task free_tasks; /* Single linked list. */ | |||||
| Task *next_free; | |||||
| size_t num_tasks_tot; /* Number of allocated tasks by manager thread. */ | |||||
| size_t num_tasks_free; /* Number of tasks detected as free by manager thread. */ | |||||
| ThreadMutex manager_mutex; | |||||
| ThreadCondition manager_condition; | |||||
| bool is_manager_sleeping; | |||||
| ThreadMutex workers_mutex; | |||||
| ThreadCondition workers_condition; | |||||
| size_t num_workers_sleeping; | |||||
| volatile bool do_exit; | uint8_t do_exit; | ||||
| }; | }; | ||||
| typedef struct TaskThread { | typedef struct TaskThread { | ||||
| TaskScheduler *scheduler; | TaskScheduler *scheduler; | ||||
| int id; | int id; | ||||
| } TaskThread; | } TaskThread; | ||||
| /* Helper */ | /* Helper */ | ||||
| static void task_data_free(Task *task, const int thread_id) | BLI_INLINE int get_thread_id(TaskScheduler *scheduler) | ||||
| { | |||||
| const pthread_t pthread_id = pthread_self(); | |||||
| for (int thread_id = scheduler->num_threads; thread_id--;) { | |||||
| if (pthread_equal(scheduler->threads[thread_id], pthread_id)) { | |||||
| return thread_id + 1; | |||||
| } | |||||
| } | |||||
| return 0; /* Main thread. */ | |||||
| } | |||||
| BLI_INLINE void task_data_free(Task *task, const int thread_id) | |||||
| { | { | ||||
| if (task->free_taskdata) { | if (task->free_taskdata) { | ||||
| void *taskdata = task->taskdata; | |||||
| if (taskdata != NULL) { | |||||
| /* We need atomic switch of taskdata pointer to avoid concurrent freeing. */ | |||||
| if (atomic_cas_z((size_t *)&task->taskdata, *(size_t *)&taskdata, 0 /* NULL */) == *(size_t *)&taskdata) { | |||||
| if (task->freedata) { | if (task->freedata) { | ||||
| task->freedata(task->pool, task->taskdata, thread_id); | task->freedata(task->pool, taskdata, thread_id); | ||||
| } | } | ||||
| else { | else { | ||||
| MEM_freeN(task->taskdata); | MEM_freeN(taskdata); | ||||
| } | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| BLI_INLINE TaskMemPool *get_task_mempool(TaskPool *pool, const int thread_id) | BLI_INLINE Task *task_acquire(TaskScheduler *scheduler, const int thread_id) | ||||
| { | { | ||||
| if (thread_id == 0) { | BLI_assert(thread_id >= 0); | ||||
| return pool->task_mempool; | BLI_assert(thread_id <= scheduler->num_threads); | ||||
| Task *t; | |||||
| do { | |||||
| for (t = scheduler->next_free; t && atomic_cas_uint32(&t->owner, TASK_FREE, thread_id) != TASK_FREE; t = t->next); | |||||
| if (t == NULL) { | |||||
| scheduler->next_free = scheduler->free_tasks.next; | |||||
| atomic_fetch_and_or_uint32(&scheduler->num_manage, MANAGER_DO); | |||||
| if (scheduler->is_manager_sleeping) { | |||||
| BLI_mutex_lock(&scheduler->manager_mutex); | |||||
| BLI_condition_notify_all(&scheduler->manager_condition); | |||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| } | } | ||||
| return &pool->scheduler->task_mempool[thread_id]; | } | ||||
| } while (t == NULL); /* This is same as spinlock, we trust manager thread to always have enough free tasks! */ | |||||
| scheduler->next_free = t->next ? t->next : scheduler->free_tasks.next; | |||||
| return t; | |||||
| } | } | ||||
| static Task *task_alloc(TaskPool *pool, const int thread_id) | BLI_INLINE void task_todo_push(TaskScheduler *scheduler, Task *task, const int thread_id) | ||||
| { | { | ||||
| assert(thread_id <= pool->scheduler->num_threads); | BLI_assert(thread_id >= 0); | ||||
| if (thread_id != -1) { | BLI_assert(thread_id <= scheduler->num_threads); | ||||
| assert(thread_id >= 0); | |||||
| TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); | atomic_add_and_fetch_uint32(&scheduler->num_manage, 1); | ||||
| /* Try to re-use task memory from a thread local storage. */ | if (atomic_cas_uint32(&task->owner, thread_id, TASK_TODO) != thread_id) { | ||||
| if (mem_pool->num_tasks > 0) { | BLI_assert(0); | ||||
| --mem_pool->num_tasks; | |||||
| /* Success! We've just avoided task allocation. */ | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats[thread_id].num_reuse++; | |||||
| #endif | |||||
| return mem_pool->tasks[mem_pool->num_tasks]; | |||||
| } | } | ||||
| /* We are doomed to allocate new task data. */ | |||||
| #ifdef DEBUG_STATS | if (scheduler->is_manager_sleeping) { | ||||
| pool->mempool_stats[thread_id].num_alloc++; | BLI_mutex_lock(&scheduler->manager_mutex); | ||||
| #endif | BLI_condition_notify_all(&scheduler->manager_condition); | ||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| } | } | ||||
| return MEM_mallocN(sizeof(Task), "New task"); | |||||
| DEBUG_PRINTF("new todo task (%p), pushed from thread %d...\n", task, thread_id); | |||||
| } | } | ||||
| static void task_free(TaskPool *pool, Task *task, const int thread_id) | BLI_INLINE void task_release(TaskScheduler *scheduler, Task *task, const int thread_id) | ||||
| { | { | ||||
| BLI_assert(thread_id >= 0); | |||||
| BLI_assert(thread_id <= scheduler->num_threads); | |||||
| task_data_free(task, thread_id); | task_data_free(task, thread_id); | ||||
| assert(thread_id >= 0); | |||||
| assert(thread_id <= pool->scheduler->num_threads); | atomic_add_and_fetch_uint32(&scheduler->num_manage, 1); | ||||
| TaskMemPool *mem_pool = get_task_mempool(pool, thread_id); | if (atomic_cas_uint32(&task->owner, thread_id, TASK_DONE) != thread_id) { | ||||
| if (mem_pool->num_tasks < MEMPOOL_SIZE - 1) { | BLI_assert(0); | ||||
| /* Successfully allowed the task to be re-used later. */ | |||||
| mem_pool->tasks[mem_pool->num_tasks] = task; | |||||
| ++mem_pool->num_tasks; | |||||
| } | } | ||||
| else { | |||||
| /* Local storage saturated, no other way than just discard | if (scheduler->is_manager_sleeping) { | ||||
| * the memory. | BLI_mutex_lock(&scheduler->manager_mutex); | ||||
| * | BLI_condition_notify_all(&scheduler->manager_condition); | ||||
| * TODO(sergey): We can perhaps store such pointer in a global | BLI_mutex_unlock(&scheduler->manager_mutex); | ||||
| * scheduler pool, maybe it'll be faster than discarding and | |||||
| * allocating again. | |||||
| */ | |||||
| MEM_freeN(task); | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats[thread_id].num_discard++; | |||||
| #endif | |||||
| } | } | ||||
| DEBUG_PRINTF("done with task (%p), released from thread %d...\n", task, thread_id); | |||||
| } | } | ||||
| /* Task Scheduler */ | /* Task Scheduler */ | ||||
| static void task_pool_num_decrease(TaskPool *pool, size_t done) | BLI_INLINE void task_pool_num_decrease(Task *task, TaskPool *pool, size_t done) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | /* BLI_assert(pool->num >= done); */ /* Cannot be accurate due to unconstraint order of ops in threads... */ | ||||
| const size_t num = atomic_sub_and_fetch_z(&pool->num, done); | |||||
| DEBUG_PRINTF("[%p] pool->num-%lu = %ld (%p)\n", pool, done, *(off_t *)&num, task); | |||||
| } | |||||
| BLI_INLINE void task_pool_num_increase(Task *task, TaskPool *pool) | |||||
| { | |||||
| const size_t num = atomic_add_and_fetch_z(&pool->num, 1); | |||||
| DEBUG_PRINTF("[%p] pool->num+1 = %ld (%p)\n", pool, *(off_t *)&num, task); | |||||
| } | |||||
| BLI_INLINE bool task_find( | |||||
| TaskScheduler * restrict scheduler, Task ** restrict task, TaskPool * restrict pool, | |||||
| const int thread_id) | |||||
| { | |||||
| Task *current_task; | |||||
| bool found_task = false; | |||||
| const bool is_main = thread_id == 0; | |||||
| // DEBUG_PRINTF("%s: next todo: %p\n", __func__, scheduler->todo_tasks.next); | |||||
| /* This check allows us to completely avoid a spinlock in case the queue is reported empty. | |||||
| * There is a possibility of race condition here (check being done after task has been added to queue, | |||||
| * and before counter is increased), but this should not be an issue in practice, quite unlikely and | |||||
| * would just delay a bit that thread going back to work. */ | |||||
| if (scheduler->todo_tasks.next != NULL) { | |||||
| /* NOTE: We almost always do single iteration here, so spin time is most of the time is really low. */ | |||||
| for (current_task = scheduler->todo_tasks.next; | |||||
| current_task != NULL; | |||||
| current_task = current_task->next) | |||||
| { | |||||
| if (atomic_cas_uint32(¤t_task->owner, TASK_TODO, thread_id) == TASK_TODO) { | |||||
| DEBUG_PRINTF("acquired task %p\n", current_task); | |||||
| TaskPool *current_pool = current_task->pool; | |||||
| BLI_assert(pool->num >= done); | if (!ELEM(pool, NULL, current_pool)) { | ||||
| if (atomic_cas_uint32(¤t_task->owner, thread_id, TASK_TODO) != thread_id) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| DEBUG_PRINTF("released task %p\n", current_task); | |||||
| continue; | |||||
| } | |||||
| if (current_pool->do_clear_tasks) { | |||||
| if (atomic_cas_uint32(¤t_task->owner, thread_id, TASK_TODO) != thread_id) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| DEBUG_PRINTF("released task %p\n", current_task); | |||||
| continue; | |||||
| } | |||||
| if (!is_main && scheduler->background_thread_only && !current_pool->run_in_background) { | |||||
| if (atomic_cas_uint32(¤t_task->owner, thread_id, TASK_TODO) != thread_id) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| DEBUG_PRINTF("released task %p\n", current_task); | |||||
| continue; | |||||
| } | |||||
| pool->num -= done; | if (atomic_add_and_fetch_z(¤t_pool->currently_running_tasks, 1) <= current_pool->num_threads || | ||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, done); | is_main || current_pool->num_threads == 0) | ||||
| pool->done += done; | { | ||||
| *task = current_task; | |||||
| found_task = true; | |||||
| if (pool->num == 0) | // atomic_fetch_and_add_z(&scheduler->num_manage, 1); | ||||
| BLI_condition_notify_all(&pool->num_cond); | // if (scheduler->is_manager_sleeping) { | ||||
| // BLI_mutex_lock(&scheduler->manager_mutex); | |||||
| // BLI_condition_notify_all(&scheduler->manager_condition); | |||||
| // BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| // } | |||||
| BLI_mutex_unlock(&pool->num_mutex); | break; | ||||
| } | |||||
| else { | |||||
| atomic_sub_and_fetch_z(¤t_pool->currently_running_tasks, 1); | |||||
| if (atomic_cas_uint32(¤t_task->owner, thread_id, TASK_TODO) != thread_id) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| DEBUG_PRINTF("released task %p\n", current_task); | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| if (found_task) { | |||||
| DEBUG_PRINTF("thread %d working on a task (%p)...\n", thread_id, *task); | |||||
| } | |||||
| return found_task; | |||||
| } | } | ||||
| static void task_pool_num_increase(TaskPool *pool) | BLI_INLINE bool task_wait(TaskScheduler * restrict scheduler, TaskPool * restrict pool, int * restrict loop_count) | ||||
| { | { | ||||
| BLI_mutex_lock(&pool->num_mutex); | /* If we have iterated NANOSLEEP_MAX_SPINNING times without finding a task, go into real sleep. */ | ||||
| if (++(*loop_count) > NANOSLEEP_MAX_SPINNING) { | |||||
| BLI_mutex_lock(&scheduler->workers_mutex); | |||||
| pool->num++; | /* Check again inside the mutex, bad race condition is possible here (though unlikely), | ||||
| BLI_condition_notify_all(&pool->num_cond); | * leading to undying thread... */ | ||||
| if (scheduler->do_exit) { | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| return true; | |||||
| } | |||||
| BLI_mutex_unlock(&pool->num_mutex); | /* Here again, pool->num may have changed since check in run_and_wait... */ | ||||
| if (pool && pool->num == 0) { | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| return false; | |||||
| } | } | ||||
| static bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task) | /* Even though this is read outside of mutex lock, there is no real need to use atomic ops here, | ||||
| * changing the value inside mutex should be enough to ensure safety. */ | |||||
| scheduler->num_workers_sleeping++; | |||||
| BLI_condition_wait(&scheduler->workers_condition, &scheduler->workers_mutex); | |||||
| scheduler->num_workers_sleeping--; | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| *loop_count = 0; | |||||
| } | |||||
| else { | |||||
| nanosleep(NANOSLEEP_DURATION, NULL); | |||||
| } | |||||
| return false; | |||||
| } | |||||
| BLI_INLINE bool task_scheduler_thread_wait_pop(TaskScheduler *scheduler, Task **task, const int thread_id) | |||||
| { | { | ||||
| bool found_task = false; | bool found_task = false; | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | int loop_count = 0; | ||||
| while (!scheduler->queue.first && !scheduler->do_exit) | DEBUG_PRINTF("thread %d waiting on task...\n", thread_id); | ||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| do { | do { | ||||
| Task *current_task; | |||||
| /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | /* Assuming we can only have a void queue in 'exit' case here seems logical (we should only be here after | ||||
| * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | * our worker thread has been woken up from a condition_wait(), which only happens after a new task was | ||||
| * added to the queue), but it is wrong. | * added to the queue), but it is wrong. | ||||
| * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | * Waiting on condition may wake up the thread even if condition is not signaled (spurious wake-ups), and some | ||||
| * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | * race condition may also empty the queue **after** condition has been signaled, but **before** awoken thread | ||||
| * reaches this point... | * reaches this point... | ||||
| * See http://stackoverflow.com/questions/8594591 | * See http://stackoverflow.com/questions/8594591 | ||||
| * | * | ||||
| * So we only abort here if do_exit is set. | * So we only abort here if do_exit is set. | ||||
| */ | */ | ||||
| if (scheduler->do_exit) { | if (scheduler->do_exit) { | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| return false; | return false; | ||||
| } | } | ||||
| for (current_task = scheduler->queue.first; | if (!(found_task = task_find(scheduler, task, NULL, thread_id))) { | ||||
| current_task != NULL; | if (task_wait(scheduler, NULL, &loop_count)) { | ||||
| current_task = current_task->next) | return false; | ||||
| { | |||||
| TaskPool *pool = current_task->pool; | |||||
| if (scheduler->background_thread_only && !pool->run_in_background) { | |||||
| continue; | |||||
| } | |||||
| if (atomic_add_and_fetch_z(&pool->currently_running_tasks, 1) <= pool->num_threads || | |||||
| pool->num_threads == 0) | |||||
| { | |||||
| *task = current_task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, *task); | |||||
| break; | |||||
| } | |||||
| else { | |||||
| atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | |||||
| } | } | ||||
| } | } | ||||
| if (!found_task) | |||||
| BLI_condition_wait(&scheduler->queue_cond, &scheduler->queue_mutex); | |||||
| } while (!found_task); | } while (!found_task); | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | DEBUG_PRINTF("thread %d found a task...\n", thread_id); | ||||
| return true; | return true; | ||||
| } | } | ||||
| static void *task_scheduler_thread_run(void *thread_p) | static void *task_scheduler_thread_run(void *thread_p) | ||||
| { | { | ||||
| TaskThread *thread = (TaskThread *) thread_p; | TaskThread *thread = (TaskThread *) thread_p; | ||||
| TaskScheduler *scheduler = thread->scheduler; | TaskScheduler *scheduler = thread->scheduler; | ||||
| int thread_id = thread->id; | const int thread_id = thread->id; | ||||
| Task *task; | Task *task; | ||||
| /* keep popping off tasks */ | /* keep popping off tasks */ | ||||
| while (task_scheduler_thread_wait_pop(scheduler, &task)) { | while (task_scheduler_thread_wait_pop(scheduler, &task, thread_id)) { | ||||
| TaskPool *pool = task->pool; | TaskPool *pool = task->pool; | ||||
| /* run task */ | /* run task */ | ||||
| DEBUG_PRINTF("thread %d runnning task %p\n", thread_id, task); | |||||
| task->run(pool, task->taskdata, thread_id); | task->run(pool, task->taskdata, thread_id); | ||||
| /* delete task */ | atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | ||||
| task_free(pool, task, thread_id); | |||||
| /* release task */ | |||||
| task_release(scheduler, task, thread_id); | |||||
| } | |||||
| return NULL; | |||||
| } | |||||
| static void *task_scheduler_thread_manager(void *scheduler_p) | |||||
| { | |||||
| TaskScheduler *scheduler = (TaskScheduler *)scheduler_p; | |||||
| int void_loops_count = 0; | |||||
| Task *t_prev, *t, *t_next; | |||||
| /* for now, we assume that pointer assignment is atomic. */ | |||||
| DEBUG_PRINTF("STARTING MANAGER!\n"); | |||||
| /* run until Hell freezes... */ | |||||
| while (!scheduler->do_exit) { | |||||
| /* Important to be atomic here, so that no setting of do_manage by other threads goes unoticed. */ | |||||
| if (atomic_fetch_and_and_uint32(&scheduler->num_manage, ~MANAGER_DO) != 0) { | |||||
| DEBUG_PRINTF("manager RUNNING (%u)...........................................\n", scheduler->num_manage); | |||||
| size_t new_free = 0; | |||||
| Task *last_valid_free = NULL, tmp_free = {0}; | |||||
| Task *first_valid_todo_high = NULL, *last_valid_tmp_todo_high = NULL, tmp_todo_high = {0}; | |||||
| Task *last_valid_todo_low = NULL, tmp_todo_low = {0}; | |||||
| /* Finally, remove todo tasks from free list. */ | |||||
| int new_todo = 0; | |||||
| int curr_new = 0; | |||||
| for (t = scheduler->free_tasks.next, t_prev = &scheduler->free_tasks; t; t = t_next, curr_new++) { | |||||
| t_next = t->next; | |||||
| if (atomic_cas_uint32(&t->owner, TASK_TODO, TASK_MANAGED) == TASK_TODO) { | |||||
| task_pool_num_increase(t, t->pool); | |||||
| if (t->pool->is_new) { | |||||
| t->pool->is_new = false; | |||||
| task_pool_num_decrease(t, t->pool, 1); | |||||
| } | |||||
| t_prev->next = t_next; | |||||
| if (t->priority == TASK_PRIORITY_HIGH) { | |||||
| t->next = tmp_todo_high.next; | |||||
| tmp_todo_high.next = t; | |||||
| if (!last_valid_tmp_todo_high) { | |||||
| last_valid_tmp_todo_high = t; | |||||
| } | |||||
| } | |||||
| else { | |||||
| t->next = tmp_todo_low.next; | |||||
| tmp_todo_low.next = t; | |||||
| } | |||||
| DEBUG_PRINTF("\tnew todo: %p\n", t); | |||||
| atomic_sub_and_fetch_uint32(&scheduler->num_manage, 1); | |||||
| new_todo++; | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_TODO) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| } | |||||
| else if (atomic_cas_uint32(&t->owner, TASK_DONE, TASK_MANAGED) == TASK_DONE) { | |||||
| /* Should never happen */ | |||||
| BLI_assert(0); | |||||
| new_free++; | |||||
| task_pool_num_decrease(t, t->pool, 1); | |||||
| atomic_sub_and_fetch_uint32(&scheduler->num_manage, 1); | |||||
| DEBUG_PRINTF("\tnew free: %p\n", t); | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_FREE) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| t_prev = t; | |||||
| } | |||||
| else { | |||||
| t_prev = t; | |||||
| } | |||||
| } | |||||
| DEBUG_PRINTF("\tWAS %d tasks in FREE list!\n", curr_new); | |||||
| scheduler->next_free = scheduler->free_tasks.next; | |||||
| last_valid_free = t_prev; | |||||
| /* First, remove done tasks from todo list. */ | |||||
| int curr_todo = 0; | |||||
| for (t = scheduler->todo_tasks.next, t_prev = &scheduler->todo_tasks; t; t = t_next, curr_todo++) { | |||||
| t_next = t->next; | |||||
| if (atomic_cas_uint32(&t->owner, TASK_DONE, TASK_MANAGED) == TASK_DONE) { | |||||
| t_prev->next = t_next; | |||||
| t->next = tmp_free.next; | |||||
| tmp_free.next = t; | |||||
| new_free++; | |||||
| task_pool_num_decrease(t, t->pool, 1); | |||||
| atomic_sub_and_fetch_uint32(&scheduler->num_manage, 1); | |||||
| DEBUG_PRINTF("\tnew free: %p\n", t); | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_FREE) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| } | |||||
| else if (atomic_cas_uint32(&t->owner, TASK_TODO, TASK_MANAGED) == TASK_TODO) { | |||||
| if (t->pool->do_clear_tasks) { | |||||
| task_data_free(t, 0); | |||||
| t_prev->next = t_next; | |||||
| t->next = tmp_free.next; | |||||
| tmp_free.next = t; | |||||
| new_free++; | |||||
| task_pool_num_decrease(t, t->pool, 1); | |||||
| DEBUG_PRINTF("\tnew free: %p\n", t); | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_FREE) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| continue; | |||||
| } | |||||
| if (!first_valid_todo_high) { | |||||
| first_valid_todo_high = t; | |||||
| } | |||||
| t_prev = t; | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_TODO) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| } | |||||
| else if (atomic_cas_uint32(&t->owner, TASK_FREE, TASK_MANAGED) == TASK_FREE) { | |||||
| /* Should never happen!!! */ | |||||
| BLI_assert(0); | |||||
| t_prev->next = t_next; | |||||
| t->next = tmp_free.next; | |||||
| tmp_free.next = t; | |||||
| new_free++; | |||||
| DEBUG_PRINTF("\tnew free: %p\n", t); | |||||
| atomic_sub_and_fetch_uint32(&scheduler->num_manage, 1); | |||||
| if (atomic_cas_uint32(&t->owner, TASK_MANAGED, TASK_FREE) != TASK_MANAGED) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| } | |||||
| else { | |||||
| /* Any other case here means a thread claimed the task, nothing to do for now. */ | |||||
| if (!first_valid_todo_high) { | |||||
| first_valid_todo_high = t; | |||||
| } | |||||
| t_prev = t; | |||||
| } | |||||
| } | |||||
| DEBUG_PRINTF("\tWAS %d tasks in TODO list!\n", curr_todo); | |||||
| last_valid_todo_low = t_prev; | |||||
| /* Now merge temp lists into their respective 'public' ones. */ | |||||
| BLI_assert(scheduler->num_tasks_free >= new_todo); | |||||
| atomic_sub_and_fetch_z(&scheduler->num_tasks_free, new_todo); | |||||
| /* notify pool task was done */ | while (atomic_add_and_fetch_z(&scheduler->num_tasks_free, new_free) < MEMPOOL_SIZE) { | ||||
| task_pool_num_decrease(pool, 1); | scheduler->num_tasks_tot += MEMPOOL_SIZE; | ||||
| for (int i = MEMPOOL_SIZE; i--;) { | |||||
| Task *new_task = MEM_callocN(sizeof(*new_task), __func__); | |||||
| /* minimal init. */ | |||||
| if (atomic_cas_uint32(&new_task->owner, 0, TASK_FREE) != 0) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| /* and add to free list */ | |||||
| last_valid_free->next = new_task; | |||||
| last_valid_free = new_task; | |||||
| } | |||||
| new_free = MEMPOOL_SIZE; | |||||
| } | |||||
| last_valid_free->next = tmp_free.next; | |||||
| if (tmp_todo_high.next) { | |||||
| last_valid_tmp_todo_high->next = first_valid_todo_high; | |||||
| scheduler->todo_tasks.next = tmp_todo_high.next; | |||||
| if (last_valid_todo_low == &scheduler->todo_tasks) { | |||||
| BLI_assert(first_valid_todo_high == NULL); | |||||
| last_valid_todo_low = last_valid_tmp_todo_high; | |||||
| } | |||||
| } | |||||
| last_valid_todo_low->next = tmp_todo_low.next; | |||||
| if (new_free || tmp_free.next || tmp_todo_high.next || tmp_todo_low.next) { | |||||
| DEBUG_PRINTF("manager kicking %lu sleeping threads (found %d new todo's [%p])\n", scheduler->num_workers_sleeping, new_todo, scheduler->todo_tasks.next); | |||||
| /* Some new free or todo tasks, kick sleeping workers! */ | |||||
| if (scheduler->num_workers_sleeping != 0) { | |||||
| BLI_mutex_lock(&scheduler->workers_mutex); | |||||
| BLI_condition_notify_all(&scheduler->workers_condition); | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| } | } | ||||
| } | |||||
| void_loops_count = 0; | |||||
| DEBUG_PRINTF("manager DONE (%u)...........................................\n", scheduler->num_manage); | |||||
| } | |||||
| if (++void_loops_count > NANOSLEEP_MAX_SPINNING_MANAGER) { | |||||
| BLI_mutex_lock(&scheduler->manager_mutex); | |||||
| /* Check again inside the mutex, bad race condition is possible here (though unlikely), | |||||
| * leading to undying thread... */ | |||||
| if (scheduler->do_exit) { | |||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| break; | |||||
| } | |||||
| /* Even though this is read outside of mutex lock, there is no real need to use atomic ops here, | |||||
| * changing the value inside mutex should be enough to ensure safety. */ | |||||
| scheduler->is_manager_sleeping = true; | |||||
| DEBUG_PRINTF("manager sleeping...\n"); | |||||
| BLI_condition_wait(&scheduler->manager_condition, &scheduler->manager_mutex); | |||||
| DEBUG_PRINTF("manager awaken...\n"); | |||||
| scheduler->is_manager_sleeping = false; | |||||
| void_loops_count = 0; | |||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| } | |||||
| else { | |||||
| nanosleep(NANOSLEEP_DURATION, NULL); | |||||
| } | |||||
| } | |||||
| DEBUG_PRINTF("STOPING MANAGER!\n"); | |||||
| return NULL; | return NULL; | ||||
| } | } | ||||
| TaskScheduler *BLI_task_scheduler_create(int num_threads) | TaskScheduler *BLI_task_scheduler_create(int num_threads) | ||||
| { | { | ||||
| TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); | TaskScheduler *scheduler = MEM_callocN(sizeof(TaskScheduler), "TaskScheduler"); | ||||
| /* multiple places can use this task scheduler, sharing the same | /* multiple places can use this task scheduler, sharing the same | ||||
| * threads, so we keep track of the number of users. */ | * threads, so we keep track of the number of users. */ | ||||
| scheduler->do_exit = false; | scheduler->do_exit = 0; | ||||
| BLI_listbase_clear(&scheduler->queue); | BLI_mutex_init(&scheduler->manager_mutex); | ||||
| BLI_mutex_init(&scheduler->queue_mutex); | BLI_condition_init(&scheduler->manager_condition); | ||||
| BLI_condition_init(&scheduler->queue_cond); | |||||
| BLI_mutex_init(&scheduler->workers_mutex); | |||||
| BLI_condition_init(&scheduler->workers_condition); | |||||
| if (num_threads == 0) { | if (num_threads == 0) { | ||||
| /* automatic number of threads will be main thread + num cores */ | /* automatic number of threads will be main thread + num cores */ | ||||
| num_threads = BLI_system_thread_count(); | num_threads = BLI_system_thread_count(); | ||||
| } | } | ||||
| /* main thread will also work, so we count it too */ | /* main thread will also work, so we count it too */ | ||||
| num_threads -= 1; | num_threads -= 1; | ||||
| /* Add background-only thread if needed. */ | /* Add background-only thread if needed. */ | ||||
| if (num_threads == 0) { | if (num_threads == 0) { | ||||
| scheduler->background_thread_only = true; | scheduler->background_thread_only = true; | ||||
| num_threads = 1; | num_threads = 1; | ||||
| } | } | ||||
| /* Launch manager thread (it will also create initial pool of tasks etc.). */ | |||||
| scheduler->num_manage = MANAGER_DO; | |||||
| if (pthread_create(&scheduler->manager, NULL, task_scheduler_thread_manager, scheduler) != 0) { | |||||
| fprintf(stderr, "TaskScheduler failed to launch manager thread.\n"); | |||||
| return scheduler; | |||||
| } | |||||
| /* launch threads that will be waiting for work */ | /* launch threads that will be waiting for work */ | ||||
| if (num_threads > 0) { | if (num_threads > 0) { | ||||
| int i; | int i; | ||||
| scheduler->num_threads = num_threads; | scheduler->num_threads = num_threads; | ||||
| scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); | scheduler->threads = MEM_callocN(sizeof(pthread_t) * num_threads, "TaskScheduler threads"); | ||||
| scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads"); | scheduler->task_threads = MEM_callocN(sizeof(TaskThread) * num_threads, "TaskScheduler task threads"); | ||||
| for (i = 0; i < num_threads; i++) { | for (i = 0; i < num_threads; i++) { | ||||
| TaskThread *thread = &scheduler->task_threads[i]; | TaskThread *thread = &scheduler->task_threads[i]; | ||||
| thread->scheduler = scheduler; | thread->scheduler = scheduler; | ||||
| thread->id = i + 1; | thread->id = i + 1; | ||||
| if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { | if (pthread_create(&scheduler->threads[i], NULL, task_scheduler_thread_run, thread) != 0) { | ||||
| fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); | fprintf(stderr, "TaskScheduler failed to launch thread %d/%d\n", i, num_threads); | ||||
| } | } | ||||
| } | } | ||||
| scheduler->task_mempool = MEM_callocN(sizeof(*scheduler->task_mempool) * (num_threads + 1), | |||||
| "TaskScheduler task_mempool"); | |||||
| } | } | ||||
| return scheduler; | return scheduler; | ||||
| } | } | ||||
| void BLI_task_scheduler_free(TaskScheduler *scheduler) | void BLI_task_scheduler_free(TaskScheduler *scheduler) | ||||
| { | { | ||||
| Task *task; | |||||
| /* stop all waiting threads */ | /* stop all waiting threads */ | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | atomic_fetch_and_or_uint8(&scheduler->do_exit, 1); | ||||
| scheduler->do_exit = true; | |||||
| BLI_condition_notify_all(&scheduler->queue_cond); | /* Signal all sleeping ones. | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | * Note that here we cannot accept to miss one due to race condition, so we lock before checking sleeping count. */ | ||||
| BLI_mutex_lock(&scheduler->workers_mutex); | |||||
| if (scheduler->num_workers_sleeping != 0) { | |||||
| BLI_condition_notify_all(&scheduler->workers_condition); | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->workers_mutex); | |||||
| BLI_mutex_lock(&scheduler->manager_mutex); | |||||
| if (scheduler->is_manager_sleeping) { | |||||
| BLI_condition_notify_all(&scheduler->manager_condition); | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| /* delete threads */ | /* delete threads */ | ||||
| if (scheduler->threads) { | if (scheduler->threads) { | ||||
| int i; | int i; | ||||
| for (i = 0; i < scheduler->num_threads; i++) { | for (i = 0; i < scheduler->num_threads; i++) { | ||||
| if (pthread_join(scheduler->threads[i], NULL) != 0) | if (pthread_join(scheduler->threads[i], NULL) != 0) | ||||
| fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); | fprintf(stderr, "TaskScheduler failed to join thread %d/%d\n", i, scheduler->num_threads); | ||||
| } | } | ||||
| MEM_freeN(scheduler->threads); | MEM_freeN(scheduler->threads); | ||||
| } | } | ||||
| /* Delete task thread data */ | /* Delete task thread data */ | ||||
| if (scheduler->task_threads) { | if (scheduler->task_threads) { | ||||
| MEM_freeN(scheduler->task_threads); | MEM_freeN(scheduler->task_threads); | ||||
| } | } | ||||
| /* Delete task memory pool */ | if (pthread_join(scheduler->manager, NULL) != 0) { | ||||
| if (scheduler->task_mempool) { | fprintf(stderr, "TaskScheduler failed to join manager thread\n"); | ||||
| for (int i = 0; i <= scheduler->num_threads; ++i) { | |||||
| for (int j = 0; j < scheduler->task_mempool[i].num_tasks; ++j) { | |||||
| MEM_freeN(scheduler->task_mempool[i].tasks[j]); | |||||
| } | |||||
| } | } | ||||
| MEM_freeN(scheduler->task_mempool); | |||||
| /* Cleanup tasks. */ | |||||
| Task *t, *t_next; | |||||
| for (t = scheduler->todo_tasks.next; t; t = t_next) { | |||||
| t_next = t; | |||||
| task_data_free(t, 0); | |||||
| MEM_freeN(t); | |||||
| scheduler->num_tasks_tot--; | |||||
| } | } | ||||
| /* delete leftover tasks */ | for (t = scheduler->free_tasks.next; t; t = t_next) { | ||||
| for (task = scheduler->queue.first; task; task = task->next) { | t_next = t; | ||||
| task_data_free(task, 0); | task_data_free(t, 0); | ||||
| MEM_freeN(t); | |||||
| scheduler->num_tasks_tot--; | |||||
| } | } | ||||
| BLI_freelistN(&scheduler->queue); | |||||
| BLI_assert(scheduler->num_tasks_tot == 0); | |||||
| /* delete mutex/condition */ | /* delete mutex/condition */ | ||||
| BLI_mutex_end(&scheduler->queue_mutex); | BLI_mutex_end(&scheduler->manager_mutex); | ||||
| BLI_condition_end(&scheduler->queue_cond); | BLI_condition_end(&scheduler->manager_condition); | ||||
| BLI_mutex_end(&scheduler->workers_mutex); | |||||
| BLI_condition_end(&scheduler->workers_condition); | |||||
| MEM_freeN(scheduler); | MEM_freeN(scheduler); | ||||
| } | } | ||||
| int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | int BLI_task_scheduler_num_threads(TaskScheduler *scheduler) | ||||
| { | { | ||||
| return scheduler->num_threads + 1; | return scheduler->num_threads + 1; | ||||
| } | } | ||||
| static void task_scheduler_push(TaskScheduler *scheduler, Task *task, TaskPriority priority) | |||||
| { | |||||
| task_pool_num_increase(task->pool); | |||||
| /* add task to queue */ | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| if (priority == TASK_PRIORITY_HIGH) | |||||
| BLI_addhead(&scheduler->queue, task); | |||||
| else | |||||
| BLI_addtail(&scheduler->queue, task); | |||||
| BLI_condition_notify_one(&scheduler->queue_cond); | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | |||||
| } | |||||
| static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | static void task_scheduler_clear(TaskScheduler *scheduler, TaskPool *pool) | ||||
| { | { | ||||
| Task *task, *nexttask; | Task *task; | ||||
| size_t done = 0; | size_t done = 0; | ||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* free all tasks from this pool from the queue */ | /* free all tasks from this pool from the queue */ | ||||
| for (task = scheduler->queue.first; task; task = nexttask) { | atomic_fetch_and_or_uint8(&pool->do_clear_tasks, 1); | ||||
| nexttask = task->next; | for (task = scheduler->todo_tasks.next; task; task = task->next) { | ||||
| if (task->pool == pool && atomic_cas_uint32(&task->owner, TASK_TODO, 0) == TASK_TODO) { | |||||
| /* There is a faint possibility that this changes between first check and atomic acquiring of the task. */ | |||||
| if (task->pool == pool) { | if (task->pool == pool) { | ||||
| task_data_free(task, 0); | task_data_free(task, 0); | ||||
| BLI_freelinkN(&scheduler->queue, task); | atomic_add_and_fetch_uint32(&scheduler->num_manage, 1); | ||||
| if (atomic_cas_uint32(&task->owner, 0, TASK_DONE) != 0) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| done++; | done++; | ||||
| } | } | ||||
| else { | |||||
| if (atomic_cas_uint32(&task->owner, 0, TASK_TODO) != 0) { | |||||
| BLI_assert(0); | |||||
| } | |||||
| } | |||||
| } | |||||
| } | } | ||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | if (done > 0) { | ||||
| if (scheduler->is_manager_sleeping) { | |||||
| /* notify done */ | BLI_mutex_lock(&scheduler->manager_mutex); | ||||
| task_pool_num_decrease(pool, done); | BLI_condition_notify_all(&scheduler->manager_condition); | ||||
| BLI_mutex_unlock(&scheduler->manager_mutex); | |||||
| } | |||||
| } | |||||
| } | } | ||||
| /* Task Pool */ | /* Task Pool */ | ||||
| static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | static TaskPool *task_pool_create_ex(TaskScheduler *scheduler, void *userdata, const bool is_background) | ||||
| { | { | ||||
| TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool"); | TaskPool *pool = MEM_mallocN(sizeof(TaskPool), "TaskPool"); | ||||
| #ifndef NDEBUG | |||||
| /* Assert we do not try to create a background pool from some parent task - those only work OK from main thread. */ | /* Assert we do not try to create a background pool from some parent task - those only work OK from main thread. */ | ||||
| if (is_background) { | BLI_assert(!is_background || get_thread_id(scheduler) == 0); | ||||
| const pthread_t thread_id = pthread_self(); | |||||
| int i = scheduler->num_threads; | |||||
| while (i--) { | |||||
| BLI_assert(!pthread_equal(scheduler->threads[i], thread_id)); | |||||
| } | |||||
| } | |||||
| #endif | |||||
| pool->scheduler = scheduler; | pool->scheduler = scheduler; | ||||
| pool->num = 0; | pool->num = 1; /* Those two ensure we cannot get freed before any task gets added... */ | ||||
| pool->done = 0; | pool->is_new = true; | ||||
| pool->num_threads = 0; | pool->num_threads = 0; | ||||
| pool->currently_running_tasks = 0; | pool->currently_running_tasks = 0; | ||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| pool->do_clear_tasks = 0; | |||||
| pool->run_in_background = is_background; | pool->run_in_background = is_background; | ||||
| BLI_mutex_init(&pool->num_mutex); | |||||
| BLI_condition_init(&pool->num_cond); | |||||
| pool->userdata = userdata; | pool->userdata = userdata; | ||||
| BLI_mutex_init(&pool->user_mutex); | BLI_mutex_init(&pool->user_mutex); | ||||
| if (BLI_thread_is_main()) { | |||||
| pool->task_mempool = scheduler->task_mempool; | |||||
| } | |||||
| else { | |||||
| pool->task_mempool = &pool->task_mempool_local; | |||||
| pool->task_mempool_local.num_tasks = 0; | |||||
| } | |||||
| #ifdef DEBUG_STATS | |||||
| pool->mempool_stats = | |||||
| MEM_callocN(sizeof(*pool->mempool_stats) * (scheduler->num_threads + 1), | |||||
| "per-taskpool mempool stats"); | |||||
| #endif | |||||
| /* Ensure malloc will go fine from threads, | /* Ensure malloc will go fine from threads, | ||||
| * | * | ||||
| * This is needed because we could be in main thread here | * This is needed because we could be in main thread here | ||||
| * and malloc could be non-threda safe at this point because | * and malloc could be non-threda safe at this point because | ||||
| * no other jobs are running. | * no other jobs are running. | ||||
| */ | */ | ||||
| BLI_begin_threaded_malloc(); | BLI_begin_threaded_malloc(); | ||||
| Show All 22 Lines | |||||
| */ | */ | ||||
| TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata) | TaskPool *BLI_task_pool_create_background(TaskScheduler *scheduler, void *userdata) | ||||
| { | { | ||||
| return task_pool_create_ex(scheduler, userdata, true); | return task_pool_create_ex(scheduler, userdata, true); | ||||
| } | } | ||||
| void BLI_task_pool_free(TaskPool *pool) | void BLI_task_pool_free(TaskPool *pool) | ||||
| { | { | ||||
| BLI_task_pool_stop(pool); | BLI_task_pool_cancel(pool); | ||||
| BLI_mutex_end(&pool->num_mutex); | |||||
| BLI_condition_end(&pool->num_cond); | |||||
| BLI_mutex_end(&pool->user_mutex); | BLI_mutex_end(&pool->user_mutex); | ||||
| /* Free local memory pool, those pointers are lost forever. */ | DEBUG_PRINTF("[%p] FREEING POOL!\n", pool); | ||||
| if (pool->task_mempool == &pool->task_mempool_local) { | |||||
| for (int i = 0; i < pool->task_mempool_local.num_tasks; i++) { | |||||
| MEM_freeN(pool->task_mempool_local.tasks[i]); | |||||
| } | |||||
| } | |||||
| #ifdef DEBUG_STATS | |||||
| printf("Thread ID Allocated Reused Discarded\n"); | |||||
| for (int i = 0; i < pool->scheduler->num_threads + 1; ++i) { | |||||
| printf("%02d %05d %05d %05d\n", | |||||
| i, | |||||
| pool->mempool_stats[i].num_alloc, | |||||
| pool->mempool_stats[i].num_reuse, | |||||
| pool->mempool_stats[i].num_discard); | |||||
| } | |||||
| MEM_freeN(pool->mempool_stats); | |||||
| #endif | |||||
| BLI_assert(pool->num == 0); | |||||
| MEM_freeN(pool); | MEM_freeN(pool); | ||||
| BLI_end_threaded_malloc(); | BLI_end_threaded_malloc(); | ||||
| } | } | ||||
| static void task_pool_push( | static void task_pool_push( | ||||
| TaskPool *pool, TaskRunFunction run, void *taskdata, | TaskPool *pool, TaskRunFunction run, void *taskdata, | ||||
| bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority, | bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority, | ||||
| int thread_id) | int thread_id) | ||||
| { | { | ||||
| Task *task = task_alloc(pool, thread_id); | /* Get thread-ownership over a free task. */ | ||||
| Task *task = task_acquire(pool->scheduler, thread_id); | |||||
| task->run = run; | task->run = run; | ||||
| task->taskdata = taskdata; | task->taskdata = taskdata; | ||||
| task->free_taskdata = free_taskdata; | task->free_taskdata = free_taskdata; | ||||
| task->freedata = freedata; | task->freedata = freedata; | ||||
| task->pool = pool; | task->pool = pool; | ||||
| task->priority = priority; | |||||
| task_scheduler_push(pool->scheduler, task, priority); | /* Releases our thread-ownership of task, thread manager will handle moving it to todo list. */ | ||||
| task_todo_push(pool->scheduler, task, thread_id); | |||||
| } | } | ||||
| void BLI_task_pool_push_ex( | void BLI_task_pool_push_ex( | ||||
| TaskPool *pool, TaskRunFunction run, void *taskdata, | TaskPool *pool, TaskRunFunction run, void *taskdata, | ||||
| bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority) | bool free_taskdata, TaskFreeFunction freedata, TaskPriority priority) | ||||
| { | { | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, -1); | task_pool_push(pool, run, taskdata, free_taskdata, freedata, priority, 0); | ||||
| } | } | ||||
| void BLI_task_pool_push( | void BLI_task_pool_push( | ||||
| TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority) | TaskPool *pool, TaskRunFunction run, void *taskdata, bool free_taskdata, TaskPriority priority) | ||||
| { | { | ||||
| BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority); | BLI_task_pool_push_ex(pool, run, taskdata, free_taskdata, NULL, priority); | ||||
| } | } | ||||
| void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, | void BLI_task_pool_push_from_thread(TaskPool *pool, TaskRunFunction run, | ||||
| void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | void *taskdata, bool free_taskdata, TaskPriority priority, int thread_id) | ||||
| { | { | ||||
| task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | task_pool_push(pool, run, taskdata, free_taskdata, NULL, priority, thread_id); | ||||
| } | } | ||||
| void BLI_task_pool_work_and_wait(TaskPool *pool) | void BLI_task_pool_work_and_wait(TaskPool *pool) | ||||
| { | { | ||||
| TaskScheduler *scheduler = pool->scheduler; | TaskScheduler *scheduler = pool->scheduler; | ||||
| const int thread_id = get_thread_id(pool->scheduler); | |||||
| int loop_count = 0; | |||||
| BLI_mutex_lock(&pool->num_mutex); | while (1) { | ||||
| Task *task; | |||||
| while (pool->num != 0) { | |||||
| Task *task, *work_task = NULL; | |||||
| bool found_task = false; | bool found_task = false; | ||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| BLI_mutex_lock(&scheduler->queue_mutex); | |||||
| /* find task from this pool. if we get a task from another pool, | /* find task from this pool. if we get a task from another pool, | ||||
| * we can get into deadlock */ | * we can get into deadlock */ | ||||
| if (pool->num_threads == 0 || | found_task = task_find(scheduler, &task, pool, thread_id); | ||||
| pool->currently_running_tasks < pool->num_threads) | |||||
| { | |||||
| for (task = scheduler->queue.first; task; task = task->next) { | |||||
| if (task->pool == pool) { | |||||
| work_task = task; | |||||
| found_task = true; | |||||
| BLI_remlink(&scheduler->queue, task); | |||||
| break; | |||||
| } | |||||
| } | |||||
| } | |||||
| BLI_mutex_unlock(&scheduler->queue_mutex); | /* We cannot exclusively ask for tasks from our pool, else we can easily end in a deadlock situation | ||||
| * where all threads are blocked in work_and_wait waiting for others to finish tasks of their pool... */ | |||||
| // if (!found_task) { | |||||
| // found_task = task_find(scheduler, &task, NULL, thread_id); | |||||
| // } | |||||
| /* if found task, do it, otherwise wait until other tasks are done */ | /* if found task, do it, otherwise wait until other tasks are done */ | ||||
| if (found_task) { | if (found_task) { | ||||
| /* run task */ | /* run task */ | ||||
| atomic_add_and_fetch_z(&pool->currently_running_tasks, 1); | task->run(pool, task->taskdata, thread_id); | ||||
| work_task->run(pool, work_task->taskdata, 0); | |||||
| /* delete task */ | atomic_sub_and_fetch_z(&pool->currently_running_tasks, 1); | ||||
| task_free(pool, task, 0); | |||||
| /* notify pool task was done */ | /* release task */ | ||||
| task_pool_num_decrease(pool, 1); | task_release(scheduler, task, thread_id); | ||||
| } | |||||
| BLI_mutex_lock(&pool->num_mutex); | /* Reset the 'failed' counter to zero. */ | ||||
| if (pool->num == 0) | loop_count = 0; | ||||
| } | |||||
| else if (pool->num == 0) { | |||||
| /* Some atomic ops to ensure memory barrier, alas it does not seem to work/be enough... */ | |||||
| if (atomic_fetch_and_add_z(&pool->num, 1) == 0) { | |||||
| atomic_fetch_and_sub_z(&pool->num, 1); | |||||
| break; | break; | ||||
| if (!found_task) | |||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | |||||
| } | } | ||||
| } | |||||
| BLI_mutex_unlock(&pool->num_mutex); | else { | ||||
| task_wait(scheduler, pool, &loop_count); /* We do not care about return value here. */ | |||||
| } | |||||
| } | |||||
| } | } | ||||
| int BLI_pool_get_num_threads(TaskPool *pool) | int BLI_pool_get_num_threads(TaskPool *pool) | ||||
| { | { | ||||
| if (pool->num_threads != 0) { | if (pool->num_threads != 0) { | ||||
| return pool->num_threads; | return pool->num_threads; | ||||
| } | } | ||||
| else { | else { | ||||
| Show All 9 Lines | |||||
| void BLI_task_pool_cancel(TaskPool *pool) | void BLI_task_pool_cancel(TaskPool *pool) | ||||
| { | { | ||||
| pool->do_cancel = true; | pool->do_cancel = true; | ||||
| task_scheduler_clear(pool->scheduler, pool); | task_scheduler_clear(pool->scheduler, pool); | ||||
| /* wait until all entries are cleared */ | /* wait until all entries are cleared */ | ||||
| BLI_mutex_lock(&pool->num_mutex); | BLI_task_pool_work_and_wait(pool); | ||||
| while (pool->num) | |||||
| BLI_condition_wait(&pool->num_cond, &pool->num_mutex); | |||||
| BLI_mutex_unlock(&pool->num_mutex); | |||||
| pool->do_cancel = false; | pool->do_cancel = false; | ||||
| } | } | ||||
| void BLI_task_pool_stop(TaskPool *pool) | |||||
| { | |||||
| task_scheduler_clear(pool->scheduler, pool); | |||||
| BLI_assert(pool->num == 0); | |||||
| } | |||||
| bool BLI_task_pool_canceled(TaskPool *pool) | bool BLI_task_pool_canceled(TaskPool *pool) | ||||
| { | { | ||||
| return pool->do_cancel; | return pool->do_cancel; | ||||
| } | } | ||||
| void *BLI_task_pool_userdata(TaskPool *pool) | void *BLI_task_pool_userdata(TaskPool *pool) | ||||
| { | { | ||||
| return pool->userdata; | return pool->userdata; | ||||
| } | } | ||||
| ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ThreadMutex *BLI_task_pool_user_mutex(TaskPool *pool) | ||||
| { | { | ||||
| return &pool->user_mutex; | return &pool->user_mutex; | ||||
| } | } | ||||
| size_t BLI_task_pool_tasks_done(TaskPool *pool) | size_t BLI_task_pool_tasks_todo(TaskPool *pool) | ||||
| { | { | ||||
| return pool->done; | return pool->num; | ||||
| } | } | ||||
| /* Parallel range routines */ | /* Parallel range routines */ | ||||
| /** | /** | ||||
| * | * | ||||
| * Main functions: | * Main functions: | ||||
| * - #BLI_task_parallel_range | * - #BLI_task_parallel_range | ||||
| ▲ Show 20 Lines • Show All 378 Lines • Show Last 20 Lines | |||||