Skip to content

Commit

Permalink
server push
Browse files Browse the repository at this point in the history
  • Loading branch information
Young-Flash committed Jul 6, 2022
1 parent 5aca4da commit 93228f2
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/rpc/rpc_thread_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class RpcThreadManager {
void queue_item(void* newFunc);

torrent::Poll* poll();

void publish_ws_topic(std::string_view topic, std::string_view message);

private:

Expand Down
2 changes: 2 additions & 0 deletions include/websockets_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class lt_cacheline_aligned WebsocketsThread : public ProtocolThread {

void queue_item(void*) override;

void publish_ws_topic(std::string_view topic, std::string_view message);

private:

std::unique_ptr<std::thread> m_websockets_thread = nullptr;
Expand Down
18 changes: 17 additions & 1 deletion src/core/download_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,31 @@
#include "core/download_list.h"
#include "core/download_store.h"
#include "ui/root.h"
#include "nlohmann/json.hpp"

#define DL_TRIGGER_EVENT(download, event_name) \
rpc::commands.call_catch(event_name, \
rpc::make_target(download), \
torrent::Object(), \
"Event '" event_name "' failed: ");
"Event '" event_name "' failed: "); \
publish_topic(download, event_name);

namespace core {

void publish_topic(Download* download, std::string_view topic) {
nlohmann::json message = {
{"id", nullptr},
{"jsonrpc", "2,0"},
{"result", {
{"torrent_hash_value", torrent::utils::transform_hex_str(download->info()->hash().str())},
{"event_name", topic},
{"time_stamp", std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()}
}
}
};
worker_thread->publish_ws_topic("event.*", message.dump());
}

#ifdef RT_USE_EXTRA_DEBUG
inline void
DownloadList::check_contains(Download* d) {
Expand Down
4 changes: 4 additions & 0 deletions src/rpc/rpc_thread_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ void RpcThreadManager::queue_item(void* newFunc) {

torrent::Poll* RpcThreadManager::poll() {
return m_thread_worker->poll();
}

void RpcThreadManager::publish_ws_topic(std::string_view topic, std::string_view message) {
m_websockets_thread->publish_ws_topic(topic, message);
}
12 changes: 12 additions & 0 deletions src/websockets_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ WebsocketsThread::queue_item(void*) {
void
WebsocketsThread::start_thread() {
auto create_ws_server_and_run = [&]() {
if (!listen_info) {
throw torrent::internal_error("Can't get listen info for websocket, please check network.websockets.open_local or network.websockets.open_port in rtorrent.rc !!!");
}

m_websockets_app = new App();

App::WebSocketBehavior<ConnectionData> behavior;
behavior.open = [&](WebSocket<false, true, ConnectionData>* ws) {
ws->subscribe("event.*");
all_connection.emplace_back(ws);
ws->getUserData()->address = ws->getRemoteAddressAsText();
};
Expand Down Expand Up @@ -108,4 +113,11 @@ WebsocketsThread::handle_request(const std::string_view& request) {
rpc::rpc.dispatch(rpc::RpcManager::RPCType::JSON, request.data(), request.length(), [&](const char* response, uint32_t) {
return m_websocket_connection->send(std::string_view(response), uWS::OpCode::TEXT);
});
}

void WebsocketsThread::publish_ws_topic(std::string_view topic, std::string_view message) {
// `m_websockets_app` maybe uninitialized when some events happen so null check is required here
if (m_websockets_app) {
m_websockets_app->publish(topic, message, OpCode::TEXT);
}
}

0 comments on commit 93228f2

Please sign in to comment.