diff --git a/filecache.cpp b/filecache.cpp index 1f4024f..05da20e 100644 --- a/filecache.cpp +++ b/filecache.cpp @@ -2,13 +2,21 @@ #include "filecache.hpp" #include "MurmurHash3.h" +std::string GetCachedName(std::string str) { + uint32_t out; + MurmurHash3_x86_32(str.c_str(), str.size(), 0, &out); + return std::to_string(out); +} + Cache::Cache() { m_tmp_path = std::filesystem::temp_directory_path() / "abaddon-cache"; std::filesystem::create_directories(m_tmp_path); + m_worker.set_file_path(m_tmp_path); } Cache::~Cache() { - m_canceled = true; + m_worker.stop(); + for (auto &future : m_futures) if (future.valid()) future.get(); @@ -22,12 +30,6 @@ void Cache::ClearCache() { std::filesystem::remove_all(path); } -std::string Cache::GetCachedName(std::string str) { - uint32_t out; - MurmurHash3_x86_32(str.c_str(), str.size(), 0, &out); - return std::to_string(out); -} - void Cache::RespondFromPath(std::filesystem::path path, callback_type cb) { cb(path.string()); } @@ -35,28 +37,19 @@ void Cache::RespondFromPath(std::filesystem::path path, callback_type cb) { void Cache::GetFileFromURL(std::string url, callback_type cb) { auto cache_path = m_tmp_path / GetCachedName(url); if (std::filesystem::exists(cache_path)) { + m_mutex.lock(); m_futures.push_back(std::async(std::launch::async, [this, cache_path, cb]() { RespondFromPath(cache_path, cb); })); + m_mutex.unlock(); return; } - // needs to be initialized like this or else ::Get() is called recursively - if (!m_semaphore) - m_semaphore = std::make_unique(Abaddon::Get().GetSettings().GetCacheHTTPConcurrency()); - if (m_callbacks.find(url) != m_callbacks.end()) { m_callbacks[url].push_back(cb); } else { m_callbacks[url].push_back(cb); - auto future = std::async(std::launch::async, [this, url]() { - if (m_canceled) return; - m_semaphore->wait(); - if (m_canceled) return; - http::request req(http::REQUEST_GET, url); - m_semaphore->notify(); - if (m_canceled) return; - OnResponse(req.execute()); + m_worker.add_image(url, [this, url](const std::string &path) { + OnFetchComplete(url); }); - m_futures.push_back(std::move(future)); } } @@ -71,6 +64,7 @@ std::string Cache::GetPathIfCached(std::string url) { // this just seems really yucky void Cache::CleanupFutures() { + std::lock_guard l(m_mutex); for (auto it = m_futures.begin(); it != m_futures.end();) { if (it->valid() && it->wait_for(std::chrono::seconds(0)) == std::future_status::ready) it = m_futures.erase(it); @@ -79,20 +73,131 @@ void Cache::CleanupFutures() { } } -void Cache::OnResponse(const http::response_type &r) { +void Cache::OnResponse(const std::string &url) { CleanupFutures(); // see above comment - if (r.error || r.status_code > 300) return; - std::vector data(r.text.begin(), r.text.end()); - auto path = m_tmp_path / GetCachedName(static_cast(r.url)); - FILE *fp = std::fopen(path.string().c_str(), "wb"); - if (fp == nullptr) - return; - std::fwrite(data.data(), 1, data.size(), fp); - std::fclose(fp); + auto path = m_tmp_path / GetCachedName(url); - for (const auto &cb : m_callbacks[static_cast(r.url)]) { + m_mutex.lock(); + const auto key = static_cast(url); + auto callbacks = std::move(m_callbacks[key]); + m_callbacks.erase(key); + m_mutex.unlock(); + for (const auto &cb : callbacks) cb(path.string()); - } - m_callbacks.erase(static_cast(r.url)); +} + +void Cache::OnFetchComplete(const std::string &url) { + m_mutex.lock(); + m_futures.push_back(std::async(std::launch::async, std::bind(&Cache::OnResponse, this, url))); + m_mutex.unlock(); +} + +FileCacheWorkerThread::FileCacheWorkerThread() { + m_multi_handle = curl_multi_init(); + m_thread = std::thread(std::bind(&FileCacheWorkerThread::loop, this)); +} + +FileCacheWorkerThread::~FileCacheWorkerThread() { + if (!m_stop) stop(); + for (const auto handle : m_handles) + curl_easy_cleanup(handle); + curl_multi_cleanup(m_multi_handle); +} + +void FileCacheWorkerThread::set_file_path(const std::filesystem::path &path) { + m_data_path = path; +} + +void FileCacheWorkerThread::add_image(const std::string &string, callback_type callback) { + m_queue_mutex.lock(); + m_queue.push({ string, callback }); + m_cv.notify_one(); + m_queue_mutex.unlock(); +} + +void FileCacheWorkerThread::stop() { + m_stop = true; + if (m_thread.joinable()) { + m_cv.notify_all(); + m_thread.join(); + } +} + +void FileCacheWorkerThread::loop() { + while (!m_stop) { + if (m_handles.size() == 0) { + std::unique_lock lock(m_queue_mutex); + int s = m_queue.size(); + if (s == 0) + m_cv.wait(lock); + } + + if (m_handles.size() < Abaddon::Get().GetSettings().GetCacheHTTPConcurrency()) { + std::optional entry; + m_queue_mutex.lock(); + if (m_queue.size() > 0) { + entry = std::move(m_queue.front()); + m_queue.pop(); + } + m_queue_mutex.unlock(); + + if (entry.has_value()) { + if (m_callbacks.find(entry->URL) != m_callbacks.end()) { + printf("url is being requested twice :(\n"); + continue; + } + + // add the ! and rename after so the image loader thing doesnt pick it up if its not done yet + auto path = m_data_path / (GetCachedName(entry->URL) + "!"); + FILE *fp = std::fopen(path.string().c_str(), "wb"); + if (fp == nullptr) { + printf("couldn't open fp\n"); + continue; + } + + CURL *handle = curl_easy_init(); + m_handles.insert(handle); + curl_easy_setopt(handle, CURLOPT_URL, entry->URL.c_str()); + curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, fp); + + m_handle_urls[handle] = entry->URL; + m_curl_file_handles[handle] = fp; + m_callbacks[entry->URL] = entry->Callback; + m_paths[entry->URL] = std::move(path); + + curl_multi_add_handle(m_multi_handle, handle); + } + } + + //int fds; + //curl_multi_wait(m_multi_handle, nullptr, 0, 10, &fds); + curl_multi_perform(m_multi_handle, &m_running_handles); + + int num_msgs; + while (auto msg = curl_multi_info_read(m_multi_handle, &num_msgs)) { + if (msg->msg == CURLMSG_DONE) { + auto url = m_handle_urls.at(msg->easy_handle); + auto fp = m_curl_file_handles.find(msg->easy_handle); + std::fclose(fp->second); + + curl_multi_remove_handle(m_multi_handle, msg->easy_handle); + curl_easy_cleanup(msg->easy_handle); + + auto path = m_paths.at(url).string(); + auto cb = m_callbacks.at(url); + m_callbacks.erase(url); + m_paths.erase(url); + m_handles.erase(msg->easy_handle); + m_handle_urls.erase(msg->easy_handle); + m_curl_file_handles.erase(fp); + // chop off the ! + auto old = path; + path.pop_back(); + std::filesystem::rename(old, path); + cb(path); + } + } + } } diff --git a/filecache.hpp b/filecache.hpp index 0eacbbd..d25fdb3 100644 --- a/filecache.hpp +++ b/filecache.hpp @@ -1,13 +1,58 @@ #pragma once +#include #include #include #include #include +#include #include #include +#include #include "util.hpp" #include "http.hpp" +class FileCacheWorkerThread { +public: + using callback_type = sigc::slot; + + FileCacheWorkerThread(); + ~FileCacheWorkerThread(); + + void set_file_path(const std::filesystem::path &path); + + void add_image(const std::string &string, callback_type callback); + + void stop(); + +private: + void loop(); + + bool m_stop = false; + std::thread m_thread; + + struct QueueEntry { + std::string URL; + callback_type Callback; + }; + + std::condition_variable m_cv; + + mutable std::mutex m_queue_mutex; + std::queue m_queue; + + std::unordered_map m_curl_file_handles; + std::unordered_map m_handle_urls; + std::unordered_map m_paths; + std::unordered_map m_callbacks; + + int m_running_handles = 0; + + std::unordered_set m_handles; + CURLM *m_multi_handle; + + std::filesystem::path m_data_path; +}; + class Cache { public: Cache(); @@ -19,16 +64,16 @@ public: void ClearCache(); private: - std::string GetCachedName(std::string str); void CleanupFutures(); void RespondFromPath(std::filesystem::path path, callback_type cb); - void OnResponse(const http::response_type &r); - - std::unique_ptr m_semaphore; + void OnResponse(const std::string &url); + void OnFetchComplete(const std::string &url); std::unordered_map> m_callbacks; std::vector> m_futures; std::filesystem::path m_tmp_path; - bool m_canceled = false; + mutable std::mutex m_mutex; + + FileCacheWorkerThread m_worker; }; diff --git a/imgmanager.cpp b/imgmanager.cpp index 8d23144..261b8d6 100644 --- a/imgmanager.cpp +++ b/imgmanager.cpp @@ -45,10 +45,14 @@ void ImageManager::LoadFromURL(std::string url, callback_type cb) { m_cache.GetFileFromURL(url, [this, url, signal](std::string path) { try { auto buf = ReadFileToPixbuf(path); - m_cb_mutex.lock(); - m_cb_queue.push([signal, buf]() { signal.emit(buf); }); - m_cb_dispatcher.emit(); - m_cb_mutex.unlock(); + if (!buf) + printf("%s (%s) is null\n", url.c_str(), path.c_str()); + else { + m_cb_mutex.lock(); + m_cb_queue.push([signal, buf]() { signal.emit(buf); }); + m_cb_dispatcher.emit(); + m_cb_mutex.unlock(); + } } catch (const std::exception &e) { fprintf(stderr, "err loading pixbuf from %s: %s\n", path.c_str(), e.what()); } @@ -61,10 +65,14 @@ void ImageManager::LoadAnimationFromURL(std::string url, int w, int h, callback_ m_cache.GetFileFromURL(url, [this, url, signal, w, h](std::string path) { try { auto buf = ReadFileToPixbufAnimation(path, w, h); - m_cb_mutex.lock(); - m_cb_queue.push([signal, buf]() { signal.emit(buf); }); - m_cb_dispatcher.emit(); - m_cb_mutex.unlock(); + if (!buf) + printf("%s (%s) is null\n", url.c_str(), path.c_str()); + else { + m_cb_mutex.lock(); + m_cb_queue.push([signal, buf]() { signal.emit(buf); }); + m_cb_dispatcher.emit(); + m_cb_mutex.unlock(); + } } catch (const std::exception &e) { fprintf(stderr, "err loading pixbuf animation from %s: %s\n", path.c_str(), e.what()); } diff --git a/settings.cpp b/settings.cpp index 7790e36..bba97f3 100644 --- a/settings.cpp +++ b/settings.cpp @@ -63,7 +63,7 @@ std::string SettingsManager::GetLinkColor() const { } int SettingsManager::GetCacheHTTPConcurrency() const { - return GetSettingInt("http", "concurrent", 10); + return GetSettingInt("http", "concurrent", 20); } bool SettingsManager::GetPrefetch() const {