speed up images with libcurl multi, raise concurrency

also maybe fix some rare crash
This commit is contained in:
ouwou 2021-02-11 18:03:16 -05:00
parent 57f5c67c94
commit bab9abf4af
4 changed files with 204 additions and 46 deletions

View File

@ -2,13 +2,21 @@
#include "filecache.hpp"
#include "MurmurHash3.h"
std::string GetCachedName(std::string str) {
uint32_t out;
MurmurHash3_x86_32(str.c_str(), str.size(), 0, &out);
return std::to_string(out);
}
Cache::Cache() {
m_tmp_path = std::filesystem::temp_directory_path() / "abaddon-cache";
std::filesystem::create_directories(m_tmp_path);
m_worker.set_file_path(m_tmp_path);
}
Cache::~Cache() {
m_canceled = true;
m_worker.stop();
for (auto &future : m_futures)
if (future.valid()) future.get();
@ -22,12 +30,6 @@ void Cache::ClearCache() {
std::filesystem::remove_all(path);
}
std::string Cache::GetCachedName(std::string str) {
uint32_t out;
MurmurHash3_x86_32(str.c_str(), str.size(), 0, &out);
return std::to_string(out);
}
void Cache::RespondFromPath(std::filesystem::path path, callback_type cb) {
cb(path.string());
}
@ -35,28 +37,19 @@ void Cache::RespondFromPath(std::filesystem::path path, callback_type cb) {
void Cache::GetFileFromURL(std::string url, callback_type cb) {
auto cache_path = m_tmp_path / GetCachedName(url);
if (std::filesystem::exists(cache_path)) {
m_mutex.lock();
m_futures.push_back(std::async(std::launch::async, [this, cache_path, cb]() { RespondFromPath(cache_path, cb); }));
m_mutex.unlock();
return;
}
// needs to be initialized like this or else ::Get() is called recursively
if (!m_semaphore)
m_semaphore = std::make_unique<Semaphore>(Abaddon::Get().GetSettings().GetCacheHTTPConcurrency());
if (m_callbacks.find(url) != m_callbacks.end()) {
m_callbacks[url].push_back(cb);
} else {
m_callbacks[url].push_back(cb);
auto future = std::async(std::launch::async, [this, url]() {
if (m_canceled) return;
m_semaphore->wait();
if (m_canceled) return;
http::request req(http::REQUEST_GET, url);
m_semaphore->notify();
if (m_canceled) return;
OnResponse(req.execute());
m_worker.add_image(url, [this, url](const std::string &path) {
OnFetchComplete(url);
});
m_futures.push_back(std::move(future));
}
}
@ -71,6 +64,7 @@ std::string Cache::GetPathIfCached(std::string url) {
// this just seems really yucky
void Cache::CleanupFutures() {
std::lock_guard<std::mutex> l(m_mutex);
for (auto it = m_futures.begin(); it != m_futures.end();) {
if (it->valid() && it->wait_for(std::chrono::seconds(0)) == std::future_status::ready)
it = m_futures.erase(it);
@ -79,20 +73,131 @@ void Cache::CleanupFutures() {
}
}
void Cache::OnResponse(const http::response_type &r) {
void Cache::OnResponse(const std::string &url) {
CleanupFutures(); // see above comment
if (r.error || r.status_code > 300) return;
std::vector<uint8_t> data(r.text.begin(), r.text.end());
auto path = m_tmp_path / GetCachedName(static_cast<std::string>(r.url));
FILE *fp = std::fopen(path.string().c_str(), "wb");
if (fp == nullptr)
return;
std::fwrite(data.data(), 1, data.size(), fp);
std::fclose(fp);
auto path = m_tmp_path / GetCachedName(url);
for (const auto &cb : m_callbacks[static_cast<std::string>(r.url)]) {
m_mutex.lock();
const auto key = static_cast<std::string>(url);
auto callbacks = std::move(m_callbacks[key]);
m_callbacks.erase(key);
m_mutex.unlock();
for (const auto &cb : callbacks)
cb(path.string());
}
m_callbacks.erase(static_cast<std::string>(r.url));
}
void Cache::OnFetchComplete(const std::string &url) {
m_mutex.lock();
m_futures.push_back(std::async(std::launch::async, std::bind(&Cache::OnResponse, this, url)));
m_mutex.unlock();
}
FileCacheWorkerThread::FileCacheWorkerThread() {
m_multi_handle = curl_multi_init();
m_thread = std::thread(std::bind(&FileCacheWorkerThread::loop, this));
}
FileCacheWorkerThread::~FileCacheWorkerThread() {
if (!m_stop) stop();
for (const auto handle : m_handles)
curl_easy_cleanup(handle);
curl_multi_cleanup(m_multi_handle);
}
void FileCacheWorkerThread::set_file_path(const std::filesystem::path &path) {
m_data_path = path;
}
void FileCacheWorkerThread::add_image(const std::string &string, callback_type callback) {
m_queue_mutex.lock();
m_queue.push({ string, callback });
m_cv.notify_one();
m_queue_mutex.unlock();
}
void FileCacheWorkerThread::stop() {
m_stop = true;
if (m_thread.joinable()) {
m_cv.notify_all();
m_thread.join();
}
}
void FileCacheWorkerThread::loop() {
while (!m_stop) {
if (m_handles.size() == 0) {
std::unique_lock<std::mutex> lock(m_queue_mutex);
int s = m_queue.size();
if (s == 0)
m_cv.wait(lock);
}
if (m_handles.size() < Abaddon::Get().GetSettings().GetCacheHTTPConcurrency()) {
std::optional<QueueEntry> entry;
m_queue_mutex.lock();
if (m_queue.size() > 0) {
entry = std::move(m_queue.front());
m_queue.pop();
}
m_queue_mutex.unlock();
if (entry.has_value()) {
if (m_callbacks.find(entry->URL) != m_callbacks.end()) {
printf("url is being requested twice :(\n");
continue;
}
// add the ! and rename after so the image loader thing doesnt pick it up if its not done yet
auto path = m_data_path / (GetCachedName(entry->URL) + "!");
FILE *fp = std::fopen(path.string().c_str(), "wb");
if (fp == nullptr) {
printf("couldn't open fp\n");
continue;
}
CURL *handle = curl_easy_init();
m_handles.insert(handle);
curl_easy_setopt(handle, CURLOPT_URL, entry->URL.c_str());
curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, fp);
m_handle_urls[handle] = entry->URL;
m_curl_file_handles[handle] = fp;
m_callbacks[entry->URL] = entry->Callback;
m_paths[entry->URL] = std::move(path);
curl_multi_add_handle(m_multi_handle, handle);
}
}
//int fds;
//curl_multi_wait(m_multi_handle, nullptr, 0, 10, &fds);
curl_multi_perform(m_multi_handle, &m_running_handles);
int num_msgs;
while (auto msg = curl_multi_info_read(m_multi_handle, &num_msgs)) {
if (msg->msg == CURLMSG_DONE) {
auto url = m_handle_urls.at(msg->easy_handle);
auto fp = m_curl_file_handles.find(msg->easy_handle);
std::fclose(fp->second);
curl_multi_remove_handle(m_multi_handle, msg->easy_handle);
curl_easy_cleanup(msg->easy_handle);
auto path = m_paths.at(url).string();
auto cb = m_callbacks.at(url);
m_callbacks.erase(url);
m_paths.erase(url);
m_handles.erase(msg->easy_handle);
m_handle_urls.erase(msg->easy_handle);
m_curl_file_handles.erase(fp);
// chop off the !
auto old = path;
path.pop_back();
std::filesystem::rename(old, path);
cb(path);
}
}
}
}

View File

@ -1,13 +1,58 @@
#pragma once
#include <curl/curl.h>
#include <functional>
#include <string>
#include <filesystem>
#include <vector>
#include <unordered_set>
#include <unordered_map>
#include <future>
#include <mutex>
#include "util.hpp"
#include "http.hpp"
class FileCacheWorkerThread {
public:
using callback_type = sigc::slot<void(std::string path)>;
FileCacheWorkerThread();
~FileCacheWorkerThread();
void set_file_path(const std::filesystem::path &path);
void add_image(const std::string &string, callback_type callback);
void stop();
private:
void loop();
bool m_stop = false;
std::thread m_thread;
struct QueueEntry {
std::string URL;
callback_type Callback;
};
std::condition_variable m_cv;
mutable std::mutex m_queue_mutex;
std::queue<QueueEntry> m_queue;
std::unordered_map<CURL *, FILE *> m_curl_file_handles;
std::unordered_map<CURL *, std::string> m_handle_urls;
std::unordered_map<std::string, std::filesystem::path> m_paths;
std::unordered_map<std::string, callback_type> m_callbacks;
int m_running_handles = 0;
std::unordered_set<CURL *> m_handles;
CURLM *m_multi_handle;
std::filesystem::path m_data_path;
};
class Cache {
public:
Cache();
@ -19,16 +64,16 @@ public:
void ClearCache();
private:
std::string GetCachedName(std::string str);
void CleanupFutures();
void RespondFromPath(std::filesystem::path path, callback_type cb);
void OnResponse(const http::response_type &r);
std::unique_ptr<Semaphore> m_semaphore;
void OnResponse(const std::string &url);
void OnFetchComplete(const std::string &url);
std::unordered_map<std::string, std::vector<callback_type>> m_callbacks;
std::vector<std::future<void>> m_futures;
std::filesystem::path m_tmp_path;
bool m_canceled = false;
mutable std::mutex m_mutex;
FileCacheWorkerThread m_worker;
};

View File

@ -45,10 +45,14 @@ void ImageManager::LoadFromURL(std::string url, callback_type cb) {
m_cache.GetFileFromURL(url, [this, url, signal](std::string path) {
try {
auto buf = ReadFileToPixbuf(path);
m_cb_mutex.lock();
m_cb_queue.push([signal, buf]() { signal.emit(buf); });
m_cb_dispatcher.emit();
m_cb_mutex.unlock();
if (!buf)
printf("%s (%s) is null\n", url.c_str(), path.c_str());
else {
m_cb_mutex.lock();
m_cb_queue.push([signal, buf]() { signal.emit(buf); });
m_cb_dispatcher.emit();
m_cb_mutex.unlock();
}
} catch (const std::exception &e) {
fprintf(stderr, "err loading pixbuf from %s: %s\n", path.c_str(), e.what());
}
@ -61,10 +65,14 @@ void ImageManager::LoadAnimationFromURL(std::string url, int w, int h, callback_
m_cache.GetFileFromURL(url, [this, url, signal, w, h](std::string path) {
try {
auto buf = ReadFileToPixbufAnimation(path, w, h);
m_cb_mutex.lock();
m_cb_queue.push([signal, buf]() { signal.emit(buf); });
m_cb_dispatcher.emit();
m_cb_mutex.unlock();
if (!buf)
printf("%s (%s) is null\n", url.c_str(), path.c_str());
else {
m_cb_mutex.lock();
m_cb_queue.push([signal, buf]() { signal.emit(buf); });
m_cb_dispatcher.emit();
m_cb_mutex.unlock();
}
} catch (const std::exception &e) {
fprintf(stderr, "err loading pixbuf animation from %s: %s\n", path.c_str(), e.what());
}

View File

@ -63,7 +63,7 @@ std::string SettingsManager::GetLinkColor() const {
}
int SettingsManager::GetCacheHTTPConcurrency() const {
return GetSettingInt("http", "concurrent", 10);
return GetSettingInt("http", "concurrent", 20);
}
bool SettingsManager::GetPrefetch() const {