Skip to content

Commit

Permalink
Boss/Mod/CommandReceiver.cpp: Handle rpc_command at this level, to …
Browse files Browse the repository at this point in the history
…increase our responsiveness.
  • Loading branch information
ZmnSCPxj committed Nov 10, 2020
1 parent aa02c9e commit dccbbc9
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 29 deletions.
5 changes: 2 additions & 3 deletions Boss/JsonInput.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include<assert.h>
#include"Boss/JsonInput.hpp"
#include"Boss/Msg/JsonCin.hpp"
#include"Boss/concurrent.hpp"
#include"Ev/ThreadPool.hpp"
#include"Jsmn/Object.hpp"
#include"S/Bus.hpp"
Expand Down Expand Up @@ -39,9 +38,9 @@ class JsonInput::Impl {
/* Exit loop. */
return Ev::lift();

return Boss::concurrent(bus.raise(Boss::Msg::JsonCin{
return bus.raise(Boss::Msg::JsonCin{
std::move(*pobj)
}))
})
+ run()
;
});
Expand Down
41 changes: 35 additions & 6 deletions Boss/Mod/CommandReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
#include"Boss/Msg/CommandResponse.hpp"
#include"Boss/Msg/JsonCin.hpp"
#include"Boss/Msg/JsonCout.hpp"
#include"Boss/Msg/ManifestHook.hpp"
#include"Boss/Msg/Manifestation.hpp"
#include"Boss/Msg/Notification.hpp"
#include"Boss/Msg/RpcCommandHook.hpp"
#include"Boss/concurrent.hpp"
#include"S/Bus.hpp"

namespace Boss { namespace Mod {
Expand All @@ -28,9 +32,28 @@ CommandReceiver::CommandReceiver(S::Bus& bus_) : bus(bus_) {

if (!inp.has("id")) {
/* Notification. */
return bus.raise(Boss::Msg::Notification{
method, params
});
return Boss::concurrent(
bus.raise(Boss::Msg::Notification{
method, params
})
);
} else if (method == "rpc_command") {
/* Respond immediately! */
auto js = Json::Out()
.start_object()
.field("jsonrpc", std::string("2.0"))
.field("id", inp["id"])
.start_object("result")
.field("result", "continue")
.end_object()
.end_object()
;
return bus.raise(Msg::JsonCout{std::move(js)})
/* Execute in background. */
+ Boss::concurrent(
bus.raise(Msg::RpcCommandHook{params})
)
;
} else {
if (!inp["id"].is_number())
return Ev::lift();
Expand All @@ -39,9 +62,11 @@ CommandReceiver::CommandReceiver(S::Bus& bus_) : bus(bus_) {
auto id = std::uint64_t((double) inp["id"]);
pendings.insert(id);

return bus.raise(Boss::Msg::CommandRequest{
method, params, id
});
return Boss::concurrent(
bus.raise(Boss::Msg::CommandRequest{
method, params, id
})
);
}
});
bus.subscribe<Boss::Msg::CommandResponse>([this](Boss::Msg::CommandResponse const& resp) {
Expand Down Expand Up @@ -80,6 +105,10 @@ CommandReceiver::CommandReceiver(S::Bus& bus_) : bus(bus_) {
;
return bus.raise(Boss::Msg::JsonCout{std::move(js)});
});
bus.subscribe<Msg::Manifestation
>([this](Msg::Manifestation const& _) {
return bus.raise(Msg::ManifestHook{"rpc_command"});
});
}

}}
25 changes: 5 additions & 20 deletions Boss/Mod/SendpayResultMonitor.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include"Boss/Mod/SendpayResultMonitor.hpp"
#include"Boss/Msg/CommandRequest.hpp"
#include"Boss/Msg/CommandResponse.hpp"
#include"Boss/Msg/DbResource.hpp"
#include"Boss/Msg/ManifestHook.hpp"
#include"Boss/Msg/ManifestNotification.hpp"
#include"Boss/Msg/Manifestation.hpp"
#include"Boss/Msg/Notification.hpp"
#include"Boss/Msg/RpcCommandHook.hpp"
#include"Boss/Msg/SendpayResult.hpp"
#include"Boss/Msg/Timer10Minutes.hpp"
#include"Boss/concurrent.hpp"
Expand Down Expand Up @@ -54,8 +52,7 @@ class SendpayResultMonitor::Impl {
void start() {
bus.subscribe<Msg::Manifestation
>([this](Msg::Manifestation const&_) {
return bus.raise(Msg::ManifestHook{"rpc_command"})
+ bus.raise(Msg::ManifestNotification{
return bus.raise(Msg::ManifestNotification{
"sendpay_success"
})
+ bus.raise(Msg::ManifestNotification{
Expand Down Expand Up @@ -102,21 +99,9 @@ class SendpayResultMonitor::Impl {
/* Extract the first hop for the payment, from the
* RPC command used.
*/
bus.subscribe<Msg::CommandRequest
>([this](Msg::CommandRequest const& r) {
if (r.command != "rpc_command")
return Ev::lift();

auto resp = Json::Out()
.start_object()
.field("result", "continue")
.end_object()
;

bus.subscribe<Msg::RpcCommandHook
>([this](Msg::RpcCommandHook const& r) {
return Boss::concurrent(process_rpc_command(r))
+ bus.raise(Msg::CommandResponse{
r.id, std::move(resp)
})
;
});
/* Check for `sendpay_failure` or `sendpay_success`.
Expand Down Expand Up @@ -152,7 +137,7 @@ class SendpayResultMonitor::Impl {
* commands.
*/
Ev::Io<void>
process_rpc_command(Msg::CommandRequest const& r) {
process_rpc_command(Msg::RpcCommandHook const& r) {
auto params = std::make_shared<Jsmn::Object>(r.params);
return Ev::lift().then([this, params]() {
try {
Expand Down
24 changes: 24 additions & 0 deletions Boss/Msg/RpcCommandHook.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef BOSS_MSG_RPCCOMMANDHOOK_HPP
#define BOSS_MSG_RPCCOMMANDHOOK_HPP

#include"Jsmn/Object.hpp"

namespace Boss { namespace Msg {

/** struct Boss::Msg::RpcCommandHook
*
* @brief Emitted in response to the `lightningd` receiving
* an RPC command.
*
* @desc This does not require or wait for a response: the
* `Boss::Mod::CommandReceiver` specially treats `rpc_command`
* hooks and responds to those immediately.
* After responding, the `CommandReceiver` emits this message.
*/
struct RpcCommandHook {
Jsmn::Object params;
};

}}

#endif /* !defined(BOSS_MSG_RPCCOMMANDHOOK_HPP) */
1 change: 1 addition & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
0.5A
- Handle `rpc_command` specially for better RPC response times even when CLBOSS is busy.
- Make compilable on FreeBSD.
- Print more debug logs for internet connection monitoring.
- Limit resources used by rebalancing attempts.
Expand Down
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ libclboss_la_SOURCES = \
Boss/Msg/ResponseNewaddr.hpp \
Boss/Msg/ResponsePeerMetrics.hpp \
Boss/Msg/ResponsePeerStatistics.hpp \
Boss/Msg/RpcCommandHook.hpp \
Boss/Msg/SendpayResult.hpp \
Boss/Msg/SetChannelFee.hpp \
Boss/Msg/SolicitChannelCandidates.hpp \
Expand Down

0 comments on commit dccbbc9

Please sign in to comment.