diff --git a/Boss/Mod/JitRebalancer.cpp b/Boss/Mod/JitRebalancer.cpp new file mode 100644 index 000000000..597201be5 --- /dev/null +++ b/Boss/Mod/JitRebalancer.cpp @@ -0,0 +1,541 @@ +#include"Boss/Mod/JitRebalancer.hpp" +#include"Boss/Mod/Rpc.hpp" +#include"Boss/ModG/ReqResp.hpp" +#include"Boss/Msg/Init.hpp" +#include"Boss/Msg/ListpeersResult.hpp" +#include"Boss/Msg/ReleaseHtlcAccepted.hpp" +#include"Boss/Msg/RequestEarningsInfo.hpp" +#include"Boss/Msg/RequestMoveFunds.hpp" +#include"Boss/Msg/ResponseEarningsInfo.hpp" +#include"Boss/Msg/ResponseMoveFunds.hpp" +#include"Boss/Msg/ProvideHtlcAcceptedDeferrer.hpp" +#include"Boss/Msg/SolicitHtlcAcceptedDeferrer.hpp" +#include"Boss/concurrent.hpp" +#include"Boss/log.hpp" +#include"Boss/random_engine.hpp" +#include"Ev/Io.hpp" +#include"Ev/yield.hpp" +#include"Jsmn/Object.hpp" +#include"Json/Out.hpp" +#include"Ln/HtlcAccepted.hpp" +#include"Ln/NodeId.hpp" +#include"Ln/Scid.hpp" +#include"S/Bus.hpp" +#include"Stats/ReservoirSampler.hpp" +#include"Util/make_unique.hpp" +#include"Util/stringify.hpp" +#include + +namespace { + +/* Weight of an HTLC output on the commitment tx. + * The HTLC needs to be paid for, so also factor that in our spendable + * computation. + */ +auto constexpr htlc_weight = std::size_t(172); + +/* Up to how many percent of the total earnings from the outgoing + * channel do we allow a rebalancing fee to that channel. + * + * See https://lists.ozlabs.org/pipermail/c-lightning/2019-July/000160.html + * for the attack this prevents. + * + * The below is how much the above attack can extract from us. + * Our hope is that the attack is not mounted often enough that the + * below extractable amount can be taken from us. + */ +auto constexpr max_fee_percent = double(25.0); + +/* Up to how much rebalancing fee we allow "for free" before we insist on + * the rebalancing fee being less than the above percentage of the fee already + * earned. + * + * Without this, a fresh node would not ever JIT-rebalance. + * + * This is exploitable by attackers (via the attack linked above) by + * starting up new channels with us, but since the below limit is fairly + * low, they can only steal this "free fee" and not more than that. + * Our expectation is that attackers will spend much, much more on + * channel opening fees than the few sats we give them for free for + * rebalancing, while this small amount should be sufficient for at least + * one or two rebalances to "seed" our node with actual successful forwards. + */ +auto const free_fee = Ln::Amount::sat(4); + +/* Maximum limit for costs of a *single* rebalance. */ +auto const max_rebalance_fee = Ln::Amount::sat(10); + +} + +namespace Boss { namespace Mod { + +class JitRebalancer::Impl { +private: + S::Bus& bus; + Boss::Mod::Rpc* rpc; + + /* Maps channels to nodes, as we need node information. */ + std::map nodemap; + + void start() { + bus.subscribe([this](Msg::Init const& init) { + rpc = &init.rpc; + return Ev::lift(); + }); + bus.subscribe([this](Msg::ListpeersResult const& r) { + return listpeers_result(r.peers); + }); + bus.subscribe([this + ](Msg::SolicitHtlcAcceptedDeferrer const&) { + auto f = [this](Ln::HtlcAccepted::Request const& req) { + return htlc_accepted(req); + }; + return bus.raise(Msg::ProvideHtlcAcceptedDeferrer{ + std::move(f) + }); + }); + } + Ev::Io wait_for_rpc() { + return Ev::lift().then([this]() { + if (rpc) + return Ev::lift(); + return Ev::yield() + wait_for_rpc(); + }); + } + + Ev::Io + listpeers_result(Jsmn::Object ps) { + for ( auto i = std::size_t(0) + ; i < ps.size() + ; ++i + ) { + auto p = ps[i]; + if (!p.has("id")) + continue; + auto node_j = p["id"]; + if (!node_j.is_string()) + continue; + auto node_s = std::string(node_j); + if (!Ln::NodeId::valid_string(node_s)) + continue; + auto node = Ln::NodeId(node_s); + auto cs = p["channels"]; + for ( auto j = std::size_t(0) + ; j < cs.size() + ; ++j + ) { + auto c = cs[j]; + if (!c.has("short_channel_id")) + continue; + auto scid_j = c["short_channel_id"]; + if (!scid_j.is_string()) + continue; + auto scid_s = std::string(scid_j); + if (!Ln::Scid::valid_string(scid_s)) + continue; + auto scid = Ln::Scid(scid_s); + nodemap[scid] = node; + } + } + return Ev::lift(); + } + + Ev::Io + htlc_accepted(Ln::HtlcAccepted::Request const& req) { + /* Is it a forward? */ + if (!req.next_channel) + return Ev::lift(false); + + /* Is it in the table? */ + auto it = nodemap.find(req.next_channel); + if (it == nodemap.end()) + return Ev::lift(false); + + auto& node = it->second; + auto id = req.id; + auto amount = req.next_amount; + + return Boss::concurrent( check_and_move(node, amount, id) + ).then([this]() { + return Ev::lift(true); + }); + } + + class Run { + private: + class Impl; + std::shared_ptr pimpl; + public: + Run() =delete; + + Run(S::Bus& bus, Boss::Mod::Rpc& rpc + , Ln::NodeId const& node + , Ln::Amount amount + , std::uint64_t id + ); + Run(Run&&) =default; + Run(Run const&) =default; + ~Run() =default; + + Ev::Io execute(); + }; + + Ev::Io + check_and_move( Ln::NodeId const& node + , Ln::Amount amount + , std::uint64_t id + ) { + return wait_for_rpc().then([this, node, amount, id]() { + auto r = Run(bus, *rpc, node, amount, id); + return r.execute(); + }); + } + +public: + Impl(S::Bus& bus_) : bus(bus_) { start(); } +}; + +/* Yes, what a messy name... */ +class JitRebalancer::Impl::Run::Impl { +private: + S::Bus& bus; + Boss::Mod::Rpc& rpc; + Ln::NodeId out_node; + Ln::Amount amount; + std::uint64_t id; + + /* Unilateral-close feerate. */ + std::uint64_t feerate; + /* Amount available on all nodes. */ + struct ChannelInfo { + Ln::Amount to_us; + Ln::Amount capacity; + }; + std::map available; + /* How much should we add to the destination? */ + Ln::Amount to_move; + /* Up to how much to pay for *this* rebalance. */ + Ln::Amount this_rebalance_fee; + + /* ReqResp to `Boss::Mod::EarningsTracker`. */ + Boss::ModG::ReqResp< Msg::RequestEarningsInfo + , Msg::ResponseEarningsInfo + > earnings_info_rr; + /* ReqResp to `Boss::Mod::FundsMover`. */ + Boss::ModG::ReqResp< Msg::RequestMoveFunds + , Msg::ResponseMoveFunds + > move_funds_rr; + + /* Thrown to signal that we should release the HTLC. */ + struct Continue {}; + /* Exceptions are dual to `call-with-current-continuation`, + * and one use of `call-with-current-continuation` is to + * create "early outs" without having to thread through a lot + * of functions. + * We simply use thrown objects to implement a sort of + * `call-with-current-continuation` to implement early outs. + */ + + /* Just the information we need from the earnings tracker. */ + struct Earnings { + /* These are the "out" earnings and expenditures. */ + Ln::Amount earnings; + Ln::Amount expenditures; + }; + + Ev::Io core_execute() { + return Ev::lift().then([this]() { + + /* Get the unilateral close feerate. */ + auto parms = Json::Out() + .start_array() + .entry("perkw") + .end_array() + ; + return rpc.command("feerates", std::move(parms)); + }).then([this](Jsmn::Object res) { + feerate = 0; + try { + auto data = res["perkw"]; + if (data.has("unilateral_close")) + feerate = std::uint64_t(double( + data["unilateral_close"] + )); + } catch (Jsmn::TypeError const&) { + return Boss::log( bus, Error + , "JitRebalancer: Unexpected " + "result from feerates: %s" + , Util::stringify(res).c_str() + ).then([]() { + throw Continue(); + return Ev::lift(); + }); + } + return Ev::lift(); + }).then([this]() { + + /* Adjust the amount by the feerate times weight + * plus the HTLC. */ + amount += Ln::Amount::msat(feerate * htlc_weight); + + /* Determine the amounts available. */ + auto parms = Json::Out::empty_object(); + return rpc.command("listpeers", std::move(parms)); + }).then([this](Jsmn::Object res) { + try { + auto ps = res["peers"]; + for ( auto i = std::size_t(0) + ; i < ps.size() + ; ++i + ) { + auto p = ps[i]; + auto to_us = Ln::Amount::sat(0); + auto capacity = Ln::Amount::sat(0); + auto peer = Ln::NodeId(std::string( + p["id"] + )); + + auto cs = p["channels"]; + for ( auto j = std::size_t(0) + ; j < cs.size() + ; ++j + ) { + auto c = cs[i]; + auto state = std::string( + c["state"] + ); + if (state != "CHANNELD_NORMAL") + continue; + to_us += Ln::Amount( + std::string( + c["to_us_msat"] + )); + capacity += Ln::Amount( + std::string( + c["total_msat"] + )); + } + + auto& av = available[peer]; + av.to_us = to_us; + av.capacity = capacity; + } + } catch (std::exception const&) { + return Boss::log( bus, Error + , "JitRebalancer: Unexpected " + "result from listpeers: %s" + , Util::stringify(res).c_str() + ).then([]() { + throw Continue(); + return Ev::lift(); + }); + } + return Ev::lift(); + }).then([this]() { + + /* Check if it is available. */ + auto it = available.find(out_node); + if (it == available.end()) + /* Not in the map? + * Continue, this will fail. */ + throw Continue(); + + /* Check if the amount fits. */ + if (amount <= it->second.to_us) + /* Will fit, just continue. */ + return Boss::log( bus, Debug + , "JitRebalancer: HTLC %s " + "of amount %s fits in " + "outgoing amount %s." + , Util::stringify(id).c_str() + , std::string(amount).c_str() + , std::string(it->second.to_us) + .c_str() + ).then([this]() { + throw Continue(); + return Ev::lift(); + }); + + /* Target to have the out-node have midway between + * the needed amount, and the total capacity of + * the channel, to also fund future outgoing forwards + * via this channel. + */ + auto target = (amount + it->second.capacity) / 2.0; + to_move = target - it->second.to_us; + + return Boss::log( bus, Debug + , "JitRebalancer: HTLC %s needs " + "amount %s, only %s available " + "at %s, want to rebalance %s." + , Util::stringify(id).c_str() + , std::string(amount).c_str() + , std::string(it->second.to_us).c_str() + , std::string(out_node).c_str() + , std::string(to_move).c_str() + ); + }).then([this]() { + + /* Determine how much fee we can use for + * rebalancing. */ + return get_earnings(out_node); + }).then([this](Earnings e) { + /* Total aggregated limit. */ + auto limit = free_fee + + (e.earnings * (max_fee_percent / 100.0)) + ; + if (limit < e.expenditures) + return Boss::log( bus, Debug + , "JitRebalancer: HTLC %s: " + "Cannot rebalance to %s, we " + "already spent %s on " + "rebalances, limit is %s." + , Util::stringify(id).c_str() + , std::string(out_node).c_str() + , std::string(e.expenditures) + .c_str() + , std::string(limit).c_str() + ).then([]() { + throw Continue(); + return Ev::lift(); + }); + this_rebalance_fee = limit - e.expenditures; + if (this_rebalance_fee > max_rebalance_fee) + this_rebalance_fee = max_rebalance_fee; + + /* Now select a source channel. */ + auto min_required = to_move + + (this_rebalance_fee / 2.0) + ; + auto sampler = Stats::ReservoirSampler(1); + for (auto& e : available) { + auto& candidate = e.first; + auto& channel = e.second; + + /* The outgoing node is disqualified. */ + if (candidate == out_node) + continue; + /* If the node does not have enough, + * skip it. */ + if (channel.to_us < min_required) + continue; + + /* Score according to how much it is + * balanced in our favor. */ + sampler.add( candidate + , channel.to_us / channel.capacity + , Boss::random_engine + ); + } + auto& samples = sampler.get(); + if (samples.size() == 0) + return Boss::log( bus, Debug + , "JitRebalancer: HTLC %s: " + "No candidates to get funds " + "from." + , Util::stringify(id).c_str() + ).then([]() { + throw Continue(); + return Ev::lift(); + }); + auto& selected = samples[0]; + + return Boss::log( bus, Debug + , "JitRebalancer: HTLC %s: Move %s " + "from %s to %s." + , Util::stringify(id).c_str() + , std::string(to_move).c_str() + , std::string(selected).c_str() + , std::string(out_node).c_str() + ) + + move_funds(selected) + ; + }); + } + + Ev::Io get_earnings(Ln::NodeId const& node) { + return earnings_info_rr.execute(Msg::RequestEarningsInfo{ + nullptr, node + }).then([this](Msg::ResponseEarningsInfo raw) { + return Ev::lift(Earnings{ + raw.out_earnings, raw.out_expenditures + }); + }); + } + + Ev::Io move_funds(Ln::NodeId const& source) { + return move_funds_rr.execute(Msg::RequestMoveFunds{ + nullptr, + source, out_node, + to_move, this_rebalance_fee + }).then([this](Msg::ResponseMoveFunds _) { + /* Ignore result. */ + return Ev::lift(); + }); + } + +public: + Impl( S::Bus& bus_ + , Boss::Mod::Rpc& rpc_ + , Ln::NodeId const& out_node_ + , Ln::Amount amount_ + , std::uint64_t id_ + ) : bus(bus_), rpc(rpc_) + , out_node(out_node_), amount(amount_), id(id_) + , earnings_info_rr( bus + , [](Msg::RequestEarningsInfo& msg, void* p) { + msg.requester = p; + } + , [](Msg::ResponseEarningsInfo& msg) { + return msg.requester; + } + ) + , move_funds_rr( bus + , [](Msg::RequestMoveFunds& msg, void* p) { + msg.requester = p; + } + , [](Msg::ResponseMoveFunds& msg) { + return msg.requester; + } + ) + { } + + static + Ev::Io execute(std::shared_ptr self) { + return self->core_execute().catching([](Continue const& _) { + return Ev::lift(); + }).catching([self](std::exception const& e) { + return Boss::log( self->bus, Error + , "JitRebalancer: Uncaught error: " + "%s" + , e.what() + ); + }).then([self]() { + return self->bus.raise(Msg::ReleaseHtlcAccepted{ + Ln::HtlcAccepted::Response::cont(self->id) + }); + }); + } +}; +JitRebalancer::Impl::Run::Run( S::Bus& bus + , Boss::Mod::Rpc& rpc + , Ln::NodeId const& node + , Ln::Amount amount + , std::uint64_t id + ) + : pimpl(std::make_shared(bus, rpc, node, amount, id)) { } +Ev::Io JitRebalancer::Impl::Run::execute() { + return Impl::execute(pimpl); +} + +JitRebalancer::JitRebalancer(JitRebalancer&&) =default; +JitRebalancer::~JitRebalancer() =default; + +JitRebalancer::JitRebalancer(S::Bus& bus) + : pimpl(Util::make_unique(bus)) { } + +}} diff --git a/Boss/Mod/JitRebalancer.hpp b/Boss/Mod/JitRebalancer.hpp new file mode 100644 index 000000000..12453b960 --- /dev/null +++ b/Boss/Mod/JitRebalancer.hpp @@ -0,0 +1,36 @@ +#ifndef BOSS_MOD_JITREBALANCER_HPP +#define BOSS_MOD_JITREBALANCER_HPP + +#include + +namespace S { class Bus; } + +namespace Boss { namespace Mod { + +/** class Boss::Mod::JitRebalancer + * + * @brief Rebalances channels just in time for a forward. + * + * @desc Registers as an `htlc_accepted` deferrer, and defers + * forwards if they are to nodes with insufficient outgoing + * capacity. + * It then arranges to move funds to that channel if possible. + */ +class JitRebalancer { +private: + class Impl; + std::unique_ptr pimpl; + +public: + JitRebalancer() =delete; + + JitRebalancer(JitRebalancer&&); + ~JitRebalancer(); + + explicit + JitRebalancer(S::Bus& bus); +}; + +}} + +#endif /* !defined(BOSS_MOD_JITREBALANCER_HPP) */ diff --git a/Boss/Mod/all.cpp b/Boss/Mod/all.cpp index 44de656d6..923f0d881 100644 --- a/Boss/Mod/all.cpp +++ b/Boss/Mod/all.cpp @@ -29,6 +29,7 @@ #include"Boss/Mod/Initiator.hpp" #include"Boss/Mod/InternetConnectionMonitor.hpp" #include"Boss/Mod/InvoicePayer.hpp" +#include"Boss/Mod/JitRebalancer.hpp" #include"Boss/Mod/JsonOutputter.hpp" #include"Boss/Mod/ListfundsAnnouncer.hpp" #include"Boss/Mod/ListpaysHandler.hpp" @@ -167,6 +168,7 @@ std::shared_ptr all( std::ostream& cout all->install(bus); all->install(bus); all->install(bus); + all->install(bus); return all; } diff --git a/Makefile.am b/Makefile.am index 7cf5f70da..2074a6f55 100644 --- a/Makefile.am +++ b/Makefile.am @@ -142,6 +142,8 @@ libclboss_la_SOURCES = \ Boss/Mod/InternetConnectionMonitor.hpp \ Boss/Mod/InvoicePayer.cpp \ Boss/Mod/InvoicePayer.hpp \ + Boss/Mod/JitRebalancer.cpp \ + Boss/Mod/JitRebalancer.hpp \ Boss/Mod/JsonOutputter.cpp \ Boss/Mod/JsonOutputter.hpp \ Boss/Mod/ListfundsAnnouncer.cpp \