Merge pull request #96593 from RandomShaper/res_changed_multiverse
Some checks are pending
🔗 GHA / 📊 Static checks (push) Waiting to run
🔗 GHA / 🤖 Android (push) Blocked by required conditions
🔗 GHA / 🍏 iOS (push) Blocked by required conditions
🔗 GHA / 🐧 Linux (push) Blocked by required conditions
🔗 GHA / 🍎 macOS (push) Blocked by required conditions
🔗 GHA / 🏁 Windows (push) Blocked by required conditions
🔗 GHA / 🌐 Web (push) Blocked by required conditions
🔗 GHA / 🪲 Godot CPP (push) Blocked by required conditions

ResourceLoader: Add thread-aware resource changed mechanism
This commit is contained in:
Rémi Verschelde 2024-09-06 11:11:13 +02:00
commit 05d985496c
No known key found for this signature in database
GPG Key ID: C3336907360768E1
3 changed files with 139 additions and 49 deletions

View File

@ -40,12 +40,12 @@
#include <stdio.h> #include <stdio.h>
void Resource::emit_changed() { void Resource::emit_changed() {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
// Let the connection happen on the call queue, later, since signals are not thread-safe. ResourceLoader::resource_changed_emit(this);
call_deferred("emit_signal", CoreStringName(changed)); return;
} else {
emit_signal(CoreStringName(changed));
} }
emit_signal(CoreStringName(changed));
} }
void Resource::_resource_path_changed() { void Resource::_resource_path_changed() {
@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() {
} }
void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
// Let the check and connection happen on the call queue, later, since signals are not thread-safe. ResourceLoader::resource_changed_connect(this, p_callable, p_flags);
callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
return; return;
} }
if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
connect(CoreStringName(changed), p_callable, p_flags); connect(CoreStringName(changed), p_callable, p_flags);
} }
} }
void Resource::disconnect_changed(const Callable &p_callable) { void Resource::disconnect_changed(const Callable &p_callable) {
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. ResourceLoader::resource_changed_disconnect(this, p_callable);
callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
return; return;
} }
if (is_connected(CoreStringName(changed), p_callable)) { if (is_connected(CoreStringName(changed), p_callable)) {
disconnect(CoreStringName(changed), p_callable); disconnect(CoreStringName(changed), p_callable);
} }

View File

