diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index cf396c26760..08903d61964 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -180,13 +180,17 @@ void WorkerThreadPool::_process_task(Task *p_task) { void WorkerThreadPool::_thread_function(void *p_user) { ThreadData *thread_data = (ThreadData *)p_user; + while (true) { Task *task_to_process = nullptr; { MutexLock lock(singleton->task_mutex); - if (singleton->exit_threads) { - return; + + bool exit = singleton->_handle_runlevel(thread_data, lock); + if (unlikely(exit)) { + break; } + thread_data->signaled = false; if (singleton->task_queue.first()) { @@ -194,7 +198,6 @@ void WorkerThreadPool::_thread_function(void *p_user) { singleton->task_queue.remove(singleton->task_queue.first()); } else { thread_data->cond_var.wait(lock); - DEV_ASSERT(singleton->exit_threads || thread_data->signaled); } } @@ -204,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) { } } -void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) { +void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock) { // Fall back to processing on the calling thread if there are no worker threads. // Separated into its own variable to make it easier to extend this logic // in custom builds. bool process_on_calling_thread = threads.size() == 0; if (process_on_calling_thread) { - task_mutex.unlock(); + p_lock.temp_unlock(); for (uint32_t i = 0; i < p_count; i++) { _process_task(p_tasks[i]); } + p_lock.temp_relock(); return; } + while (runlevel == RUNLEVEL_EXIT_LANGUAGES) { + control_cond_var.wait(p_lock); + } + uint32_t to_process = 0; uint32_t to_promote = 0; @@ -238,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, } _notify_threads(caller_pool_thread, to_process, to_promote); - - task_mutex.unlock(); } void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) { @@ -323,9 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void * } WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) { - ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a task because the WorkerThreadPool is either not initialized yet or already terminated."); + MutexLock lock(task_mutex); - task_mutex.lock(); // Get a free task Task *task = task_allocator.alloc(); TaskID id = last_task++; @@ -337,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, task->template_userdata = p_template_userdata; tasks.insert(id, task); - _post_tasks_and_unlock(&task, 1, p_high_priority); + _post_tasks(&task, 1, p_high_priority, lock); return id; } @@ -444,22 +449,34 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() { void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { // Keep processing tasks until the condition to stop waiting is met. -#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed) - while (true) { Task *task_to_process = nullptr; bool relock_unlockables = false; { MutexLock lock(task_mutex); + bool was_signaled = p_caller_pool_thread->signaled; p_caller_pool_thread->signaled = false; - if (IS_WAIT_OVER) { - if (unlikely(p_task == ThreadData::YIELDING)) { - p_caller_pool_thread->yield_is_over = false; - } + bool exit = _handle_runlevel(p_caller_pool_thread, lock); + if (unlikely(exit)) { + break; + } - if (!exit_threads && was_signaled) { + bool wait_is_over = false; + if (unlikely(p_task == ThreadData::YIELDING)) { + if (p_caller_pool_thread->yield_is_over) { + p_caller_pool_thread->yield_is_over = false; + wait_is_over = true; + } + } else { + if (p_task->completed) { + wait_is_over = true; + } + } + + if (wait_is_over) { + if (was_signaled) { // This thread was awaken for some additional reason, but it's about to exit. // Let's find out what may be pending and forward the requests. uint32_t to_process = task_queue.first() ? 1 : 0; @@ -474,28 +491,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T break; } - if (!exit_threads) { - if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { - if (_try_promote_low_priority_task()) { - _notify_threads(p_caller_pool_thread, 1, 0); - } + if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(p_caller_pool_thread, 1, 0); } + } - if (singleton->task_queue.first()) { - task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); - } + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } - if (!task_to_process) { - p_caller_pool_thread->awaited_task = p_task; + if (!task_to_process) { + p_caller_pool_thread->awaited_task = p_task; - _unlock_unlockable_mutexes(); - relock_unlockables = true; - p_caller_pool_thread->cond_var.wait(lock); + _unlock_unlockable_mutexes(); + relock_unlockables = true; - DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); - p_caller_pool_thread->awaited_task = nullptr; - } + p_caller_pool_thread->cond_var.wait(lock); + + p_caller_pool_thread->awaited_task = nullptr; } } @@ -509,16 +524,65 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T } } +void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) { + DEV_ASSERT(p_runlevel > runlevel); + runlevel = p_runlevel; + memset(&runlevel_data, 0, sizeof(runlevel_data)); + for (uint32_t i = 0; i < threads.size(); i++) { + threads[i].cond_var.notify_one(); + threads[i].signaled = true; + } + control_cond_var.notify_all(); +} + +// Returns whether threads have to exit. This may perform the check about handling needed. +bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock &p_lock) { + bool exit = false; + switch (runlevel) { + case RUNLEVEL_NORMAL: { + } break; + case RUNLEVEL_PRE_EXIT_LANGUAGES: { + if (!p_thread_data->pre_exited_languages) { + if (!task_queue.first() && !low_priority_task_queue.first()) { + p_thread_data->pre_exited_languages = true; + runlevel_data.pre_exit_languages.num_idle_threads++; + control_cond_var.notify_all(); + } + } + } break; + case RUNLEVEL_EXIT_LANGUAGES: { + if (!p_thread_data->exited_languages) { + p_lock.temp_unlock(); + ScriptServer::thread_exit(); + p_lock.temp_relock(); + p_thread_data->exited_languages = true; + runlevel_data.exit_languages.num_exited_threads++; + control_cond_var.notify_all(); + } + } break; + case RUNLEVEL_EXIT: { + exit = true; + } break; + } + return exit; +} + void WorkerThreadPool::yield() { int th_index = get_thread_index(); ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread."); _wait_collaboratively(&threads[th_index], ThreadData::YIELDING); - // If this long-lived task started before the scripting server was initialized, - // now is a good time to have scripting languages ready for the current thread. - // Otherwise, such a piece of setup won't happen unless another task has been - // run during the collaborative wait. - ScriptServer::thread_enter(); + task_mutex.lock(); + if (runlevel < RUNLEVEL_EXIT_LANGUAGES) { + // If this long-lived task started before the scripting server was initialized, + // now is a good time to have scripting languages ready for the current thread. + // Otherwise, such a piece of setup won't happen unless another task has been + // run during the collaborative wait. + task_mutex.unlock(); + ScriptServer::thread_enter(); + } else { + task_mutex.unlock(); + } } void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { @@ -543,13 +607,13 @@ void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { } WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) { - ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a group task because the WorkerThreadPool is either not initialized yet or already terminated."); ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID); if (p_tasks < 0) { p_tasks = MAX(1u, threads.size()); } - task_mutex.lock(); + MutexLock lock(task_mutex); + Group *group = group_allocator.alloc(); GroupID id = last_task++; group->max = p_elements; @@ -584,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca groups[id] = group; - _post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority); + _post_tasks(tasks_posted, p_tasks, p_high_priority, lock); return id; } @@ -687,6 +751,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) { void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) { ERR_FAIL_COND(threads.size() > 0); + + runlevel = RUNLEVEL_NORMAL; + if (p_thread_count < 0) { p_thread_count = OS::get_singleton()->get_default_thread_pool_size(); } @@ -704,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) } } +void WorkerThreadPool::exit_languages_threads() { + if (threads.size() == 0) { + return; + } + + MutexLock lock(task_mutex); + + // Wait until all threads are idle. + _switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES); + while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) { + control_cond_var.wait(lock); + } + + // Wait until all threads have detached from scripting languages. + _switch_runlevel(RUNLEVEL_EXIT_LANGUAGES); + while (runlevel_data.exit_languages.num_exited_threads != threads.size()) { + control_cond_var.wait(lock); + } +} + void WorkerThreadPool::finish() { if (threads.size() == 0) { return; @@ -716,15 +803,10 @@ void WorkerThreadPool::finish() { print_error("Task waiting was never re-claimed: " + E->self()->description); E = E->next(); } + + _switch_runlevel(RUNLEVEL_EXIT); } - { - MutexLock lock(task_mutex); - exit_threads = true; - } - for (ThreadData &data : threads) { - data.cond_var.notify_one(); - } for (ThreadData &data : threads) { data.thread.wait_to_finish(); } @@ -755,5 +837,5 @@ WorkerThreadPool::WorkerThreadPool() { } WorkerThreadPool::~WorkerThreadPool() { - DEV_ASSERT(threads.size() == 0 && "finish() hasn't been called!"); + finish(); } diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 6374dbe8c7e..62296ac0405 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -114,17 +114,35 @@ private: Thread thread; bool signaled : 1; bool yield_is_over : 1; + bool pre_exited_languages : 1; + bool exited_languages : 1; Task *current_task = nullptr; Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING). ConditionVariable cond_var; ThreadData() : signaled(false), - yield_is_over(false) {} + yield_is_over(false), + pre_exited_languages(false), + exited_languages(false) {} }; TightLocalVector threads; - bool exit_threads = false; + enum Runlevel { + RUNLEVEL_NORMAL, + RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks + RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads. + RUNLEVEL_EXIT, + } runlevel = RUNLEVEL_NORMAL; + union { // Cleared on every runlevel change. + struct { + uint32_t num_idle_threads; + } pre_exit_languages; + struct { + uint32_t num_exited_threads; + } exit_languages; + } runlevel_data; + ConditionVariable control_cond_var; HashMap thread_ids; HashMap< @@ -152,7 +170,7 @@ private: void _process_task(Task *task); - void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority); + void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock); void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count); bool _try_promote_low_priority_task(); @@ -193,6 +211,9 @@ private: void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); + void _switch_runlevel(Runlevel p_runlevel); + bool _handle_runlevel(ThreadData *p_thread_data, MutexLock &p_lock); + #ifdef THREADS_ENABLED static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock &p_ulock); #endif @@ -256,6 +277,7 @@ public: #endif void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); + void exit_languages_threads(); void finish(); WorkerThreadPool(); ~WorkerThreadPool(); diff --git a/core/register_core_types.cpp b/core/register_core_types.cpp index 220ed9da313..c866ff04152 100644 --- a/core/register_core_types.cpp +++ b/core/register_core_types.cpp @@ -107,6 +107,8 @@ static Time *_time = nullptr; static core_bind::Geometry2D *_geometry_2d = nullptr; static core_bind::Geometry3D *_geometry_3d = nullptr; +static WorkerThreadPool *worker_thread_pool = nullptr; + extern Mutex _global_mutex; static GDExtensionManager *gdextension_manager = nullptr; @@ -295,6 +297,8 @@ void register_core_types() { GDREGISTER_NATIVE_STRUCT(AudioFrame, "float left;float right"); GDREGISTER_NATIVE_STRUCT(ScriptLanguageExtensionProfilingInfo, "StringName signature;uint64_t call_count;uint64_t total_time;uint64_t self_time"); + worker_thread_pool = memnew(WorkerThreadPool); + OS::get_singleton()->benchmark_end_measure("Core", "Register Types"); } @@ -345,7 +349,7 @@ void register_core_singletons() { Engine::get_singleton()->add_singleton(Engine::Singleton("Time", Time::get_singleton())); Engine::get_singleton()->add_singleton(Engine::Singleton("GDExtensionManager", GDExtensionManager::get_singleton())); Engine::get_singleton()->add_singleton(Engine::Singleton("ResourceUID", ResourceUID::get_singleton())); - Engine::get_singleton()->add_singleton(Engine::Singleton("WorkerThreadPool", WorkerThreadPool::get_singleton())); + Engine::get_singleton()->add_singleton(Engine::Singleton("WorkerThreadPool", worker_thread_pool)); OS::get_singleton()->benchmark_end_measure("Core", "Register Singletons"); } @@ -378,6 +382,8 @@ void unregister_core_types() { // Destroy singletons in reverse order to ensure dependencies are not broken. + memdelete(worker_thread_pool); + memdelete(_engine_debugger); memdelete(_marshalls); memdelete(_classdb); diff --git a/main/main.cpp b/main/main.cpp index 9c9542325ef..f1ee4bf2a6b 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -140,7 +140,6 @@ static Engine *engine = nullptr; static ProjectSettings *globals = nullptr; static Input *input = nullptr; static InputMap *input_map = nullptr; -static WorkerThreadPool *worker_thread_pool = nullptr; static TranslationServer *translation_server = nullptr; static Performance *performance = nullptr; static PackedData *packed_data = nullptr; @@ -691,8 +690,6 @@ Error Main::test_setup() { register_core_settings(); // Here globals are present. - worker_thread_pool = memnew(WorkerThreadPool); - translation_server = memnew(TranslationServer); tsman = memnew(TextServerManager); @@ -803,8 +800,6 @@ void Main::test_cleanup() { ResourceSaver::remove_custom_savers(); PropertyListHelper::clear_base_helpers(); - WorkerThreadPool::get_singleton()->finish(); - #ifdef TOOLS_ENABLED GDExtensionManager::get_singleton()->deinitialize_extensions(GDExtension::INITIALIZATION_LEVEL_EDITOR); uninitialize_modules(MODULE_INITIALIZATION_LEVEL_EDITOR); @@ -846,9 +841,6 @@ void Main::test_cleanup() { if (physics_server_2d_manager) { memdelete(physics_server_2d_manager); } - if (worker_thread_pool) { - memdelete(worker_thread_pool); - } if (globals) { memdelete(globals); } @@ -939,7 +931,6 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph register_core_settings(); //here globals are present - worker_thread_pool = memnew(WorkerThreadPool); translation_server = memnew(TranslationServer); performance = memnew(Performance); GDREGISTER_CLASS(Performance); @@ -2629,10 +2620,6 @@ error: if (translation_server) { memdelete(translation_server); } - if (worker_thread_pool) { - worker_thread_pool->finish(); - memdelete(worker_thread_pool); - } if (globals) { memdelete(globals); } @@ -4514,7 +4501,7 @@ void Main::cleanup(bool p_force) { ResourceLoader::clear_translation_remaps(); ResourceLoader::clear_path_remaps(); - WorkerThreadPool::get_singleton()->finish(); + WorkerThreadPool::get_singleton()->exit_languages_threads(); ScriptServer::finish_languages(); @@ -4606,9 +4593,6 @@ void Main::cleanup(bool p_force) { if (physics_server_2d_manager) { memdelete(physics_server_2d_manager); } - if (worker_thread_pool) { - memdelete(worker_thread_pool); - } if (globals) { memdelete(globals); }