Skip to content

Commit

Permalink
setup websocket and scgi within one socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Young-Flash committed Oct 26, 2022
1 parent 7806374 commit 998a754
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 30 deletions.
File renamed without changes.
12 changes: 4 additions & 8 deletions doc/rtorrent.rc
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,9 @@ network.http.dns_cache_timeout.set = 25
# Run the rTorrent process as a daemon in the background
#system.daemon.set = false

# scgi listen way
# network.scgi.open_local = (cat,(cfg.basedir),rtorrent_scgi.sock)
network.scgi.open_port = 127.0.0.1:1256

# websockets listen way
# network.websockets.open_local = (cat,(cfg.basedir),rtorrent_websockets.sock)
network.websockets.open_port = 127.0.0.1:1258
# websockets_scgi listen way
# network.websockets_scgi.open_local = (cat,(cfg.basedir),rtorrent_websockets_scgi.sock)
network.websockets_scgi.open_port = 127.0.0.1:1258

# Logging:
# Levels = critical error warn notice info debug
Expand All @@ -163,4 +159,4 @@ log.open_file = "log", (cfg.logfile)
log.add_output = "info", "log"
##log.add_output = "tracker_debug", "log"

### END of rtorrent.rc ###
### END of rtorrent.rc ###
2 changes: 2 additions & 0 deletions include/rpc/scgi.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class lt_cacheline_aligned SCgi : public torrent::Event {

bool receive_call(SCgiTask* task, const char* buffer, uint32_t length);

static bool process_and_send(int fd, const char* data, int length, bool is_json);

utils::SocketFd& get_fd() {
return *reinterpret_cast<utils::SocketFd*>(&m_fileDesc);
}
Expand Down
1 change: 1 addition & 0 deletions include/utils/socket_fd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SocketFd {

bool set_nonblock();
bool set_reuse_address(bool state);
bool set_reuse_port(bool state);
bool set_dont_route(bool state);

bool set_bind_to_device(const char* device);
Expand Down
19 changes: 8 additions & 11 deletions src/command_network.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,14 @@ apply_scgi(const std::string& arg, int type) {
}

torrent::Object
apply_websockets(const std::string& arg, int type) {
apply_websockets_scgi(const std::string& arg, int type) {
if (worker_thread->websockets_protocol() != nullptr) {
throw torrent::input_error("websockets RPC thread already enabled.");
}

if (!rpc::rpc.is_initialized())
initialize_rpc();

auto listen_info = new std::pair<std::string, int>(arg, type);
worker_thread->set_websockets_protocol(listen_info);

Expand Down Expand Up @@ -396,17 +399,11 @@ initialize_command_network() {
"network.max_open_sockets.set",
[cm](const auto&, const auto& v) { return cm->set_max_size(v); }, false);

CMD2_ANY_STRING("network.scgi.open_port", [](const auto&, const auto& arg) {
return apply_scgi(arg, 1);
}, false);
CMD2_ANY_STRING("network.scgi.open_local", [](const auto&, const auto& arg) {
return apply_scgi(arg, 2);
}, false);
CMD2_ANY_STRING("network.websockets.open_port", [](const auto&, const auto& arg) {
return apply_websockets(arg, 1);
CMD2_ANY_STRING("network.websockets_scgi.open_port", [](const auto&, const auto& arg) {
return apply_websockets_scgi(arg, 1);
}, false);
CMD2_ANY_STRING("network.websockets.open_local", [](const auto&, const auto& arg) {
return apply_websockets(arg, 2);
CMD2_ANY_STRING("network.websockets_scgi.open_local", [](const auto&, const auto& arg) {
return apply_websockets_scgi(arg, 2);
}, false);
CMD2_VAR_BOOL("network.scgi.dont_route", false, false);

Expand Down
3 changes: 0 additions & 3 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,6 @@ main(int argc, char** argv) {
CMD2_REDIRECT("port_random", "network.port_random.set");
CMD2_REDIRECT("proxy_address", "network.proxy_address.set");

CMD2_REDIRECT("scgi_port", "network.scgi.open_port");
CMD2_REDIRECT("scgi_local", "network.scgi.open_local");

CMD2_REDIRECT_GENERIC("directory", "directory.default.set");
CMD2_REDIRECT_GENERIC("session", "session.path.set");

Expand Down
4 changes: 1 addition & 3 deletions src/rpc/rpc_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,7 @@ RpcJson::is_valid() const {

bool
RpcJson::process(const char* inBuffer, uint32_t length, res_callback callback) {
const std::string& response =
m_jsonrpc->HandleRequest(std::string_view(inBuffer, length));

const std::string& response = m_jsonrpc->HandleRequest(std::string_view(inBuffer, length));
return callback(response.c_str(), response.size());
}

Expand Down
42 changes: 38 additions & 4 deletions src/rpc/scgi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ SCgi::open_named(const std::string& filename) {
void
SCgi::open(void* sa, unsigned int length) {
try {
if (!get_fd().set_nonblock() || !get_fd().set_reuse_address(true) ||
if (!get_fd().set_nonblock() || !get_fd().set_reuse_address(true) || !get_fd().set_reuse_port(true) ||
!get_fd().bind(*reinterpret_cast<torrent::utils::socket_address*>(sa),
length) ||
!get_fd().listen(max_tasks))
Expand Down Expand Up @@ -161,13 +161,47 @@ SCgi::receive_call(SCgiTask* task, const char* buffer, uint32_t length) {
break;
case SCgiTask::ContentType::XML:
default:
// torrent::thread_base::acquire_global_lock();
// torrent::main_thread()->interrupt();
result = rpc.dispatch(RpcManager::RPCType::XML, buffer, length, callback);
// torrent::thread_base::release_global_lock();
}

return result;
}

bool
SCgi::process_and_send(int fd, const char* data, int length, bool is_json) {

static auto callback = [&](const char* buffer, uint32_t length) {
const auto header = is_json
? "Status: 200 OK\r\nContent-Type: "
"application/json\r\nContent-Length: %i\r\n\r\n"
: "Status: 200 OK\r\nContent-Type: "
"text/xml\r\nContent-Length: %i\r\n\r\n";

char* data_buffer = new char[length + 256];
int header_size = sprintf(data_buffer, header, length);

std::memcpy(data_buffer + header_size, buffer, length);
::send(fd, data_buffer, header_size + length, 0);
return true;
};

bool result = true;
int head_length = 0;

while (*data != ','){
++data; ++head_length;
}
++data; ++head_length;
length -= head_length;

if (is_json) {
result = rpc.dispatch(RpcManager::RPCType::JSON, data, length, callback);
}
else {
result = rpc.dispatch(RpcManager::RPCType::XML, data, length, callback);
}

return result;
}

} // namespace rpc
8 changes: 8 additions & 0 deletions src/utils/socket_fd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ SocketFd::set_reuse_address(bool state) {
return setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0;
}

bool
SocketFd::set_reuse_port(bool state) {
check_valid();
int opt = state;

return setsockopt(m_fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0;
}

bool
SocketFd::set_dont_route(bool state) {
check_valid();
Expand Down
3 changes: 2 additions & 1 deletion src/websockets_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "globals.h"
#include "control.h"
#include "core/manager.h"
#include "rpc/scgi.h"

#include <torrent/utils/path.h>
#include <fcntl.h>
Expand Down Expand Up @@ -67,7 +68,7 @@ WebsocketsThread::start_thread() {
throw torrent::internal_error("Can't get listen info for websocket, please check network.websockets.open_local or network.websockets.open_port in rtorrent.rc !!!");
}

m_websockets_app = new App();
m_websockets_app = new App(SocketContextOptions {}, rpc::SCgi::process_and_send);

App::WebSocketBehavior<ConnectionData> behavior;
behavior.open = [&](WebSocket<false, true, ConnectionData>* ws) {
Expand Down

0 comments on commit 998a754

Please sign in to comment.