fix heartbeat breaking after resume, make websocket use sigc

This commit is contained in:
ouwou 2020-11-16 03:08:56 -05:00
parent fae5ef4ba4
commit a0ece884d0
4 changed files with 54 additions and 14 deletions

View File

@ -7,6 +7,10 @@ DiscordClient::DiscordClient()
, m_decompress_buf(InflateChunkSize) {
m_msg_dispatch.connect(sigc::mem_fun(*this, &DiscordClient::MessageDispatch));
m_websocket.signal_message().connect(sigc::mem_fun(*this, &DiscordClient::HandleGatewayMessageRaw));
m_websocket.signal_open().connect(sigc::mem_fun(*this, &DiscordClient::HandleSocketOpen));
m_websocket.signal_close().connect(sigc::mem_fun(*this, &DiscordClient::HandleSocketClose));
LoadEventMap();
}
@ -18,7 +22,6 @@ void DiscordClient::Start() {
m_heartbeat_acked = true;
m_client_connected = true;
m_websocket.StartConnection(DiscordGateway);
m_websocket.SetMessageCallback(std::bind(&DiscordClient::HandleGatewayMessageRaw, this, std::placeholders::_1));
}
void DiscordClient::Stop() {
@ -99,8 +102,6 @@ std::set<Snowflake> DiscordClient::GetMessagesForChannel(Snowflake id) const {
}
void DiscordClient::FetchInviteData(std::string code, std::function<void(Invite)> cb, std::function<void(bool)> err) {
//printf("test: %s\n", code.c_str());
//err(true);
m_http.MakeGET("/invites/" + code + "?with_counts=true", [this, cb, err](cpr::Response r) {
if (!CheckCode(r)) {
err(r.status_code == 404);
@ -554,6 +555,7 @@ void DiscordClient::HandleGatewayMessage(std::string str) {
void DiscordClient::HandleGatewayHello(const GatewayMessage &msg) {
HelloMessageData d = msg.Data;
m_heartbeat_msec = d.HeartbeatInterval;
m_heartbeat_waiter.revive();
m_heartbeat_thread = std::thread(std::bind(&DiscordClient::HeartbeatThread, this));
m_signal_connected.emit(); // socket is connected before this but emitting here should b fine
if (m_wants_resume) {
@ -688,7 +690,7 @@ void DiscordClient::HandleGatewayReconnect(const GatewayMessage &msg) {
m_heartbeat_waiter.kill();
if (m_heartbeat_thread.joinable()) m_heartbeat_thread.join();
m_websocket.Stop(1002); // 1000 (kNormalClosureCode) and 1001 will invalidate the session id
m_websocket.Stop(1012); // 1000 (kNormalClosureCode) and 1001 will invalidate the session id
std::memset(&m_zstream, 0, sizeof(m_zstream));
inflateInit2(&m_zstream, MAX_WBITS + 32);
@ -696,7 +698,6 @@ void DiscordClient::HandleGatewayReconnect(const GatewayMessage &msg) {
m_heartbeat_acked = true;
m_wants_resume = true;
m_websocket.StartConnection(DiscordGateway);
m_websocket.SetMessageCallback(std::bind(&DiscordClient::HandleGatewayMessageRaw, this, std::placeholders::_1));
}
void DiscordClient::HandleGatewayMessageUpdate(const GatewayMessage &msg) {
@ -816,6 +817,12 @@ void DiscordClient::SendResume() {
m_websocket.Send(msg);
}
void DiscordClient::HandleSocketOpen() {
}
void DiscordClient::HandleSocketClose(uint16_t code) {
}
bool DiscordClient::CheckCode(const cpr::Response &r) {
if (r.status_code >= 300 || r.error) {
fprintf(stderr, "api request to %s failed with status code %d\n", r.url.c_str(), r.status_code);

View File

@ -34,6 +34,11 @@ public:
cv.notify_all();
}
void revive() {
std::unique_lock<std::mutex> lock(m);
terminate = false;
}
private:
mutable std::condition_variable cv;
mutable std::mutex m;
@ -138,6 +143,9 @@ private:
void SendIdentify();
void SendResume();
void HandleSocketOpen();
void HandleSocketClose(uint16_t code);
bool CheckCode(const cpr::Response &r);
std::string m_token;

View File

@ -23,10 +23,6 @@ bool Websocket::IsOpen() const {
return state == ix::ReadyState::Open;
}
void Websocket::SetMessageCallback(MessageCallback_t func) {
m_callback = func;
}
void Websocket::Send(const std::string &str) {
printf("sending %s\n", str.c_str());
m_websocket.sendText(str);
@ -38,9 +34,26 @@ void Websocket::Send(const nlohmann::json &j) {
void Websocket::OnMessage(const ix::WebSocketMessagePtr &msg) {
switch (msg->type) {
case ix::WebSocketMessageType::Open: {
m_signal_open.emit();
} break;
case ix::WebSocketMessageType::Close: {
m_signal_close.emit(msg->closeInfo.code);
} break;
case ix::WebSocketMessageType::Message: {
if (m_callback)
m_callback(msg->str);
m_signal_message.emit(msg->str);
} break;
}
}
Websocket::type_signal_open Websocket::signal_open() {
return m_signal_open;
}
Websocket::type_signal_close Websocket::signal_close() {
return m_signal_close;
}
Websocket::type_signal_message Websocket::signal_message() {
return m_signal_message;
}

View File

@ -4,14 +4,13 @@
#include <string>
#include <functional>
#include <nlohmann/json.hpp>
#include <sigc++/sigc++.h>
class Websocket {
public:
Websocket();
void StartConnection(std::string url);
using MessageCallback_t = std::function<void(std::string data)>;
void SetMessageCallback(MessageCallback_t func);
void Send(const std::string &str);
void Send(const nlohmann::json &j);
void Stop();
@ -21,6 +20,19 @@ public:
private:
void OnMessage(const ix::WebSocketMessagePtr &msg);
MessageCallback_t m_callback;
ix::WebSocket m_websocket;
public:
typedef sigc::signal<void> type_signal_open;
typedef sigc::signal<void, uint16_t> type_signal_close;
typedef sigc::signal<void, std::string> type_signal_message;
type_signal_open signal_open();
type_signal_close signal_close();
type_signal_message signal_message();
private:
type_signal_open m_signal_open;
type_signal_close m_signal_close;
type_signal_message m_signal_message;
};