From 147bbe215509b6875fa226286a4d3a8144e55d31 Mon Sep 17 00:00:00 2001 From: Fabio Alessandrelli Date: Thu, 25 Jun 2020 15:32:50 +0200 Subject: [PATCH] UDPServer handles PacketPeerUDP-client association UDPServer now uses a single socket which is shared with the PacketPeerUDP it creates and has a new `poll` function to read incoming packets on that socket and delivers them to the appropriate peer. PacketPeerUDP created this way never reads from the socket, but are allowed to write on it using sendto. This is needed because Windows (unlike Linux/BSD) does not support packet routing when multiple sockets are bound on the same address/port. --- core/io/packet_peer_udp.cpp | 65 ++++++++++++++--------- core/io/packet_peer_udp.h | 7 ++- core/io/udp_server.cpp | 100 +++++++++++++++++++++++++++++++++--- core/io/udp_server.h | 29 ++++++++++- 4 files changed, 166 insertions(+), 35 deletions(-) diff --git a/core/io/packet_peer_udp.cpp b/core/io/packet_peer_udp.cpp index 862fca96fc3..e633a56d54e 100644 --- a/core/io/packet_peer_udp.cpp +++ b/core/io/packet_peer_udp.cpp @@ -31,12 +31,14 @@ #include "packet_peer_udp.h" #include "core/io/ip.h" +#include "core/io/udp_server.h" void PacketPeerUDP::set_blocking_mode(bool p_enable) { blocking = p_enable; } void PacketPeerUDP::set_broadcast_enabled(bool p_enabled) { + ERR_FAIL_COND(udp_server); broadcast = p_enabled; if (_sock.is_valid() && _sock->is_open()) { _sock->set_broadcasting_enabled(p_enabled); @@ -44,6 +46,7 @@ void PacketPeerUDP::set_broadcast_enabled(bool p_enabled) { } Error PacketPeerUDP::join_multicast_group(IP_Address p_multi_address, String p_if_name) { + ERR_FAIL_COND_V(udp_server, ERR_LOCKED); ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE); ERR_FAIL_COND_V(!p_multi_address.is_valid(), ERR_INVALID_PARAMETER); @@ -58,6 +61,7 @@ Error PacketPeerUDP::join_multicast_group(IP_Address p_multi_address, String p_i } Error PacketPeerUDP::leave_multicast_group(IP_Address p_multi_address, String p_if_name) { + ERR_FAIL_COND_V(udp_server, ERR_LOCKED); ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE); ERR_FAIL_COND_V(!_sock->is_open(), ERR_UNCONFIGURED); return _sock->leave_multicast_group(p_multi_address, p_if_name); @@ -130,7 +134,7 @@ Error PacketPeerUDP::put_packet(const uint8_t *p_buffer, int p_buffer_size) { } do { - if (connected) { + if (connected && !udp_server) { err = _sock->send(p_buffer, p_buffer_size, sent); } else { err = _sock->sendto(p_buffer, p_buffer_size, sent, peer_addr, peer_port); @@ -186,26 +190,25 @@ Error PacketPeerUDP::listen(int p_port, const IP_Address &p_bind_address, int p_ return OK; } -Error PacketPeerUDP::connect_socket(Ref p_sock) { - Error err; - int read = 0; - uint16_t r_port; - IP_Address r_ip; - - err = p_sock->recvfrom(recv_buffer, sizeof(recv_buffer), read, r_ip, r_port, true); - ERR_FAIL_COND_V(err != OK, err); - err = p_sock->connect_to_host(r_ip, r_port); - ERR_FAIL_COND_V(err != OK, err); +Error PacketPeerUDP::connect_shared_socket(Ref p_sock, IP_Address p_ip, uint16_t p_port, UDPServer *p_server) { + udp_server = p_server; + connected = true; _sock = p_sock; - peer_addr = r_ip; - peer_port = r_port; + peer_addr = p_ip; + peer_port = p_port; packet_ip = peer_addr; packet_port = peer_port; - connected = true; return OK; } +void PacketPeerUDP::disconnect_shared_socket() { + udp_server = nullptr; + _sock = Ref(NetSocket::create()); + close(); +} + Error PacketPeerUDP::connect_to_host(const IP_Address &p_host, int p_port) { + ERR_FAIL_COND_V(udp_server, ERR_LOCKED); ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE); ERR_FAIL_COND_V(!p_host.is_valid(), ERR_INVALID_PARAMETER); @@ -243,7 +246,11 @@ bool PacketPeerUDP::is_connected_to_host() const { } void PacketPeerUDP::close() { - if (_sock.is_valid()) { + if (udp_server) { + udp_server->remove_peer(peer_addr, peer_port); + udp_server = nullptr; + _sock = Ref(NetSocket::create()); + } else if (_sock.is_valid()) { _sock->close(); } rb.resize(16); @@ -262,6 +269,9 @@ Error PacketPeerUDP::_poll() { if (!_sock->is_open()) { return FAILED; } + if (udp_server) { + return OK; // Handled by UDPServer. + } Error err; int read; @@ -284,24 +294,29 @@ Error PacketPeerUDP::_poll() { return FAILED; } - if (rb.space_left() < read + 24) { + err = store_packet(ip, port, recv_buffer, read); #ifdef TOOLS_ENABLED + if (err != OK) { WARN_PRINT("Buffer full, dropping packets!"); -#endif - continue; } - - uint32_t port32 = port; - rb.write(ip.get_ipv6(), 16); - rb.write((uint8_t *)&port32, 4); - rb.write((uint8_t *)&read, 4); - rb.write(recv_buffer, read); - ++queue_count; +#endif } return OK; } +Error PacketPeerUDP::store_packet(IP_Address p_ip, uint32_t p_port, uint8_t *p_buf, int p_buf_size) { + if (rb.space_left() < p_buf_size + 24) { + return ERR_OUT_OF_MEMORY; + } + rb.write(p_ip.get_ipv6(), 16); + rb.write((uint8_t *)&p_port, 4); + rb.write((uint8_t *)&p_buf_size, 4); + rb.write(p_buf, p_buf_size); + ++queue_count; + return OK; +} + bool PacketPeerUDP::is_listening() const { return _sock.is_valid() && _sock->is_open(); } diff --git a/core/io/packet_peer_udp.h b/core/io/packet_peer_udp.h index ad0a60f60dc..9a44a1ebea5 100644 --- a/core/io/packet_peer_udp.h +++ b/core/io/packet_peer_udp.h @@ -35,6 +35,8 @@ #include "core/io/net_socket.h" #include "core/io/packet_peer.h" +class UDPServer; + class PacketPeerUDP : public PacketPeer { GDCLASS(PacketPeerUDP, PacketPeer); @@ -55,6 +57,7 @@ protected: bool connected = false; bool blocking = true; bool broadcast = false; + UDPServer *udp_server = nullptr; Ref _sock; static void _bind_methods(); @@ -72,7 +75,9 @@ public: Error wait(); bool is_listening() const; - Error connect_socket(Ref p_sock); // Used by UDPServer + Error connect_shared_socket(Ref p_sock, IP_Address p_ip, uint16_t p_port, UDPServer *ref); // Used by UDPServer + void disconnect_shared_socket(); // Used by UDPServer + Error store_packet(IP_Address p_ip, uint32_t p_port, uint8_t *p_buf, int p_buf_size); // Used internally and by UDPServer Error connect_to_host(const IP_Address &p_host, int p_port); bool is_connected_to_host() const; diff --git a/core/io/udp_server.cpp b/core/io/udp_server.cpp index 1d329daf8bc..acd15aadc65 100644 --- a/core/io/udp_server.cpp +++ b/core/io/udp_server.cpp @@ -32,10 +32,58 @@ void UDPServer::_bind_methods() { ClassDB::bind_method(D_METHOD("listen", "port", "bind_address"), &UDPServer::listen, DEFVAL("*")); + ClassDB::bind_method(D_METHOD("poll"), &UDPServer::poll); ClassDB::bind_method(D_METHOD("is_connection_available"), &UDPServer::is_connection_available); ClassDB::bind_method(D_METHOD("is_listening"), &UDPServer::is_listening); ClassDB::bind_method(D_METHOD("take_connection"), &UDPServer::take_connection); ClassDB::bind_method(D_METHOD("stop"), &UDPServer::stop); + ClassDB::bind_method(D_METHOD("set_max_pending_connections", "max_pending_connections"), &UDPServer::set_max_pending_connections); + ClassDB::bind_method(D_METHOD("get_max_pending_connections"), &UDPServer::get_max_pending_connections); + ADD_PROPERTY(PropertyInfo(Variant::INT, "max_pending_connections", PROPERTY_HINT_RANGE, "0,256,1"), "set_max_pending_connections", "get_max_pending_connections"); +} + +Error UDPServer::poll() { + ERR_FAIL_COND_V(!_sock.is_valid(), ERR_UNAVAILABLE); + if (!_sock->is_open()) { + return ERR_UNCONFIGURED; + } + Error err; + int read; + IP_Address ip; + uint16_t port; + while (true) { + err = _sock->recvfrom(recv_buffer, sizeof(recv_buffer), read, ip, port); + if (err != OK) { + if (err == ERR_BUSY) { + break; + } + return FAILED; + } + Peer p; + p.ip = ip; + p.port = port; + List::Element *E = peers.find(p); + if (!E) { + E = pending.find(p); + } + if (E) { + E->get().peer->store_packet(ip, port, recv_buffer, read); + } else { + if (pending.size() >= max_pending_connections) { + // Drop connection. + continue; + } + // It's a new peer, add it to the pending list. + Peer peer; + peer.ip = ip; + peer.port = port; + peer.peer = memnew(PacketPeerUDP); + peer.peer->connect_shared_socket(_sock, ip, port, this); + peer.peer->store_packet(ip, port, recv_buffer, read); + pending.push_back(peer); + } + } + return OK; } Error UDPServer::listen(uint16_t p_port, const IP_Address &p_bind_address) { @@ -82,8 +130,24 @@ bool UDPServer::is_connection_available() const { return false; } - Error err = _sock->poll(NetSocket::POLL_TYPE_IN, 0); - return (err == OK); + return pending.size() > 0; +} + +void UDPServer::set_max_pending_connections(int p_max) { + ERR_FAIL_COND_MSG(p_max < 0, "Max pending connections value must be a positive number (0 means refuse new connections)."); + max_pending_connections = p_max; + while (p_max > pending.size()) { + List::Element *E = pending.back(); + if (!E) { + break; + } + memdelete(E->get().peer); + pending.erase(E); + } +} + +int UDPServer::get_max_pending_connections() const { + return max_pending_connections; } Ref UDPServer::take_connection() { @@ -92,11 +156,20 @@ Ref UDPServer::take_connection() { return conn; } - conn = Ref(memnew(PacketPeerUDP)); - conn->connect_socket(_sock); - _sock = Ref(NetSocket::create()); - listen(bind_port, bind_address); - return conn; + Peer peer = pending[0]; + pending.pop_front(); + peers.push_back(peer); + return peer.peer; +} + +void UDPServer::remove_peer(IP_Address p_ip, int p_port) { + Peer peer; + peer.ip = p_ip; + peer.port = p_port; + List::Element *E = peers.find(peer); + if (E) { + peers.erase(E); + } } void UDPServer::stop() { @@ -105,6 +178,19 @@ void UDPServer::stop() { } bind_port = 0; bind_address = IP_Address(); + List::Element *E = peers.front(); + while (E) { + E->get().peer->disconnect_shared_socket(); + E = E->next(); + } + E = pending.front(); + while (E) { + E->get().peer->disconnect_shared_socket(); + memdelete(E->get().peer); + E = E->next(); + } + peers.clear(); + pending.clear(); } UDPServer::UDPServer() : diff --git a/core/io/udp_server.h b/core/io/udp_server.h index 90bb82b62b6..3175b09b195 100644 --- a/core/io/udp_server.h +++ b/core/io/udp_server.h @@ -38,15 +38,40 @@ class UDPServer : public Reference { GDCLASS(UDPServer, Reference); protected: - static void _bind_methods(); - int bind_port; + enum { + PACKET_BUFFER_SIZE = 65536 + }; + + struct Peer { + PacketPeerUDP *peer; + IP_Address ip; + uint16_t port = 0; + + bool operator==(const Peer &p_other) const { + return (ip == p_other.ip && port == p_other.port); + } + }; + uint8_t recv_buffer[PACKET_BUFFER_SIZE]; + + int bind_port = 0; IP_Address bind_address; + + List peers; + List pending; + int max_pending_connections = 16; + Ref _sock; + static void _bind_methods(); + public: + void remove_peer(IP_Address p_ip, int p_port); Error listen(uint16_t p_port, const IP_Address &p_bind_address = IP_Address("*")); + Error poll(); bool is_listening() const; bool is_connection_available() const; + void set_max_pending_connections(int p_max); + int get_max_pending_connections() const; Ref take_connection(); void stop();