@ -31,6 +31,7 @@
#include "resource_loader.h" #include "resource_loader.h"
#include "core/config/project_settings.h" #include "core/config/project_settings.h"
#include "core/core_bind.h"
#include "core/io/file_access.h" #include "core/io/file_access.h"
#include "core/io/resource_importer.h" #include "core/io/resource_importer.h"
#include "core/object/script_language.h" #include "core/object/script_language.h"
@ -234,17 +235,22 @@ void ResourceLoader::LoadToken::clear() {
// User-facing tokens shouldn't be deleted until completely claimed. // User-facing tokens shouldn't be deleted until completely claimed.
DEV_ASSERT(user_rc == 0 && user_path.is_empty()); DEV_ASSERT(user_rc == 0 && user_path.is_empty());
if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. if (!local_path.is_empty()) {
DEV_ASSERT(thread_load_tasks.has(local_path)); if (task_if_unregistered) {
ThreadLoadTask &load_task = thread_load_tasks[local_path]; memdelete(task_if_unregistered);
if (load_task.task_id && !load_task.awaited) { task_if_unregistered = nullptr;
task_to_await = load_task.task_id; } else {
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (load_task.task_id && !load_task.awaited) {
task_to_await = load_task.task_id;
}
// Removing a task which is still in progress would be catastrophic.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
} }
// Removing a task which is still in progress would be catastrophic. local_path.clear(); // Mark as already cleared.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
local_path.clear();
} }
} }
@ -324,6 +330,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
} }
} }
ThreadLoadTask *curr_load_task_backup = curr_load_task;
curr_load_task = &load_task;
// Thread-safe either if it's the current thread or a brand new one. // Thread-safe either if it's the current thread or a brand new one.
CallQueue *own_mq_override = nullptr; CallQueue *own_mq_override = nullptr;
if (load_nesting == 0) { if (load_nesting == 0) {
@ -451,6 +460,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
} }
DEV_ASSERT(load_paths_stack.is_empty()); DEV_ASSERT(load_paths_stack.is_empty());
} }
curr_load_task = curr_load_task_backup;
} }
static String _validate_local_path(const String &p_path) { static String _validate_local_path(const String &p_path) {
@ -521,9 +532,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
Ref<LoadToken> load_token; Ref<LoadToken> load_token;
bool must_not_register = false; bool must_not_register = false;
ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load.
ThreadLoadTask *load_task_ptr = nullptr; ThreadLoadTask *load_task_ptr = nullptr;
bool run_on_current_thread = false;
{ {
MutexLock thread_load_lock(thread_load_mutex); MutexLock thread_load_lock(thread_load_mutex);
@ -578,12 +587,11 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
} }
} }
// If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope. // If we want to ignore cache, but there's another task loading it, we can't add this one to the map.
must_not_register = ignoring_cache && thread_load_tasks.has(local_path); must_not_register = ignoring_cache && thread_load_tasks.has(local_path);
if (must_not_register) { if (must_not_register) {
load_token->local_path.clear(); load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task));
unregistered_load_task = load_task; load_task_ptr = load_token->task_if_unregistered;
load_task_ptr = &unregistered_load_task;
} else { } else {
DEV_ASSERT(!thread_load_tasks.has(local_path)); DEV_ASSERT(!thread_load_tasks.has(local_path));
HashMap<String, ResourceLoader::ThreadLoadTask>::Iterator E = thread_load_tasks.insert(local_path, load_task); HashMap<String, ResourceLoader::ThreadLoadTask>::Iterator E = thread_load_tasks.insert(local_path, load_task);
@ -591,9 +599,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
} }
} }
run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
if (run_on_current_thread) {
// The current thread may happen to be a thread from the pool. // The current thread may happen to be a thread from the pool.
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
if (tid != WorkerThreadPool::INVALID_TASK_ID) { if (tid != WorkerThreadPool::INVALID_TASK_ID) {
@ -606,11 +612,8 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
} }
} // MutexLock(thread_load_mutex). } // MutexLock(thread_load_mutex).
if (run_on_current_thread) { if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
_run_load_task(load_task_ptr); _run_load_task(load_task_ptr);
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
} }
return load_token; return load_token;
@ -738,7 +741,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
*r_error = OK; *r_error = OK;
} }
if (!p_load_token.local_path.is_empty()) { ThreadLoadTask *load_task_ptr = nullptr;
if (p_load_token.task_if_unregistered) {
load_task_ptr = p_load_token.task_if_unregistered;
} else {
if (!thread_load_tasks.has(p_load_token.local_path)) { if (!thread_load_tasks.has(p_load_token.local_path)) {
if (r_error) { if (r_error) {
*r_error = ERR_BUG; *r_error = ERR_BUG;
@ -809,22 +815,47 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
load_task.error = FAILED; load_task.error = FAILED;
} }
Ref<Resource> resource = load_task.resource; load_task_ptr = &load_task;
if (r_error) { }
*r_error = load_task.error;
} Ref<Resource> resource = load_task_ptr->resource;
return resource; if (r_error) {
} else { *r_error = load_task_ptr->error;
// Special case of an unregistered task. }
// The resource should have been loaded by now.
Ref<Resource> resource = p_load_token.res_if_unregistered; if (resource.is_valid()) {
if (!resource.is_valid()) { if (curr_load_task) {
if (r_error) { // A task awaiting another => Let the awaiter accumulate the resource changed connections.
*r_error = FAILED; DEV_ASSERT(curr_load_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
curr_load_task->resource_changed_connections.push_back(rcc);
}
} else {
// A leaf task being awaited => Propagate the resource changed connections.
if (Thread::is_main_thread()) {
// On the main thread it's safe to migrate the connections to the standard signal mechanism.
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
rcc.source->connect_changed(rcc.callable, rcc.flags);
}
}
} else {
// On non-main threads, we have to queue and call it done when processed.
if (!load_task_ptr->resource_changed_connections.is_empty()) {
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
if (rcc.callable.is_valid()) {
MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags));
}
}
core_bind::Semaphore done;
MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
done.wait();
}
} }
} }
return resource;
} }
return resource;
} }
bool ResourceLoader::_ensure_load_progress() { bool ResourceLoader::_ensure_load_progress() {
@ -838,6 +869,50 @@ bool ResourceLoader::_ensure_load_progress() {
return true; return true;
} }
void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
MutexLock lock(thread_load_mutex);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
return;
}
}
ThreadLoadTask::ResourceChangedConnection rcc;
rcc.source = p_source;
rcc.callable = p_callable;
rcc.flags = p_flags;
curr_load_task->resource_changed_connections.push_back(rcc);
}
void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
MutexLock lock(thread_load_mutex);
for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
curr_load_task->resource_changed_connections.remove_at_unordered(i);
return;
}
}
}
void ResourceLoader::resource_changed_emit(Resource *p_source) {
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));
MutexLock lock(thread_load_mutex);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
if (unlikely(rcc.source == p_source)) {
rcc.callable.call();
}
}
}
Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) { Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0. ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0.
const String &local_path = _validate_local_path(p_path); const String &local_path = _validate_local_path(p_path);
@ -1368,6 +1443,7 @@ bool ResourceLoader::timestamp_on_load = false;
thread_local int ResourceLoader::load_nesting = 0; thread_local int ResourceLoader::load_nesting = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack; thread_local Vector<String> ResourceLoader::load_paths_stack;
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides; thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;
SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() { SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() {
return ResourceLoader::thread_load_mutex; return ResourceLoader::thread_load_mutex;

View File

@ -106,6 +106,8 @@ class ResourceLoader {
MAX_LOADERS = 64 MAX_LOADERS = 64
}; };
struct ThreadLoadTask;
public: public:
enum ThreadLoadStatus { enum ThreadLoadStatus {
THREAD_LOAD_INVALID_RESOURCE, THREAD_LOAD_INVALID_RESOURCE,
@ -124,7 +126,7 @@ public:
String local_path; String local_path;
String user_path; String user_path;
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
Ref<Resource> res_if_unregistered; ThreadLoadTask *task_if_unregistered = nullptr;
void clear(); void clear();
@ -187,6 +189,13 @@ private:
Ref<Resource> resource; Ref<Resource> resource;
bool use_sub_threads = false; bool use_sub_threads = false;
HashSet<String> sub_tasks; HashSet<String> sub_tasks;
struct ResourceChangedConnection {
Resource *source = nullptr;
Callable callable;
uint32_t flags = 0;
};
LocalVector<ResourceChangedConnection> resource_changed_connections;
}; };
static void _run_load_task(void *p_userdata); static void _run_load_task(void *p_userdata);
@ -194,6 +203,7 @@ private:
static thread_local int load_nesting; static thread_local int load_nesting;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level. static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack; static thread_local Vector<String> load_paths_stack;
static thread_local ThreadLoadTask *curr_load_task;
static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex; static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex(); friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();
@ -214,6 +224,10 @@ public:
static bool is_within_load() { return load_nesting > 0; }; static bool is_within_load() { return load_nesting > 0; };
static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags);
static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
static void resource_changed_emit(Resource *p_source);
static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr); static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
static bool exists(const String &p_path, const String &p_type_hint = ""); static bool exists(const String &p_path, const String &p_type_hint = "");