From 998a754a398f16ef3f1b89dc4c9a9db5193891d9 Mon Sep 17 00:00:00 2001 From: Young-Flash <871946895@qq.com> Date: Wed, 26 Oct 2022 16:40:18 +0800 Subject: [PATCH] setup websocket and scgi within one socket --- ...WebSockets.cmake => FinduWebsockets.cmake} | 0 doc/rtorrent.rc | 12 ++---- include/rpc/scgi.h | 2 + include/utils/socket_fd.h | 1 + src/command_network.cc | 19 ++++----- src/main.cc | 3 -- src/rpc/rpc_json.cc | 4 +- src/rpc/scgi.cc | 42 +++++++++++++++++-- src/utils/socket_fd.cc | 8 ++++ src/websockets_thread.cc | 3 +- 10 files changed, 64 insertions(+), 30 deletions(-) rename cmake/{FinduWebSockets.cmake => FinduWebsockets.cmake} (100%) diff --git a/cmake/FinduWebSockets.cmake b/cmake/FinduWebsockets.cmake similarity index 100% rename from cmake/FinduWebSockets.cmake rename to cmake/FinduWebsockets.cmake diff --git a/doc/rtorrent.rc b/doc/rtorrent.rc index e0b12381..1060ee28 100644 --- a/doc/rtorrent.rc +++ b/doc/rtorrent.rc @@ -147,13 +147,9 @@ network.http.dns_cache_timeout.set = 25 # Run the rTorrent process as a daemon in the background #system.daemon.set = false -# scgi listen way -# network.scgi.open_local = (cat,(cfg.basedir),rtorrent_scgi.sock) -network.scgi.open_port = 127.0.0.1:1256 - -# websockets listen way -# network.websockets.open_local = (cat,(cfg.basedir),rtorrent_websockets.sock) -network.websockets.open_port = 127.0.0.1:1258 +# websockets_scgi listen way +# network.websockets_scgi.open_local = (cat,(cfg.basedir),rtorrent_websockets_scgi.sock) +network.websockets_scgi.open_port = 127.0.0.1:1258 # Logging: # Levels = critical error warn notice info debug @@ -163,4 +159,4 @@ log.open_file = "log", (cfg.logfile) log.add_output = "info", "log" ##log.add_output = "tracker_debug", "log" -### END of rtorrent.rc ### +### END of rtorrent.rc ### \ No newline at end of file diff --git a/include/rpc/scgi.h b/include/rpc/scgi.h index 757f4543..19ee5832 100644 --- a/include/rpc/scgi.h +++ b/include/rpc/scgi.h @@ -52,6 +52,8 @@ class lt_cacheline_aligned SCgi : public torrent::Event { bool receive_call(SCgiTask* task, const char* buffer, uint32_t length); + static bool process_and_send(int fd, const char* data, int length, bool is_json); + utils::SocketFd& get_fd() { return *reinterpret_cast(&m_fileDesc); } diff --git a/include/utils/socket_fd.h b/include/utils/socket_fd.h index 3b89ce2b..172310f4 100644 --- a/include/utils/socket_fd.h +++ b/include/utils/socket_fd.h @@ -31,6 +31,7 @@ class SocketFd { bool set_nonblock(); bool set_reuse_address(bool state); + bool set_reuse_port(bool state); bool set_dont_route(bool state); bool set_bind_to_device(const char* device); diff --git a/src/command_network.cc b/src/command_network.cc index 6904f1e1..3822f75d 100644 --- a/src/command_network.cc +++ b/src/command_network.cc @@ -236,11 +236,14 @@ apply_scgi(const std::string& arg, int type) { } torrent::Object -apply_websockets(const std::string& arg, int type) { +apply_websockets_scgi(const std::string& arg, int type) { if (worker_thread->websockets_protocol() != nullptr) { throw torrent::input_error("websockets RPC thread already enabled."); } + if (!rpc::rpc.is_initialized()) + initialize_rpc(); + auto listen_info = new std::pair(arg, type); worker_thread->set_websockets_protocol(listen_info); @@ -396,17 +399,11 @@ initialize_command_network() { "network.max_open_sockets.set", [cm](const auto&, const auto& v) { return cm->set_max_size(v); }, false); - CMD2_ANY_STRING("network.scgi.open_port", [](const auto&, const auto& arg) { - return apply_scgi(arg, 1); - }, false); - CMD2_ANY_STRING("network.scgi.open_local", [](const auto&, const auto& arg) { - return apply_scgi(arg, 2); - }, false); - CMD2_ANY_STRING("network.websockets.open_port", [](const auto&, const auto& arg) { - return apply_websockets(arg, 1); + CMD2_ANY_STRING("network.websockets_scgi.open_port", [](const auto&, const auto& arg) { + return apply_websockets_scgi(arg, 1); }, false); - CMD2_ANY_STRING("network.websockets.open_local", [](const auto&, const auto& arg) { - return apply_websockets(arg, 2); + CMD2_ANY_STRING("network.websockets_scgi.open_local", [](const auto&, const auto& arg) { + return apply_websockets_scgi(arg, 2); }, false); CMD2_VAR_BOOL("network.scgi.dont_route", false, false); diff --git a/src/main.cc b/src/main.cc index e4d2249d..e80ef19d 100644 --- a/src/main.cc +++ b/src/main.cc @@ -505,9 +505,6 @@ main(int argc, char** argv) { CMD2_REDIRECT("port_random", "network.port_random.set"); CMD2_REDIRECT("proxy_address", "network.proxy_address.set"); - CMD2_REDIRECT("scgi_port", "network.scgi.open_port"); - CMD2_REDIRECT("scgi_local", "network.scgi.open_local"); - CMD2_REDIRECT_GENERIC("directory", "directory.default.set"); CMD2_REDIRECT_GENERIC("session", "session.path.set"); diff --git a/src/rpc/rpc_json.cc b/src/rpc/rpc_json.cc index 85c30a3e..197285c6 100644 --- a/src/rpc/rpc_json.cc +++ b/src/rpc/rpc_json.cc @@ -293,9 +293,7 @@ RpcJson::is_valid() const { bool RpcJson::process(const char* inBuffer, uint32_t length, res_callback callback) { - const std::string& response = - m_jsonrpc->HandleRequest(std::string_view(inBuffer, length)); - + const std::string& response = m_jsonrpc->HandleRequest(std::string_view(inBuffer, length)); return callback(response.c_str(), response.size()); } diff --git a/src/rpc/scgi.cc b/src/rpc/scgi.cc index 85992f1d..9dce0fa1 100644 --- a/src/rpc/scgi.cc +++ b/src/rpc/scgi.cc @@ -84,7 +84,7 @@ SCgi::open_named(const std::string& filename) { void SCgi::open(void* sa, unsigned int length) { try { - if (!get_fd().set_nonblock() || !get_fd().set_reuse_address(true) || + if (!get_fd().set_nonblock() || !get_fd().set_reuse_address(true) || !get_fd().set_reuse_port(true) || !get_fd().bind(*reinterpret_cast(sa), length) || !get_fd().listen(max_tasks)) @@ -161,13 +161,47 @@ SCgi::receive_call(SCgiTask* task, const char* buffer, uint32_t length) { break; case SCgiTask::ContentType::XML: default: -// torrent::thread_base::acquire_global_lock(); -// torrent::main_thread()->interrupt(); result = rpc.dispatch(RpcManager::RPCType::XML, buffer, length, callback); -// torrent::thread_base::release_global_lock(); } return result; } +bool +SCgi::process_and_send(int fd, const char* data, int length, bool is_json) { + + static auto callback = [&](const char* buffer, uint32_t length) { + const auto header = is_json + ? "Status: 200 OK\r\nContent-Type: " + "application/json\r\nContent-Length: %i\r\n\r\n" + : "Status: 200 OK\r\nContent-Type: " + "text/xml\r\nContent-Length: %i\r\n\r\n"; + + char* data_buffer = new char[length + 256]; + int header_size = sprintf(data_buffer, header, length); + + std::memcpy(data_buffer + header_size, buffer, length); + ::send(fd, data_buffer, header_size + length, 0); + return true; + }; + + bool result = true; + int head_length = 0; + + while (*data != ','){ + ++data; ++head_length; + } + ++data; ++head_length; + length -= head_length; + + if (is_json) { + result = rpc.dispatch(RpcManager::RPCType::JSON, data, length, callback); + } + else { + result = rpc.dispatch(RpcManager::RPCType::XML, data, length, callback); + } + + return result; } + +} // namespace rpc diff --git a/src/utils/socket_fd.cc b/src/utils/socket_fd.cc index d2c48b1e..359defe9 100644 --- a/src/utils/socket_fd.cc +++ b/src/utils/socket_fd.cc @@ -51,6 +51,14 @@ SocketFd::set_reuse_address(bool state) { return setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0; } +bool +SocketFd::set_reuse_port(bool state) { + check_valid(); + int opt = state; + + return setsockopt(m_fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0; +} + bool SocketFd::set_dont_route(bool state) { check_valid(); diff --git a/src/websockets_thread.cc b/src/websockets_thread.cc index 7f64f1cb..f3b7f20d 100644 --- a/src/websockets_thread.cc +++ b/src/websockets_thread.cc @@ -3,6 +3,7 @@ #include "globals.h" #include "control.h" #include "core/manager.h" +#include "rpc/scgi.h" #include #include @@ -67,7 +68,7 @@ WebsocketsThread::start_thread() { throw torrent::internal_error("Can't get listen info for websocket, please check network.websockets.open_local or network.websockets.open_port in rtorrent.rc !!!"); } - m_websockets_app = new App(); + m_websockets_app = new App(SocketContextOptions {}, rpc::SCgi::process_and_send); App::WebSocketBehavior behavior; behavior.open = [&](WebSocket* ws) {