From eec8de018b5873312a0e8a6913088b66e81f56ee Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 7 Oct 2025 12:40:55 +1030 Subject: [PATCH 01/14] plugins: make fatal errors neater. Without this they get run together on stderr. Signed-off-by: Rusty Russell --- plugins/libplugin.c | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 36a74d9f8134..daec11d492b0 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -2021,6 +2021,7 @@ void NORETURN plugin_errv(struct plugin *p, const char *fmt, va_list ap) plugin_logv(p, LOG_BROKEN, fmt, ap); vfprintf(stderr, fmt, ap2); + fprintf(stderr, "\n"); plugin_exit(p, 1); va_end(ap2); } From 3494e11dda3412095103477643f5fea26c2c50fb Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 7 Oct 2025 12:41:55 +1030 Subject: [PATCH 02/14] common: export helper membuf_tal_realloc. We have to call it membuf_tal_resize() because the other on is a ccan/json_out static function! Signed-off-by: Rusty Russell --- common/msg_queue.c | 12 +----------- common/utils.c | 10 ++++++++++ common/utils.h | 5 ++++- plugins/libplugin.c | 12 +----------- 4 files changed, 16 insertions(+), 23 deletions(-) diff --git a/common/msg_queue.c b/common/msg_queue.c index ba702f373053..b63a60be0231 100644 --- a/common/msg_queue.c +++ b/common/msg_queue.c @@ -36,21 +36,11 @@ static void destroy_msg_queue(struct msg_queue *q) } } -/* Realloc helper for tal membufs */ -static void *membuf_tal_realloc(struct membuf *mb, void *rawelems, - size_t newsize) -{ - char *p = rawelems; - - tal_resize(&p, newsize); - return p; -} - struct msg_queue *msg_queue_new(const tal_t *ctx, bool fd_passing) { struct msg_queue *q = tal(ctx, struct msg_queue); q->fd_passing = fd_passing; - membuf_init(&q->mb, tal_arr(q, const u8 *, 0), 0, membuf_tal_realloc); + membuf_init(&q->mb, tal_arr(q, const u8 *, 0), 0, membuf_tal_resize); if (q->fd_passing) tal_add_destructor(q, destroy_msg_queue); diff --git a/common/utils.c b/common/utils.c index f51094f41eeb..b383f63c3df4 100644 --- a/common/utils.c +++ b/common/utils.c @@ -191,3 +191,13 @@ char *str_lowering(const void *ctx, const char *string TAKES) for (char *p = ret; *p; p++) *p = tolower(*p); return ret; } + +/* Realloc helper for tal membufs */ +void *membuf_tal_resize(struct membuf *mb, void *rawelems, size_t newsize) +{ + char *p = rawelems; + + tal_resize(&p, newsize); + return p; +} + diff --git a/common/utils.h b/common/utils.h index 26992fdb06b0..a4041320cadb 100644 --- a/common/utils.h +++ b/common/utils.h @@ -9,8 +9,8 @@ #include #include +struct membuf; extern secp256k1_context *secp256k1_ctx; - extern const struct chainparams *chainparams; /* Unsigned min/max macros: BUILD_ASSERT make sure types are unsigned */ @@ -164,6 +164,9 @@ extern const tal_t *wally_tal_ctx; * Returns created temporary path name at *created if successful. */ int tmpdir_mkstemp(const tal_t *ctx, const char *template TAKES, char **created); +/* For use with membuf_init */ +void *membuf_tal_resize(struct membuf *mb, void *rawelems, size_t newsize); + /** * tal_strlowering - return the same string by in lower case. * @ctx: the context to tal from (often NULL) diff --git a/plugins/libplugin.c b/plugins/libplugin.c index daec11d492b0..6bfb813718a2 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -561,16 +561,6 @@ struct json_out *json_out_obj(const tal_t *ctx, return jout; } -/* Realloc helper for tal membufs */ -static void *membuf_tal_realloc(struct membuf *mb, void *rawelems, - size_t newsize) -{ - char *p = rawelems; - - tal_resize(&p, newsize); - return p; -} - static int read_json_from_rpc(struct plugin *p) { char *end; @@ -1623,7 +1613,7 @@ static struct command_result *handle_init(struct command *cmd, with_rpc = true; membuf_init(&p->rpc_conn->mb, tal_arr(p, char, READ_CHUNKSIZE), - READ_CHUNKSIZE, membuf_tal_realloc); + READ_CHUNKSIZE, membuf_tal_resize); } else with_rpc = false; From f18d24cdc46fe3e7f284ba92521253260c1073ff Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 7 Oct 2025 13:52:41 +1030 Subject: [PATCH 03/14] common: add json_dup_contents() to duplicate toks and buffer. We do this in several places, might as well make it common code. Signed-off-by: Rusty Russell --- common/json_parse.c | 10 ++++++++++ common/json_parse.h | 7 +++++++ lightningd/jsonrpc.c | 9 +++++---- lightningd/test/run-jsonrpc.c | 7 +++++++ 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/common/json_parse.c b/common/json_parse.c index 260e9e1b5c89..7bf9bc496c83 100644 --- a/common/json_parse.c +++ b/common/json_parse.c @@ -676,3 +676,13 @@ json_tok_channel_id(const char *buffer, const jsmntok_t *tok, return hex_decode(buffer + tok->start, tok->end - tok->start, cid, sizeof(*cid)); } + +void json_dup_contents(const tal_t *ctx, + const char *buffer, + const jsmntok_t *tok, + const char **new_buffer, + const jsmntok_t **new_toks) +{ + *new_buffer = tal_dup_arr(ctx, char, buffer, tok->end, 0); + *new_toks = tal_dup_arr(ctx, jsmntok_t, tok, json_next(tok) - tok, 0); +} diff --git a/common/json_parse.h b/common/json_parse.h index 4bd23cbddcd7..7e19fdfe4883 100644 --- a/common/json_parse.h +++ b/common/json_parse.h @@ -137,6 +137,13 @@ const char *json_scan(const tal_t *ctx, const char *guide, ...); +/* Duplicate the tok(s) and buffer required (don't assume they're tal objects!) */ +void json_dup_contents(const tal_t *ctx, + const char *buffer, + const jsmntok_t *tok, + const char **new_buffer, + const jsmntok_t **new_toks); + /* eg. JSON_SCAN(json_to_bool, &boolvar) */ #define JSON_SCAN(fmt, var) \ json_scan, \ diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 7370ad73ac5b..6bd3cb784715 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -998,8 +998,8 @@ rpc_command_hook_callback(struct rpc_command_hook_payload *p, if (tok) { /* We need to make copies here, as buffer and tokens * can be reused. */ - p->custom_replace = json_tok_copy(p, tok); - p->custom_buffer = tal_dup_talarr(p, char, buffer); + json_dup_contents(p, buffer, tok, + &p->custom_buffer, &p->custom_replace); return true; } @@ -1146,8 +1146,9 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) rpc_hook = tal(c, struct rpc_command_hook_payload); rpc_hook->cmd = c; /* Duplicate since we might outlive the connection */ - rpc_hook->buffer = tal_dup_talarr(rpc_hook, char, jcon->buffer); - rpc_hook->request = tal_dup_talarr(rpc_hook, jsmntok_t, tok); + json_dup_contents(rpc_hook, jcon->buffer, tok, + &rpc_hook->buffer, + &rpc_hook->request); /* NULL the custom_ values for the hooks */ rpc_hook->custom_result = NULL; diff --git a/lightningd/test/run-jsonrpc.c b/lightningd/test/run-jsonrpc.c index cf064ca891a4..05d9a91751b2 100644 --- a/lightningd/test/run-jsonrpc.c +++ b/lightningd/test/run-jsonrpc.c @@ -51,6 +51,13 @@ char *hsm_secret_arg(const tal_t *ctx UNNEEDED, const char *arg UNNEEDED, const u8 **hsm_secret UNNEEDED) { fprintf(stderr, "hsm_secret_arg called!\n"); abort(); } +/* Generated stub for json_dup_contents */ +void json_dup_contents(const tal_t *ctx UNNEEDED, + const char *buffer UNNEEDED, + const jsmntok_t *tok UNNEEDED, + const char **new_buffer UNNEEDED, + const jsmntok_t **new_toks UNNEEDED) +{ fprintf(stderr, "json_dup_contents called!\n"); abort(); } /* Generated stub for json_to_jsonrpc_errcode */ bool json_to_jsonrpc_errcode(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, enum jsonrpc_errcode *errcode UNNEEDED) From b2feb724c21d99c208cbf0ef776f84bb28bb52e3 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Tue, 7 Oct 2025 13:52:48 +1030 Subject: [PATCH 04/14] plugins: remove unused json_buffer / json_toks members from libplugin-pay. They were never referenced, and saving the toks is questionable since their lifetime is not guaranteed to live beyond this call (at least the buffer was duplicated, but that also assumed it was at the start of the object). Signed-off-by: Rusty Russell --- plugins/keysend.c | 2 -- plugins/libplugin-pay.h | 3 --- plugins/pay.c | 2 -- 3 files changed, 7 deletions(-) diff --git a/plugins/keysend.c b/plugins/keysend.c index 6104bf607ccd..030d62c37e16 100644 --- a/plugins/keysend.c +++ b/plugins/keysend.c @@ -221,8 +221,6 @@ static struct command_result *json_keysend(struct command *cmd, const char *buf, p = payment_new(cmd, cmd, NULL /* No parent */, global_hints, pay_mods); p->local_id = &my_id; - p->json_buffer = tal_dup_talarr(p, const char, buf); - p->json_toks = params; p->route_destination = tal_steal(p, destination); p->pay_destination = p->route_destination; p->payment_secret = NULL; diff --git a/plugins/libplugin-pay.h b/plugins/libplugin-pay.h index bc1ec51b5dc9..88ccd7d7e72b 100644 --- a/plugins/libplugin-pay.h +++ b/plugins/libplugin-pay.h @@ -151,9 +151,6 @@ struct payment { struct plugin *plugin; struct node_id *local_id; - const char *json_buffer; - const jsmntok_t *json_toks; - /* The current phase we are in. */ enum payment_step step; diff --git a/plugins/pay.c b/plugins/pay.c index 7deaf85bc518..e16a6bf001b4 100644 --- a/plugins/pay.c +++ b/plugins/pay.c @@ -1475,8 +1475,6 @@ static struct command_result *json_pay(struct command *cmd, } p->local_id = &my_id; - p->json_buffer = buf; - p->json_toks = params; p->why = "Initial attempt"; p->constraints.cltv_budget = *maxdelay; tal_free(maxdelay); From 86fb367aa6657469d18727fd3cf27223fa30c032 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:32 +1030 Subject: [PATCH 05/14] common/jsonrpc_io: helper routines for reading JSON from sockets. The efficient way to do this is to use membuf, which handles the buffer control (only using memmove when necessary). We have multiple places where we opencoded this, some of which did not use membuf at all. So now we create common infrastructure. I tried making it a single function but the various users are quite different, so instead I opted for a toolbox approach. Signed-off-by: Rusty Russell --- common/Makefile | 1 + common/jsonrpc_io.c | 128 ++++++++++++++ common/jsonrpc_io.h | 71 ++++++++ common/test/Makefile | 2 + common/test/run-jsonrpc_io.c | 317 +++++++++++++++++++++++++++++++++++ 5 files changed, 519 insertions(+) create mode 100644 common/jsonrpc_io.c create mode 100644 common/jsonrpc_io.h create mode 100644 common/test/run-jsonrpc_io.c diff --git a/common/Makefile b/common/Makefile index dec98e099b26..cd68eefd8e09 100644 --- a/common/Makefile +++ b/common/Makefile @@ -62,6 +62,7 @@ COMMON_SRC_NOGEN := \ common/json_parse.c \ common/json_parse_simple.c \ common/json_stream.c \ + common/jsonrpc_io.c \ common/key_derive.c \ common/keyset.c \ common/lease_rates.c \ diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c new file mode 100644 index 000000000000..ef0eb0c434b2 --- /dev/null +++ b/common/jsonrpc_io.c @@ -0,0 +1,128 @@ +#include "config.h" + +#include +#include +#include +#include +#include + +#define READ_CHUNKSIZE 64 + +struct jsonrpc_io { + MEMBUF(char) membuf; + jsmn_parser parser; + jsmntok_t *toks; + + /* Amount of unparsed JSON from previous reads */ + size_t bytes_unparsed; + /* Amount just read by io_read_partial */ + size_t bytes_read; +}; + +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx) +{ + struct jsonrpc_io *json_in; + + json_in = tal(ctx, struct jsonrpc_io); + json_in->bytes_unparsed = 0; + json_in->bytes_read = 0; + + membuf_init(&json_in->membuf, + tal_arr(json_in, char, READ_CHUNKSIZE), + READ_CHUNKSIZE, membuf_tal_resize); + json_in->toks = toks_alloc(json_in); + jsmn_init(&json_in->parser); + + return json_in; +} + +/* Empty new bytes read into our unparsed buffer */ +static void add_newly_read(struct jsonrpc_io *json_in) +{ + /* Now added it to our ubparsed buffer */ + membuf_added(&json_in->membuf, json_in->bytes_read); + json_in->bytes_unparsed += json_in->bytes_read; + json_in->bytes_read = 0; +} + +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len) +{ + const char *ret; + + ret = membuf_elems(&json_in->membuf) + json_in->bytes_unparsed; + *len = json_in->bytes_read; + + add_newly_read(json_in); + return ret; +} + +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf) +{ + bool complete; + + /* If we're read any more, add that */ + add_newly_read(json_in); + *toks = NULL; + *buf = NULL; + + if (!json_parse_input(&json_in->parser, &json_in->toks, + membuf_elems(&json_in->membuf), + membuf_num_elems(&json_in->membuf), + &complete)) { + return tal_fmt(ctx, "Failed to parse RPC JSON response '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + if (!complete) + return NULL; + + /* Must have jsonrpc to be valid! */ + if (!json_get_member(membuf_elems(&json_in->membuf), + json_in->toks, + "jsonrpc")) { + return tal_fmt(ctx, + "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'", + (int)membuf_num_elems(&json_in->membuf), + membuf_elems(&json_in->membuf)); + } + + *toks = json_in->toks; + *buf = membuf_elems(&json_in->membuf); + return NULL; +} + +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in) +{ + size_t bytes_parsed = json_in->toks[0].end; + json_in->bytes_unparsed -= bytes_parsed; + membuf_consume(&json_in->membuf, bytes_parsed); + + jsmn_init(&json_in->parser); + toks_reset(json_in->toks); +} + +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg) +{ + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + json_in->bytes_read = 0; + return io_read_partial(conn, + membuf_elems(&json_in->membuf) + + json_in->bytes_unparsed, + membuf_num_elems(&json_in->membuf) + - json_in->bytes_unparsed + + membuf_num_space(&json_in->membuf), + &json_in->bytes_read, + next, arg); +} diff --git a/common/jsonrpc_io.h b/common/jsonrpc_io.h new file mode 100644 index 000000000000..9032aecc48d2 --- /dev/null +++ b/common/jsonrpc_io.h @@ -0,0 +1,71 @@ +/* Low-level helper library for C plugins using ccan/io and jsonrpc socket. */ +#ifndef LIGHTNING_COMMON_JSONRPC_IO_H +#define LIGHTNING_COMMON_JSONRPC_IO_H +#include "config.h" +#include +#include +#include + +struct io_conn; +struct plugin; + +/** + * jsonrpc_io_new: allocate a fresh jsonrpc_io + */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx); + + +/** + * jsonrpc_io_read: set io_plan for reading more into buffer. + * @conn: the io_conn to read. + * @json_in: the jsonrpc_io. + * @next: the callback once a read is done. + * @arg: the argument for @next (typesafe). + */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn, + struct jsonrpc_io *json_in, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); +#define jsonrpc_io_read(ctx, json_in, next, arg) \ + jsonrpc_io_read_((ctx), (json_in), \ + typesafe_cb_preargs(struct io_plan *, void *, \ + (next), (arg), \ + struct io_conn *), \ + (arg)) + +/** + * jsonrpc_newly_read: how much did we read into the buffer? + * + * Returns the buffer and sets *len to the bytes just read. After + * that it will return *len == 0. + */ +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, + size_t *len); + +/** + * jsonrpc_io_parse: try to parse more of the buffer. + * @ctx: context to allocate error message off. + * @json_in: json_in after jsonrpc_io_read. + * @toks: returned non-NULL if there's a whole valid json object. + * @buf: returned non-NULL as above. + * + * On error, a message is returned. On incomplete, *@toks and *@buf + * are NULL. Usually you call this, the use the result and call + * jsonrpc_io_parse_done(), then call it again. + */ +const char *jsonrpc_io_parse(const tal_t *ctx, + struct jsonrpc_io *json_in, + const jsmntok_t **toks, + const char **buf); + +/** + * jsonrpc_io_parse_done: call aftr using toks from jsonrpc_io_parse. + * @json_in: json_in after jsonrpc_io_parse. + * + * You must call this if jsonrpc_io_parse() sets *toks non-NULL + * (i.e. complete, and no error). + */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in); + +#endif /* LIGHTNING_COMMON_JSONRPC_IO_H */ diff --git a/common/test/Makefile b/common/test/Makefile index db0fbcaab70e..6352beaa39c6 100644 --- a/common/test/Makefile +++ b/common/test/Makefile @@ -130,4 +130,6 @@ common/test/run-shutdown_scriptpubkey: wire/towire.o wire/fromwire.o common/test/run-wireaddr: wire/towire.o wire/fromwire.o +common/test/run-jsonrpc_io: common/json_parse_simple.o + check-units: $(COMMON_TEST_PROGRAMS:%=unittest/%) diff --git a/common/test/run-jsonrpc_io.c b/common/test/run-jsonrpc_io.c new file mode 100644 index 000000000000..4d5e604bb1f7 --- /dev/null +++ b/common/test/run-jsonrpc_io.c @@ -0,0 +1,317 @@ +/* Body of tests written by ChatGPT 5 */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include + +#undef io_read_partial +#define io_read_partial io_read_partial_test + +struct jsonrpc_io; + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, + void *), + void *arg); + + +#include "../jsonrpc_io.c" + +/* AUTOGENERATED MOCKS START */ +/* Generated stub for amount_asset_is_main */ +bool amount_asset_is_main(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_is_main called!\n"); abort(); } +/* Generated stub for amount_asset_to_sat */ +struct amount_sat amount_asset_to_sat(struct amount_asset *asset UNNEEDED) +{ fprintf(stderr, "amount_asset_to_sat called!\n"); abort(); } +/* Generated stub for amount_feerate */ + bool amount_feerate(u32 *feerate UNNEEDED, struct amount_sat fee UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_feerate called!\n"); abort(); } +/* Generated stub for amount_sat */ +struct amount_sat amount_sat(u64 satoshis UNNEEDED) +{ fprintf(stderr, "amount_sat called!\n"); abort(); } +/* Generated stub for amount_sat_add */ + bool amount_sat_add(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_add called!\n"); abort(); } +/* Generated stub for amount_sat_eq */ +bool amount_sat_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_eq called!\n"); abort(); } +/* Generated stub for amount_sat_greater_eq */ +bool amount_sat_greater_eq(struct amount_sat a UNNEEDED, struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_greater_eq called!\n"); abort(); } +/* Generated stub for amount_sat_sub */ + bool amount_sat_sub(struct amount_sat *val UNNEEDED, + struct amount_sat a UNNEEDED, + struct amount_sat b UNNEEDED) +{ fprintf(stderr, "amount_sat_sub called!\n"); abort(); } +/* Generated stub for amount_sat_to_asset */ +struct amount_asset amount_sat_to_asset(struct amount_sat *sat UNNEEDED, const u8 *asset UNNEEDED) +{ fprintf(stderr, "amount_sat_to_asset called!\n"); abort(); } +/* Generated stub for amount_tx_fee */ +struct amount_sat amount_tx_fee(u32 fee_per_kw UNNEEDED, size_t weight UNNEEDED) +{ fprintf(stderr, "amount_tx_fee called!\n"); abort(); } +/* Generated stub for fromwire */ +const u8 *fromwire(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, void *copy UNNEEDED, size_t n UNNEEDED) +{ fprintf(stderr, "fromwire called!\n"); abort(); } +/* Generated stub for fromwire_bool */ +bool fromwire_bool(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_bool called!\n"); abort(); } +/* Generated stub for fromwire_fail */ +void *fromwire_fail(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_fail called!\n"); abort(); } +/* Generated stub for fromwire_secp256k1_ecdsa_signature */ +void fromwire_secp256k1_ecdsa_signature(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, + secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "fromwire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for fromwire_sha256 */ +void fromwire_sha256(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "fromwire_sha256 called!\n"); abort(); } +/* Generated stub for fromwire_tal_arrn */ +u8 *fromwire_tal_arrn(const tal_t *ctx UNNEEDED, + const u8 **cursor UNNEEDED, size_t *max UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_tal_arrn called!\n"); abort(); } +/* Generated stub for fromwire_u32 */ +u32 fromwire_u32(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u32 called!\n"); abort(); } +/* Generated stub for fromwire_u64 */ +u64 fromwire_u64(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u64 called!\n"); abort(); } +/* Generated stub for fromwire_u8 */ +u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) +{ fprintf(stderr, "fromwire_u8 called!\n"); abort(); } +/* Generated stub for fromwire_u8_array */ +void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "fromwire_u8_array called!\n"); abort(); } +/* Generated stub for towire */ +void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED) +{ fprintf(stderr, "towire called!\n"); abort(); } +/* Generated stub for towire_bool */ +void towire_bool(u8 **pptr UNNEEDED, bool v UNNEEDED) +{ fprintf(stderr, "towire_bool called!\n"); abort(); } +/* Generated stub for towire_secp256k1_ecdsa_signature */ +void towire_secp256k1_ecdsa_signature(u8 **pptr UNNEEDED, + const secp256k1_ecdsa_signature *signature UNNEEDED) +{ fprintf(stderr, "towire_secp256k1_ecdsa_signature called!\n"); abort(); } +/* Generated stub for towire_sha256 */ +void towire_sha256(u8 **pptr UNNEEDED, const struct sha256 *sha256 UNNEEDED) +{ fprintf(stderr, "towire_sha256 called!\n"); abort(); } +/* Generated stub for towire_u32 */ +void towire_u32(u8 **pptr UNNEEDED, u32 v UNNEEDED) +{ fprintf(stderr, "towire_u32 called!\n"); abort(); } +/* Generated stub for towire_u64 */ +void towire_u64(u8 **pptr UNNEEDED, u64 v UNNEEDED) +{ fprintf(stderr, "towire_u64 called!\n"); abort(); } +/* Generated stub for towire_u8 */ +void towire_u8(u8 **pptr UNNEEDED, u8 v UNNEEDED) +{ fprintf(stderr, "towire_u8 called!\n"); abort(); } +/* Generated stub for towire_u8_array */ +void towire_u8_array(u8 **pptr UNNEEDED, const u8 *arr UNNEEDED, size_t num UNNEEDED) +{ fprintf(stderr, "towire_u8_array called!\n"); abort(); } +/* AUTOGENERATED MOCKS END */ + +struct test_feed { + const char *data; + size_t len, off; + size_t max_chunk; /* 0 => no artificial limit */ + unsigned calls_to_io_read; +}; +static struct test_feed FEED; + +static void feed_set(const char *s, size_t max_chunk) +{ + FEED.data = s; + FEED.len = strlen(s); + FEED.off = 0; + FEED.max_chunk = max_chunk; + FEED.calls_to_io_read = 0; +} + +static size_t feed_next_chunk(size_t want) +{ + size_t remain = FEED.len - FEED.off; + size_t cap = (FEED.max_chunk && FEED.max_chunk < want) ? FEED.max_chunk : want; + return (remain < cap) ? remain : cap; +} + +static struct io_plan *io_read_partial_test(struct io_conn *conn, + void *data, + size_t maxlen, + size_t *lenp, + struct io_plan *(*next)(struct io_conn *, void *), + void *arg) +{ + char *out = (char *)data; + size_t n = feed_next_chunk(maxlen); + + FEED.calls_to_io_read++; + if (n) { + memcpy(out, FEED.data + FEED.off, n); + FEED.off += n; + } + *lenp = n; + + /* No more input -> end the chain */ + if (n == 0) + return NULL; + + return next(conn, arg); +} + +/* ---------- minimal “handler” to count parsed messages ---------- */ + +struct handler_ctx { + unsigned called; + char last_buf[512]; + size_t last_len; +}; + +static void record_message(const char *buf, const jsmntok_t *toks, struct handler_ctx *hc) +{ + size_t obj_len = (size_t)(toks[0].end - toks[0].start); + if (obj_len > sizeof(hc->last_buf)) obj_len = sizeof(hc->last_buf); + memcpy(hc->last_buf, buf + toks[0].start, obj_len); + hc->last_len = obj_len; + hc->called++; +} + +/* ---------- pump that drives read -> parse -> (maybe) read again ---------- */ + +struct pump_ctx { + struct jsonrpc_io *jin; + struct handler_ctx *hc; +}; + +static struct io_plan *pump_next(struct io_conn *conn, struct pump_ctx *pc) +{ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err = jsonrpc_io_parse(tmpctx, pc->jin, &toks, &buf); + + assert(!err); + if (!toks) { + /* Need more bytes */ + return jsonrpc_io_read(conn, pc->jin, pump_next, pc); + } + + /* Got a full JSON-RPC message */ + record_message(buf, toks, pc->hc); + jsonrpc_io_parse_done(pc->jin); + + /* Loop to consume any additional buffered messages + * without asking for more input yet. */ + } +} + +/* ---------- helpers ---------- */ + +static struct jsonrpc_io *mk_reader(const tal_t *ctx) +{ + return jsonrpc_io_new(ctx); +} + +static void run_once(struct jsonrpc_io *jin, struct handler_ctx *hc) +{ + struct pump_ctx pc = { .jin = jin, .hc = hc }; + jsonrpc_io_read(NULL, jin, pump_next, &pc); +} + +/* ---------- tests ---------- */ + +static size_t test_single_message_chunked(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msg = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":true}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msg, chunksize); + run_once(jin, &hc); + + assert(hc.called == 1); + assert(FEED.off == FEED.len); + assert(hc.last_len > 0 && hc.last_buf[0] == '{' && hc.last_buf[hc.last_len-1] == '}'); + + tal_free(jin); + return strlen(msg); +} + +static size_t test_two_messages_back_to_back(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n" + "{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":42}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +static size_t test_whitespace_only(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *ws = " \t \n \r\n "; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(ws, chunksize); + run_once(jin, &hc); + + assert(hc.called == 0); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(ws); +} + +static size_t test_message_then_whitespace_then_message(size_t chunksize) +{ + struct handler_ctx hc = {0}; + const char *msgs = + "{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":true}\n" + " \n \t" + "{\"jsonrpc\":\"2.0\",\"id\":8,\"result\":false}\n"; + struct jsonrpc_io *jin = mk_reader(NULL); + + feed_set(msgs, chunksize); + run_once(jin, &hc); + + assert(hc.called == 2); + assert(FEED.off == FEED.len); + + tal_free(jin); + return strlen(msgs); +} + +/* ---------- main ---------- */ + +int main(int argc, char *argv[]) +{ + size_t max = 1; + + common_setup(argv[0]); + + for (size_t i = 0; i < max + 10; i++) { + max = max_u64(max, test_single_message_chunked(i)); + max = max_u64(max, test_two_messages_back_to_back(i)); + max = max_u64(max, test_whitespace_only(i)); + max = max_u64(max, test_message_then_whitespace_then_message(i)); + } + + common_shutdown(); + return 0; +} From 57cfd844d9091d14fb5bb48376d21da818508069 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:34 +1030 Subject: [PATCH 06/14] libplugin: use jsonrpc_io for reading replies to our async commands. This will also be more efficient than doing memmove every time. Signed-off-by: Rusty Russell --- plugins/Makefile | 1 + plugins/bkpr/test/run-sql.c | 19 ++++++++ plugins/libplugin.c | 96 ++++++++++--------------------------- plugins/test/Makefile | 2 +- 4 files changed, 46 insertions(+), 72 deletions(-) diff --git a/plugins/Makefile b/plugins/Makefile index b1d5120b6f5c..2db8837a42c7 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -191,6 +191,7 @@ PLUGIN_COMMON_OBJS := \ common/json_parse_simple.o \ common/json_filter.o \ common/json_stream.o \ + common/jsonrpc_io.o \ common/lease_rates.o \ common/memleak.o \ common/node_id.o \ diff --git a/plugins/bkpr/test/run-sql.c b/plugins/bkpr/test/run-sql.c index 8be5859adad7..9be785d203de 100644 --- a/plugins/bkpr/test/run-sql.c +++ b/plugins/bkpr/test/run-sql.c @@ -69,6 +69,25 @@ bool json_filter_ok(const struct json_filter *filter UNNEEDED, const char *membe /* Generated stub for json_filter_up */ bool json_filter_up(struct json_filter **filter UNNEEDED) { fprintf(stderr, "json_filter_up called!\n"); abort(); } +/* Generated stub for jsonrpc_io_new */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_new called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse */ +const char *jsonrpc_io_parse(const tal_t *ctx UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + const jsmntok_t **toks UNNEEDED, + const char **buf UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse_done */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse_done called!\n"); abort(); } +/* Generated stub for jsonrpc_io_read_ */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + struct io_plan *(*next)(struct io_conn * UNNEEDED, + void *) UNNEEDED, + void *arg UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_read_ called!\n"); abort(); } /* Generated stub for last_fee_state */ enum htlc_state last_fee_state(enum side opener UNNEEDED) { fprintf(stderr, "last_fee_state called!\n"); abort(); } diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 6bfb813718a2..75ebdcd4f736 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -99,10 +100,8 @@ struct plugin { /* Asynchronous RPC interaction */ struct io_conn *io_rpc_conn; struct list_head rpc_js_list; - char *rpc_buffer; - size_t rpc_used, rpc_len_read, rpc_read_offset; - jsmn_parser rpc_parser; - jsmntok_t *rpc_toks; + struct jsonrpc_io *jsonrpc_in; + /* Tracking async RPC requests */ STRMAP(struct out_req *) out_reqs; u64 next_outreq_id; @@ -1391,73 +1390,33 @@ static void rpc_conn_finished(struct io_conn *conn, plugin_err(plugin, "Lost connection to the RPC socket."); } -static bool rpc_read_response_one(struct plugin *plugin) -{ - const jsmntok_t *jrtok; - bool complete; - - if (!json_parse_input(&plugin->rpc_parser, &plugin->rpc_toks, - plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_used - plugin->rpc_read_offset, - &complete)) { - plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'", - (int)(plugin->rpc_used - plugin->rpc_read_offset), - plugin->rpc_buffer + plugin->rpc_read_offset); - } - - if (!complete) { - /* We need more. */ - goto compact; - } - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->rpc_toks) == 1) { - jsmn_init(&plugin->rpc_parser); - toks_reset(plugin->rpc_toks); - goto compact; - } - - jrtok = json_get_member(plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_toks, "jsonrpc"); - if (!jrtok) { - plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field: '%.*s'", - (int)(plugin->rpc_used - plugin->rpc_read_offset), - plugin->rpc_buffer + plugin->rpc_read_offset); - } - - handle_rpc_reply(plugin, plugin->rpc_buffer + plugin->rpc_read_offset, plugin->rpc_toks); - - /* Move this object out of the buffer */ - plugin->rpc_read_offset += plugin->rpc_toks[0].end; - jsmn_init(&plugin->rpc_parser); - toks_reset(plugin->rpc_toks); - return true; - -compact: - memmove(plugin->rpc_buffer, plugin->rpc_buffer + plugin->rpc_read_offset, - plugin->rpc_used - plugin->rpc_read_offset); - plugin->rpc_used -= plugin->rpc_read_offset; - plugin->rpc_read_offset = 0; - return false; -} static struct io_plan *rpc_conn_read_response(struct io_conn *conn, struct plugin *plugin) { - plugin->rpc_used += plugin->rpc_len_read; - if (plugin->rpc_used == tal_count(plugin->rpc_buffer)) - tal_resize(&plugin->rpc_buffer, plugin->rpc_used * 2); + /* Gather an parse any new bytes */ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err; - /* Read and process all messages from the connection */ - while (rpc_read_response_one(plugin)) - ; + err = jsonrpc_io_parse(tmpctx, + plugin->jsonrpc_in, + &toks, &buf); + if (err) + plugin_err(plugin, "%s", err); + + if (!toks) + break; + + handle_rpc_reply(plugin, buf, toks); + jsonrpc_io_parse_done(plugin->jsonrpc_in); + } - /* Read more, if there is. */ - return io_read_partial(plugin->io_rpc_conn, - plugin->rpc_buffer + plugin->rpc_used, - tal_bytelen(plugin->rpc_buffer) - plugin->rpc_used, - &plugin->rpc_len_read, - rpc_conn_read_response, plugin); + /* Read more */ + return jsonrpc_io_read(conn, plugin->jsonrpc_in, + rpc_conn_read_response, + plugin); } static struct io_plan *rpc_conn_write_request(struct io_conn *conn, @@ -2442,14 +2401,9 @@ static struct plugin *new_plugin(const tal_t *ctx, jsmn_init(&p->parser); p->toks = toks_alloc(p); /* Async RPC */ - p->rpc_buffer = tal_arr(p, char, 64); + p->jsonrpc_in = jsonrpc_io_new(p); list_head_init(&p->rpc_js_list); p->io_rpc_conn = NULL; - p->rpc_used = 0; - p->rpc_read_offset = 0; - p->rpc_len_read = 0; - jsmn_init(&p->rpc_parser); - p->rpc_toks = toks_alloc(p); p->next_outreq_id = 0; strmap_init(&p->out_reqs); p->beglist = NULL; diff --git a/plugins/test/Makefile b/plugins/test/Makefile index 3b5712d5ac09..db5b40c37cbe 100644 --- a/plugins/test/Makefile +++ b/plugins/test/Makefile @@ -33,6 +33,6 @@ plugins/test/run-route-calc: \ $(PLUGIN_TEST_PROGRAMS): $(BITCOIN_OBJS) $(WIRE_OBJS) $(PLUGIN_TEST_COMMON_OBJS) -$(PLUGIN_TEST_OBJS): $(PLUGIN_FUNDER_HEADER) $(PLUGIN_FUNDER_SRC) +$(PLUGIN_TEST_OBJS): $(PLUGIN_ALL_HEADER) $(PLUGIN_ALL_SRC) check-units: $(PLUGIN_TEST_PROGRAMS:%=unittest/%) From 2f2a0dca4d6d90a8366ae0a2afaf07d17d2c9f6e Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:34 +1030 Subject: [PATCH 07/14] libplugin: wean ld_command_handle off referncing plugin->buffer. Hand it in as a parameter to reduce churn in next patch. Signed-off-by: Rusty Russell --- plugins/libplugin.c | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 75ebdcd4f736..26c37330a91b 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -2051,6 +2051,7 @@ static struct command_result *param_tok(struct command *cmd, const char *name, } static void ld_command_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) { const jsmntok_t *methtok, *paramstok, *filtertok; @@ -2059,18 +2060,18 @@ static void ld_command_handle(struct plugin *plugin, const char *id; enum command_type type; - methtok = json_get_member(plugin->buffer, toks, "method"); - paramstok = json_get_member(plugin->buffer, toks, "params"); - filtertok = json_get_member(plugin->buffer, toks, "filter"); + methtok = json_get_member(buffer, toks, "method"); + paramstok = json_get_member(buffer, toks, "params"); + filtertok = json_get_member(buffer, toks, "filter"); if (!methtok || !paramstok) plugin_err(plugin, "Malformed JSON-RPC notification missing " "\"method\" or \"params\": %.*s", json_tok_full_len(toks), - json_tok_full(plugin->buffer, toks)); + json_tok_full(buffer, toks)); - methodname = json_strdup(NULL, plugin->buffer, methtok); - id = json_get_id(tmpctx, plugin->buffer, toks); + methodname = json_strdup(NULL, buffer, methtok); + id = json_get_id(tmpctx, buffer, toks); if (!id) type = COMMAND_TYPE_NOTIFICATION; @@ -2086,7 +2087,7 @@ static void ld_command_handle(struct plugin *plugin, if (!plugin->manifested) { if (streq(cmd->methodname, "getmanifest")) { - handle_getmanifest(cmd, plugin->buffer, paramstok); + handle_getmanifest(cmd, buffer, paramstok); plugin->manifested = true; return; } @@ -2096,7 +2097,7 @@ static void ld_command_handle(struct plugin *plugin, if (!plugin->initialized) { if (streq(cmd->methodname, "init")) { - handle_init(cmd, plugin->buffer, paramstok); + handle_init(cmd, buffer, paramstok); plugin->initialized = true; return; } @@ -2114,7 +2115,7 @@ static void ld_command_handle(struct plugin *plugin, const char *err; plugin->deprecated_ok_override = tal(plugin, bool); - err = json_scan(tmpctx, plugin->buffer, paramstok, + err = json_scan(tmpctx, buffer, paramstok, "{deprecated_oneshot:{deprecated_ok:%}}", JSON_SCAN(json_to_bool, plugin->deprecated_ok_override)); @@ -2128,7 +2129,7 @@ static void ld_command_handle(struct plugin *plugin, || is_asterix_notification(cmd->methodname, plugin->notif_subs[i].name)) { plugin->notif_subs[i].handle(cmd, - plugin->buffer, + buffer, paramstok); return; } @@ -2140,14 +2141,14 @@ static void ld_command_handle(struct plugin *plugin, plugin_err(plugin, "Unregistered notification %.*s", json_tok_full_len(methtok), - json_tok_full(plugin->buffer, methtok)); + json_tok_full(buffer, methtok)); } for (size_t i = 0; i < plugin->num_hook_subs; i++) { if (streq(cmd->methodname, plugin->hook_subs[i].name)) { cmd->type = COMMAND_TYPE_HOOK; plugin->hook_subs[i].handle(cmd, - plugin->buffer, + buffer, paramstok); return; } @@ -2155,7 +2156,7 @@ static void ld_command_handle(struct plugin *plugin, if (filtertok) { /* On error, this fails cmd */ - if (parse_filter(cmd, "filter", plugin->buffer, filtertok) + if (parse_filter(cmd, "filter", buffer, filtertok) != NULL) return; } @@ -2167,17 +2168,17 @@ static void ld_command_handle(struct plugin *plugin, /* We're going to mangle it, so make a copy */ mod_params = json_tok_copy(cmd, paramstok); - if (!param_check(cmd, plugin->buffer, mod_params, + if (!param_check(cmd, buffer, mod_params, p_req("command_to_check", param_tok, &method), p_opt_any(), NULL)) { plugin_err(plugin, "lightningd check without command_to_check: %.*s", json_tok_full_len(toks), - json_tok_full(plugin->buffer, toks)); + json_tok_full(buffer, toks)); } tal_free(cmd->methodname); - cmd->methodname = json_strdup(cmd, plugin->buffer, method); + cmd->methodname = json_strdup(cmd, buffer, method); /* Point method to the name, not the value */ if (mod_params->type == JSMN_OBJECT) @@ -2190,7 +2191,7 @@ static void ld_command_handle(struct plugin *plugin, for (size_t i = 0; i < plugin->num_commands; i++) { if (streq(cmd->methodname, plugin->commands[i].name)) { plugin->commands[i].handle(cmd, - plugin->buffer, + buffer, paramstok); /* Reset this */ plugin->deprecated_ok_override @@ -2207,8 +2208,8 @@ static void ld_command_handle(struct plugin *plugin, struct command_result *ret; bool check_only; - config = json_strdup(tmpctx, plugin->buffer, - json_get_member(plugin->buffer, paramstok, "config")); + config = json_strdup(tmpctx, buffer, + json_get_member(buffer, paramstok, "config")); popt = find_opt(plugin, config); if (!popt) { plugin_err(plugin, @@ -2224,9 +2225,9 @@ static void ld_command_handle(struct plugin *plugin, check_only = command_check_only(cmd); plugin_log(plugin, LOG_DBG, "setconfig %s check_only=%i", config, check_only); - valtok = json_get_member(plugin->buffer, paramstok, "val"); + valtok = json_get_member(buffer, paramstok, "val"); if (valtok) - val = json_strdup(tmpctx, plugin->buffer, valtok); + val = json_strdup(tmpctx, buffer, valtok); else val = "true"; @@ -2278,7 +2279,7 @@ static bool ld_read_json_one(struct plugin *plugin) /* FIXME: Spark doesn't create proper jsonrpc 2.0! So we don't * check for "jsonrpc" here. */ - ld_command_handle(plugin, plugin->toks); + ld_command_handle(plugin, plugin->buffer, plugin->toks); /* Move this object out of the buffer */ memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, From 570c815a290eb7ac0e70135312ccc08e165c835b Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:34 +1030 Subject: [PATCH 08/14] libplugin: use jsonrpc_io for stdin from lightningd. This is also more efficient if there are many commands at once. Signed-off-by: Rusty Russell --- plugins/libplugin.c | 87 ++++++++++++--------------------------------- 1 file changed, 22 insertions(+), 65 deletions(-) diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 26c37330a91b..a887a3fa4f23 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -89,10 +89,7 @@ struct plugin { const char **beglist; /* To read from lightningd */ - char *buffer; - size_t used, len_read; - jsmn_parser parser; - jsmntok_t *toks; + struct jsonrpc_io *lightningd_in; /* To write to lightningd */ struct list_head js_list; @@ -2248,65 +2245,31 @@ static void ld_command_handle(struct plugin *plugin, plugin_err(plugin, "Unknown command '%s'", cmd->methodname); } -/** - * Try to parse a complete message from lightningd's buffer, and return true - * if we could handle it. - */ -static bool ld_read_json_one(struct plugin *plugin) +static struct io_plan *ld_read_json(struct io_conn *conn, + struct plugin *plugin) { - bool complete; + /* Gather an parse any new bytes */ + for (;;) { + const jsmntok_t *toks; + const char *buf; + const char *err; - if (!json_parse_input(&plugin->parser, &plugin->toks, - plugin->buffer, plugin->used, - &complete)) { - plugin_err(plugin, "Failed to parse JSON response '%.*s'", - (int)plugin->used, plugin->buffer); - return false; - } + err = jsonrpc_io_parse(tmpctx, + plugin->lightningd_in, + &toks, &buf); + if (err) + plugin_err(plugin, "%s", err); - if (!complete) { - /* We need more. */ - return false; - } + if (!toks) + break; - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->toks) == 1) { - toks_reset(plugin->toks); - jsmn_init(&plugin->parser); - plugin->used = 0; - return false; + ld_command_handle(plugin, buf, toks); + jsonrpc_io_parse_done(plugin->lightningd_in); } - /* FIXME: Spark doesn't create proper jsonrpc 2.0! So we don't - * check for "jsonrpc" here. */ - ld_command_handle(plugin, plugin->buffer, plugin->toks); - - /* Move this object out of the buffer */ - memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, - tal_count(plugin->buffer) - plugin->toks[0].end); - plugin->used -= plugin->toks[0].end; - toks_reset(plugin->toks); - jsmn_init(&plugin->parser); - - return true; -} - -static struct io_plan *ld_read_json(struct io_conn *conn, - struct plugin *plugin) -{ - plugin->used += plugin->len_read; - if (plugin->used && plugin->used == tal_count(plugin->buffer)) - tal_resize(&plugin->buffer, plugin->used * 2); - - /* Read and process all messages from the connection */ - while (ld_read_json_one(plugin)) - ; - - /* Now read more from the connection */ - return io_read_partial(plugin->stdin_conn, - plugin->buffer + plugin->used, - tal_count(plugin->buffer) - plugin->used, - &plugin->len_read, ld_read_json, plugin); + /* Read more */ + return jsonrpc_io_read(conn, plugin->lightningd_in, + ld_read_json, plugin); } static struct io_plan *ld_write_json(struct io_conn *conn, @@ -2352,9 +2315,7 @@ static struct io_plan *stdin_conn_init(struct io_conn *conn, { plugin->stdin_conn = conn; io_set_finish(conn, ld_conn_finish, plugin); - return io_read_partial(plugin->stdin_conn, plugin->buffer, - tal_bytelen(plugin->buffer), &plugin->len_read, - ld_read_json, plugin); + return ld_read_json(conn, plugin); } /* lightningd reads from our stdout */ @@ -2395,12 +2356,8 @@ static struct plugin *new_plugin(const tal_t *ctx, p->id = name; p->developer = developer; p->deprecated_ok_override = NULL; - p->buffer = tal_arr(p, char, 64); + p->lightningd_in = jsonrpc_io_new(p); list_head_init(&p->js_list); - p->used = 0; - p->len_read = 0; - jsmn_init(&p->parser); - p->toks = toks_alloc(p); /* Async RPC */ p->jsonrpc_in = jsonrpc_io_new(p); list_head_init(&p->rpc_js_list); From 17795086e801004debf59c770ae5daed6168e05a Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:35 +1030 Subject: [PATCH 09/14] lightningd: wean parse_request off referencing jcon->buffer. Hand it in as a parameter to reduce churn in the next patch. Signed-off-by: Rusty Russell --- lightningd/jsonrpc.c | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 6bd3cb784715..3292e4a30851 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -1048,7 +1048,9 @@ REGISTER_PLUGIN_HOOK(rpc_command, /* We return struct command_result so command_fail return value has a natural * sink; we don't actually use the result. */ static struct command_result * -parse_request(struct json_connection *jcon, const jsmntok_t tok[]) +parse_request(struct json_connection *jcon, + const char *buffer, + const jsmntok_t tok[]) { const jsmntok_t *method, *id, *params, *filter, *jsonrpc; struct command *c; @@ -1061,10 +1063,10 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return NULL; } - method = json_get_member(jcon->buffer, tok, "method"); - params = json_get_member(jcon->buffer, tok, "params"); - filter = json_get_member(jcon->buffer, tok, "filter"); - id = json_get_member(jcon->buffer, tok, "id"); + method = json_get_member(buffer, tok, "method"); + params = json_get_member(buffer, tok, "params"); + filter = json_get_member(buffer, tok, "filter"); + id = json_get_member(buffer, tok, "id"); if (!id) { json_command_malformed(jcon, "null", "No id"); @@ -1077,8 +1079,8 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return NULL; } - jsonrpc = json_get_member(jcon->buffer, tok, "jsonrpc"); - if (!jsonrpc || jsonrpc->type != JSMN_STRING || !json_tok_streq(jcon->buffer, jsonrpc, "2.0")) { + jsonrpc = json_get_member(buffer, tok, "jsonrpc"); + if (!jsonrpc || jsonrpc->type != JSMN_STRING || !json_tok_streq(buffer, jsonrpc, "2.0")) { json_command_malformed(jcon, "null", "jsonrpc: \"2.0\" must be specified in the request"); return NULL; } @@ -1094,7 +1096,7 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) c->id_is_string = (id->type == JSMN_STRING); /* Include "" around string */ c->id = tal_strndup(c, - json_tok_full(jcon->buffer, id), + json_tok_full(buffer, id), json_tok_full_len(id)); c->mode = CMD_NORMAL; c->filter = NULL; @@ -1113,7 +1115,7 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) if (filter) { struct command_result *ret; - ret = parse_filter(c, "filter", jcon->buffer, filter); + ret = parse_filter(c, "filter", buffer, filter); if (ret) return ret; } @@ -1122,11 +1124,11 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) * actually just logging the id */ log_io(jcon->log, LOG_IO_IN, NULL, c->id, NULL, 0); - c->json_cmd = find_cmd(jcon->ld->jsonrpc, jcon->buffer, method); + c->json_cmd = find_cmd(jcon->ld->jsonrpc, buffer, method); if (!c->json_cmd) { return command_fail( c, JSONRPC2_METHOD_NOT_FOUND, "Unknown command '%.*s'", - method->end - method->start, jcon->buffer + method->start); + method->end - method->start, buffer + method->start); } if (!command_deprecated_in_ok(c, NULL, c->json_cmd->depr_start, @@ -1134,19 +1136,19 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[]) return command_fail(c, JSONRPC2_METHOD_NOT_FOUND, "Command %.*s is deprecated", json_tok_full_len(method), - json_tok_full(jcon->buffer, method)); + json_tok_full(buffer, method)); } if (c->json_cmd->dev_only && !jcon->ld->developer) { return command_fail(c, JSONRPC2_METHOD_NOT_FOUND, "Command %.*s is developer-only", json_tok_full_len(method), - json_tok_full(jcon->buffer, method)); + json_tok_full(buffer, method)); } rpc_hook = tal(c, struct rpc_command_hook_payload); rpc_hook->cmd = c; /* Duplicate since we might outlive the connection */ - json_dup_contents(rpc_hook, jcon->buffer, tok, + json_dup_contents(rpc_hook, buffer, tok, &rpc_hook->buffer, &rpc_hook->request); @@ -1264,7 +1266,7 @@ static struct io_plan *read_json(struct io_conn *conn, db_begin_transaction(jcon->ld->wallet->db); in_transaction = true; } - parse_request(jcon, jcon->input_toks); + parse_request(jcon, jcon->buffer, jcon->input_toks); /* Remove first {}. */ memmove(jcon->buffer, jcon->buffer + jcon->input_toks[0].end, From 0d997e268aefdd6fb8fabd370200abbfa51a0fb5 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:35 +1030 Subject: [PATCH 10/14] lightningd: use jsonrpc_io for reading JSON commands. This is more efficient if we have lots of incoming commands, too. Signed-off-by: Rusty Russell --- lightningd/Makefile | 1 + lightningd/jsonrpc.c | 105 ++++++++++------------------------ lightningd/test/run-jsonrpc.c | 23 ++++++++ 3 files changed, 54 insertions(+), 75 deletions(-) diff --git a/lightningd/Makefile b/lightningd/Makefile index fbf1d10f47a1..4ca2ddcc4d33 100644 --- a/lightningd/Makefile +++ b/lightningd/Makefile @@ -125,6 +125,7 @@ LIGHTNINGD_COMMON_OBJS := \ common/json_parse.o \ common/json_parse_simple.o \ common/json_stream.o \ + common/jsonrpc_io.o \ common/lease_rates.o \ common/memleak.o \ common/msg_queue.o \ diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 3292e4a30851..88ac751ea7e0 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -85,18 +86,8 @@ struct json_connection { /* Logging for this json connection. */ struct logger *log; - /* The buffer (required to interpret tokens). */ - char *buffer; - - /* Internal state: */ - /* How much is already filled. */ - size_t used; - /* How much has just been filled. */ - size_t len_read; - - /* JSON parsing state. */ - jsmn_parser input_parser; - jsmntok_t *input_toks; + /* The buffer and state reading in the JSON commands */ + struct jsonrpc_io *json_in; /* Local deprecated support? */ bool deprecated_ok; @@ -1217,93 +1208,61 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, static struct io_plan *read_json(struct io_conn *conn, struct json_connection *jcon) { - bool complete; bool in_transaction = false; struct timemono start_time = time_mono(); + size_t len_read; + const jsmntok_t *toks; + const char *buffer, *error; - if (jcon->len_read) - log_io(jcon->log, LOG_IO_IN, NULL, "", - jcon->buffer + jcon->used, jcon->len_read); - - /* Resize larger if we're full. */ - jcon->used += jcon->len_read; - if (jcon->used == tal_count(jcon->buffer)) - tal_resize(&jcon->buffer, jcon->used * 2); + buffer = jsonrpc_newly_read(jcon->json_in, &len_read); + if (len_read) + log_io(jcon->log, LOG_IO_IN, NULL, "", buffer, len_read); /* We wait for pending output to be consumed, to avoid DoS */ if (tal_count(jcon->js_arr) != 0) { - jcon->len_read = 0; return io_wait(conn, conn, read_json, jcon); } again: - if (!json_parse_input(&jcon->input_parser, &jcon->input_toks, - jcon->buffer, jcon->used, - &complete)) { - json_command_malformed( - jcon, "null", - tal_fmt(tmpctx, "Invalid token in json input: '%s'", - tal_hexstr(tmpctx, jcon->buffer, jcon->used))); + error = jsonrpc_io_parse(tmpctx, jcon->json_in, &toks, &buffer); + if (error) { + json_command_malformed(jcon, "null", error); if (in_transaction) db_commit_transaction(jcon->ld->wallet->db); return io_halfclose(conn); } - if (!complete) - goto read_more; - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(jcon->input_toks) == 1) { - jcon->used = 0; - - /* Reset parser. */ - jsmn_init(&jcon->input_parser); - toks_reset(jcon->input_toks); + if (!toks) goto read_more; - } if (!in_transaction) { db_begin_transaction(jcon->ld->wallet->db); in_transaction = true; } - parse_request(jcon, jcon->buffer, jcon->input_toks); - - /* Remove first {}. */ - memmove(jcon->buffer, jcon->buffer + jcon->input_toks[0].end, - tal_count(jcon->buffer) - jcon->input_toks[0].end); - jcon->used -= jcon->input_toks[0].end; - - /* Reset parser. */ - jsmn_init(&jcon->input_parser); - toks_reset(jcon->input_toks); + parse_request(jcon, buffer, toks); + jsonrpc_io_parse_done(jcon->json_in); - /* Do we have more already read? */ - if (jcon->used) { - if (!jcon->db_batching) { + if (!jcon->db_batching) { + db_commit_transaction(jcon->ld->wallet->db); + in_transaction = false; + } else { + /* FIXME: io_always() should interleave with + * real IO, and then we should rotate order we + * service fds in, to avoid starvation. */ + if (time_greater(timemono_between(time_mono(), + start_time), + time_from_msec(250))) { db_commit_transaction(jcon->ld->wallet->db); - in_transaction = false; - } else { - /* FIXME: io_always() should interleave with - * real IO, and then we should rotate order we - * service fds in, to avoid starvation. */ - if (time_greater(timemono_between(time_mono(), - start_time), - time_from_msec(250))) { - db_commit_transaction(jcon->ld->wallet->db); - /* Call us back, as if we read nothing new */ - jcon->len_read = 0; - return io_always(conn, read_json, jcon); - } + /* Call us back, as if we read nothing new */ + return io_always(conn, read_json, jcon); } - goto again; } + goto again; read_more: if (in_transaction) db_commit_transaction(jcon->ld->wallet->db); - return io_read_partial(conn, jcon->buffer + jcon->used, - tal_count(jcon->buffer) - jcon->used, - &jcon->len_read, read_json, jcon); + return jsonrpc_io_read(conn, jcon->json_in, read_json, jcon); } static struct io_plan *jcon_connected(struct io_conn *conn, @@ -1315,12 +1274,8 @@ static struct io_plan *jcon_connected(struct io_conn *conn, jcon = notleak(tal(conn, struct json_connection)); jcon->conn = conn; jcon->ld = ld; - jcon->used = 0; - jcon->buffer = tal_arr(jcon, char, 64); jcon->js_arr = tal_arr(jcon, struct json_stream *, 0); - jcon->len_read = 0; - jsmn_init(&jcon->input_parser); - jcon->input_toks = toks_alloc(jcon); + jcon->json_in = jsonrpc_io_new(jcon); jcon->notifications_enabled = false; jcon->db_batching = false; jcon->deprecated_ok = ld->deprecated_ok; diff --git a/lightningd/test/run-jsonrpc.c b/lightningd/test/run-jsonrpc.c index 05d9a91751b2..35c4efea7b34 100644 --- a/lightningd/test/run-jsonrpc.c +++ b/lightningd/test/run-jsonrpc.c @@ -66,6 +66,29 @@ bool json_to_jsonrpc_errcode(const char *buffer UNNEEDED, const jsmntok_t *tok U bool json_to_number(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, unsigned int *num UNNEEDED) { fprintf(stderr, "json_to_number called!\n"); abort(); } +/* Generated stub for jsonrpc_io_new */ +struct jsonrpc_io *jsonrpc_io_new(const tal_t *ctx UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_new called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse */ +const char *jsonrpc_io_parse(const tal_t *ctx UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + const jsmntok_t **toks UNNEEDED, + const char **buf UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse called!\n"); abort(); } +/* Generated stub for jsonrpc_io_parse_done */ +void jsonrpc_io_parse_done(struct jsonrpc_io *json_in UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_parse_done called!\n"); abort(); } +/* Generated stub for jsonrpc_io_read_ */ +struct io_plan *jsonrpc_io_read_(struct io_conn *conn UNNEEDED, + struct jsonrpc_io *json_in UNNEEDED, + struct io_plan *(*next)(struct io_conn * UNNEEDED, + void *) UNNEEDED, + void *arg UNNEEDED) +{ fprintf(stderr, "jsonrpc_io_read_ called!\n"); abort(); } +/* Generated stub for jsonrpc_newly_read */ +const char *jsonrpc_newly_read(struct jsonrpc_io *json_in UNNEEDED, + size_t *len UNNEEDED) +{ fprintf(stderr, "jsonrpc_newly_read called!\n"); abort(); } /* Generated stub for lightningd_deprecated_in_ok */ bool lightningd_deprecated_in_ok(struct lightningd *ld UNNEEDED, struct logger *log UNNEEDED, From a4266cabb900f233df5d9b3643489ff61e7394b3 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:35 +1030 Subject: [PATCH 11/14] common: add brace hack for jsonrpc_async_parse. This is a trick from bcli: we ask bitcoind for the block, and it hands us a 2MB hex blob (which we read in multiple parts). Our parser wades through it all, but a quick search for '}' makes it much faster. Signed-off-by: Rusty Russell --- common/jsonrpc_io.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c index ef0eb0c434b2..02cb208d71b0 100644 --- a/common/jsonrpc_io.c +++ b/common/jsonrpc_io.c @@ -69,6 +69,15 @@ const char *jsonrpc_io_parse(const tal_t *ctx, *toks = NULL; *buf = NULL; + /* Our JSON parser is pretty good at incremental parsing, but + * `getrawblock` gives a giant 2MB token, which forces it to re-parse + * every time until we have all of it. However, we can't complete a + * JSON object without a '}', so we do a cheaper check here. + */ + if (!memchr(membuf_elems(&json_in->membuf), '}', + membuf_num_elems(&json_in->membuf))) + return NULL; + if (!json_parse_input(&json_in->parser, &json_in->toks, membuf_elems(&json_in->membuf), membuf_num_elems(&json_in->membuf), From 4c868bfab782eb2f012ac49378667e211d7b3cef Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:35 +1030 Subject: [PATCH 12/14] lightningd: wean pligun_log_handle/plugin_notify_handle/plugin_response_handle off plugin->buffer. Hand buffer in as a parameter to reduce churn in the next patch. Signed-off-by: Rusty Russell --- lightningd/plugin.c | 58 +++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 1dba9a94dd87..ea0c78e5d9c2 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -485,16 +485,18 @@ static void plugin_send(struct plugin *plugin, struct json_stream *stream) /* Returns the error string, or NULL */ static const char *plugin_log_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *paramstok) WARN_UNUSED_RESULT; static const char *plugin_log_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *paramstok) { const jsmntok_t *msgtok, *leveltok; enum log_level level; bool call_notifier; - msgtok = json_get_member(plugin->buffer, paramstok, "message"); - leveltok = json_get_member(plugin->buffer, paramstok, "level"); + msgtok = json_get_member(buffer, paramstok, "message"); + leveltok = json_get_member(buffer, paramstok, "level"); if (!msgtok || msgtok->type != JSMN_STRING) { return tal_fmt(plugin, "Log notification from plugin doesn't have " @@ -503,7 +505,7 @@ static const char *plugin_log_handle(struct plugin *plugin, if (!leveltok) level = LOG_INFORM; - else if (!log_level_parse(plugin->buffer + leveltok->start, + else if (!log_level_parse(buffer + leveltok->start, leveltok->end - leveltok->start, &level) /* FIXME: Allow io logging? */ @@ -513,15 +515,15 @@ static const char *plugin_log_handle(struct plugin *plugin, "Unknown log-level %.*s, valid values are " "\"trace\", \"debug\", \"info\", \"warn\", or \"error\".", json_tok_full_len(leveltok), - json_tok_full(plugin->buffer, leveltok)); + json_tok_full(buffer, leveltok)); } call_notifier = (level == LOG_BROKEN || level == LOG_UNUSUAL)? true : false; /* Only bother unescaping and splitting if it has \ */ - if (memchr(plugin->buffer + msgtok->start, '\\', msgtok->end - msgtok->start)) { + if (memchr(buffer + msgtok->start, '\\', msgtok->end - msgtok->start)) { const char *log_msg = json_escape_unescape_len(tmpctx, - plugin->buffer + msgtok->start, + buffer + msgtok->start, msgtok->end - msgtok->start); char **lines; @@ -539,13 +541,14 @@ static const char *plugin_log_handle(struct plugin *plugin, print_raw: log_(plugin->log, level, NULL, call_notifier, "%.*s", msgtok->end - msgtok->start, - plugin->buffer + msgtok->start); + buffer + msgtok->start); } return NULL; } static const char *plugin_notify_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *methodtok, const jsmntok_t *paramstok) { @@ -553,7 +556,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, struct jsonrpc_request *request; /* id inside params tells us which id to redirect to. */ - idtok = json_get_member(plugin->buffer, paramstok, "id"); + idtok = json_get_member(buffer, paramstok, "id"); if (!idtok) { return tal_fmt(plugin, "JSON-RPC notify \"id\"-field is not present"); @@ -561,7 +564,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, /* Include any "" in id */ request = strmap_getn(&plugin->pending_requests, - json_tok_full(plugin->buffer, idtok), + json_tok_full(buffer, idtok), json_tok_full_len(idtok)); if (!request) { return NULL; @@ -569,7 +572,7 @@ static const char *plugin_notify_handle(struct plugin *plugin, /* Ignore if they don't have a callback */ if (request->notify_cb) - request->notify_cb(plugin->buffer, methodtok, paramstok, idtok, + request->notify_cb(buffer, methodtok, paramstok, idtok, request->response_cb_arg); return NULL; } @@ -588,38 +591,40 @@ static bool plugin_notification_allowed(const struct plugin *plugin, const char /* Returns the error string, or NULL */ static const char *plugin_notification_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) WARN_UNUSED_RESULT; static const char *plugin_notification_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks) { const jsmntok_t *methtok, *paramstok; const char *methname; struct jsonrpc_notification *n; - methtok = json_get_member(plugin->buffer, toks, "method"); - paramstok = json_get_member(plugin->buffer, toks, "params"); + methtok = json_get_member(buffer, toks, "method"); + paramstok = json_get_member(buffer, toks, "params"); if (!methtok || !paramstok) { return tal_fmt(plugin, "Malformed JSON-RPC notification missing " "\"method\" or \"params\": %.*s", toks->end - toks->start, - plugin->buffer + toks->start); + buffer + toks->start); } /* Dispatch incoming notifications. This is currently limited * to just a few method types, should this ever become * unwieldy we can switch to the AUTODATA construction to * register notification handlers in a variety of places. */ - if (json_tok_streq(plugin->buffer, methtok, "log")) { - return plugin_log_handle(plugin, paramstok); - } else if (json_tok_streq(plugin->buffer, methtok, "message") - || json_tok_streq(plugin->buffer, methtok, "progress")) { - return plugin_notify_handle(plugin, methtok, paramstok); + if (json_tok_streq(buffer, methtok, "log")) { + return plugin_log_handle(plugin, buffer, paramstok); + } else if (json_tok_streq(buffer, methtok, "message") + || json_tok_streq(buffer, methtok, "progress")) { + return plugin_notify_handle(plugin, buffer, methtok, paramstok); } - methname = json_strdup(tmpctx, plugin->buffer, methtok); + methname = json_strdup(tmpctx, buffer, methtok); if (!plugin_notification_allowed(plugin, methname)) { log_unusual(plugin->log, @@ -630,7 +635,7 @@ static const char *plugin_notification_handle(struct plugin *plugin, } else if (notifications_have_topic(plugin->plugins, methname)) { n = jsonrpc_notification_start_noparams(NULL, methname); json_add_string(n->stream, "origin", plugin->shortname); - json_add_tok(n->stream, "params", paramstok, plugin->buffer); + json_add_tok(n->stream, "params", paramstok, buffer); jsonrpc_notification_end_noparams(n); plugins_notify(plugin->plugins, take(n)); @@ -675,6 +680,7 @@ static void destroy_request(struct jsonrpc_request *req, } static void plugin_response_handle(struct plugin *plugin, + const char *buffer, const jsmntok_t *toks, const jsmntok_t *idtok) { @@ -682,7 +688,7 @@ static void plugin_response_handle(struct plugin *plugin, const tal_t *ctx; request = strmap_getn(&plugin->pending_requests, - json_tok_full(plugin->buffer, idtok), + json_tok_full(buffer, idtok), json_tok_full_len(idtok)); /* Can happen if request was freed before plugin responded */ if (!request) { @@ -696,7 +702,7 @@ static void plugin_response_handle(struct plugin *plugin, /* Don't keep track of this request; we will terminate it */ tal_del_destructor2(request, destroy_request, plugin); destroy_request(request, plugin); - request->response_cb(plugin->buffer, toks, idtok, request->response_cb_arg); + request->response_cb(buffer, toks, idtok, request->response_cb_arg); tal_free(ctx); } @@ -778,7 +784,7 @@ static const char *plugin_read_json_one(struct plugin *plugin, * * https://www.jsonrpc.org/specification#notification */ - err = plugin_notification_handle(plugin, plugin->toks); + err = plugin_notification_handle(plugin, plugin->buffer, plugin->toks); } else { /* When a rpc call is made, the Server MUST reply with @@ -808,7 +814,7 @@ static const char *plugin_read_json_one(struct plugin *plugin, * * https://www.jsonrpc.org/specification#response_object */ - plugin_response_handle(plugin, plugin->toks, idtok); + plugin_response_handle(plugin, plugin->buffer, plugin->toks, idtok); err = NULL; } if (want_transaction) @@ -1527,7 +1533,7 @@ static const char *plugin_subscriptions_add(struct plugin *plugin, * manifest, without checking that they exist, since * later plugins may also emit notifications of custom * types that we don't know about yet. */ - sub.topic = json_strdup(plugin, plugin->buffer, s); + sub.topic = json_strdup(plugin, buffer, s); sub.owner = plugin; tal_arr_expand(&plugin->subscriptions, sub); } @@ -1568,7 +1574,7 @@ static const char *plugin_hooks_add(struct plugin *plugin, const char *buffer, aftertok = json_get_member(buffer, t, "after"); } else { /* FIXME: deprecate in 3 releases after v0.9.2! */ - name = json_strdup(tmpctx, plugin->buffer, t); + name = json_strdup(tmpctx, buffer, t); beforetok = aftertok = NULL; } From cda08cd844d4ffe3c9e0ef16a4ac30298604bbb9 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:35 +1030 Subject: [PATCH 13/14] lightningd: use jsonrpc_io for plugin JSON commands. Signed-off-by: Rusty Russell --- lightningd/plugin.c | 336 +++++++++++++++++--------------------------- lightningd/plugin.h | 5 +- 2 files changed, 131 insertions(+), 210 deletions(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index ea0c78e5d9c2..7c0a660e5f9e 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -226,36 +227,36 @@ static bool request_add(const char *reqid, struct jsonrpc_request *req, } /* FIXME: reorder */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed); +static void plugin_response_handle(struct plugin *plugin, + const char *buffer, + const jsmntok_t *toks, + const jsmntok_t *idtok); /* We act as if the plugin itself said "I'm dead!" */ static void plugin_terminated_fail_req(struct plugin *plugin, struct jsonrpc_request *req) { - bool complete, destroyed; - const char *err; - - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - tal_free(plugin->buffer); - plugin->buffer = tal_fmt(plugin, - "{\"jsonrpc\": \"2.0\"," - "\"id\": %s," - "\"error\":" - " {\"code\":%i, \"message\":\"%s\"}" - "}\n\n", - req->id, - PLUGIN_TERMINATED, - "Plugin terminated before replying to RPC call."); - plugin->used = strlen(plugin->buffer); - - /* We're already in a transaction, don't do it again! */ - err = plugin_read_json_one(plugin, false, &complete, &destroyed); - assert(!err); + jsmntok_t *toks = toks_alloc(plugin); + const jsmntok_t *idtok; + const char *buf; + jsmn_parser parser; + bool complete; + + buf = tal_fmt(plugin, + "{\"jsonrpc\": \"2.0\"," + "\"id\": %s," + "\"error\":" + " {\"code\":%i, \"message\":\"%s\"}" + "}\n\n", + req->id, + PLUGIN_TERMINATED, + "Plugin terminated before replying to RPC call."); + jsmn_init(&parser); + if (!json_parse_input(&parser, &toks, buf, strlen(buf), &complete)) + abort(); assert(complete); + idtok = json_get_member(buf, toks, "id"); + plugin_response_handle(plugin, buf, toks, idtok); } static void destroy_plugin(struct plugin *p) @@ -376,7 +377,6 @@ struct plugin *plugin_register(struct plugins *plugins, const char* path TAKES, p->plugin_state = UNCONFIGURED; p->js_arr = tal_arr(p, struct json_stream *, 0); - p->used = 0; p->notification_topics = tal_arr(p, const char *, 0); p->subscriptions = NULL; p->dynamic = false; @@ -706,186 +706,112 @@ static void plugin_response_handle(struct plugin *plugin, tal_free(ctx); } -/** - * Try to parse a complete message from the plugin's buffer. - * - * Returns NULL if there was no error. - * If it can parse a JSON message, sets *@complete, and returns any error - * from the callback. - * - * If @destroyed was set, it means the plugin called plugin stop on itself. - */ -static const char *plugin_read_json_one(struct plugin *plugin, - bool want_transaction, - bool *complete, - bool *destroyed) -{ - const jsmntok_t *jrtok, *idtok; - struct plugin_destroyed *pd; - const char *err; - struct wallet *wallet = plugin->plugins->ld->wallet; - - *destroyed = false; - /* Note that in the case of 'plugin stop' this can free request (since - * plugin is parent), so detect that case */ - - if (!json_parse_input(&plugin->parser, &plugin->toks, - plugin->buffer, plugin->used, - complete)) { - return tal_fmt(plugin, - "Failed to parse JSON response '%.*s'", - (int)plugin->used, plugin->buffer); - } - - if (!*complete) { - /* We need more. */ - return NULL; - } - - /* Empty buffer? (eg. just whitespace). */ - if (tal_count(plugin->toks) == 1) { - plugin->used = 0; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - /* We need more. */ - *complete = false; - return NULL; - } - - if (plugin->toks->type != JSMN_OBJECT) - return tal_fmt( - plugin, - "JSON-RPC message is not a valid JSON object type"); - - jrtok = json_get_member(plugin->buffer, plugin->toks, "jsonrpc"); - idtok = json_get_member(plugin->buffer, plugin->toks, "id"); - - if (!jrtok) { - return tal_fmt( - plugin, - "JSON-RPC message does not contain \"jsonrpc\" field"); - } - - /* We can be called extremely early, or as db hook, or for - * fake "terminated" request. */ - if (want_transaction) - db_begin_transaction(wallet->db); - - pd = plugin_detect_destruction(plugin); - if (!idtok) { - /* A Notification is a Request object without an "id" - * member. A Request object that is a Notification - * signifies the Client's lack of interest in the - * corresponding Response object, and as such no - * Response object needs to be returned to the - * client. The Server MUST NOT reply to a - * Notification, including those that are within a - * batch request. - * - * https://www.jsonrpc.org/specification#notification - */ - err = plugin_notification_handle(plugin, plugin->buffer, plugin->toks); - - } else { - /* When a rpc call is made, the Server MUST reply with - * a Response, except for in the case of - * Notifications. The Response is expressed as a - * single JSON Object, with the following members: - * - * - jsonrpc: A String specifying the version of the - * JSON-RPC protocol. MUST be exactly "2.0". - * - * - result: This member is REQUIRED on success. This - * member MUST NOT exist if there was an error - * invoking the method. The value of this member is - * determined by the method invoked on the Server. - * - * - error: This member is REQUIRED on error. This - * member MUST NOT exist if there was no error - * triggered during invocation. - * - * - id: This member is REQUIRED. It MUST be the same - * as the value of the id member in the Request - * Object. If there was an error in detecting the id - * in the Request object (e.g. Parse error/Invalid - * Request), it MUST be Null. Either the result - * member or error member MUST be included, but both - * members MUST NOT be included. - * - * https://www.jsonrpc.org/specification#response_object - */ - plugin_response_handle(plugin, plugin->buffer, plugin->toks, idtok); - err = NULL; - } - if (want_transaction) - db_commit_transaction(wallet->db); - - /* Corner case: rpc_command hook can destroy plugin for 'plugin - * stop'! */ - if (was_plugin_destroyed(pd)) { - *destroyed = true; - } else { - /* Move this object out of the buffer */ - memmove(plugin->buffer, plugin->buffer + plugin->toks[0].end, - tal_count(plugin->buffer) - plugin->toks[0].end); - plugin->used -= plugin->toks[0].end; - jsmn_init(&plugin->parser); - toks_reset(plugin->toks); - } - return err; -} - +/* Try to parse complete messages from the plugin's buffer. */ static struct io_plan *plugin_read_json(struct io_conn *conn, struct plugin *plugin) { - bool success; - bool have_full; + struct wallet *wallet = plugin->plugins->ld->wallet; + const char *new_bytes, *buffer; + const jsmntok_t *toks; + size_t new_bytes_len; /* wallet is NULL in really early code */ bool want_transaction = (plugin->plugins->want_db_transaction - && plugin->plugins->ld->wallet != NULL); - - log_io(plugin->log, LOG_IO_IN, NULL, "", - plugin->buffer + plugin->used, plugin->len_read); - - /* Our JSON parser is pretty good at incremental parsing, but - * `getrawblock` gives a giant 2MB token, which forces it to re-parse - * every time until we have all of it. However, we can't complete a - * JSON object without a '}', so we do a cheaper check here. - */ - have_full = memchr(plugin->buffer + plugin->used, '}', - plugin->len_read); - - plugin->used += plugin->len_read; - if (plugin->used == tal_count(plugin->buffer)) - tal_resize(&plugin->buffer, plugin->used * 2); - - /* Read and process all messages from the connection */ - if (have_full) { - do { - bool destroyed; - const char *err; - - err = plugin_read_json_one(plugin, want_transaction, - &success, &destroyed); - - /* If it's destroyed, conn is already freed! */ - if (destroyed) - return io_close(NULL); - - if (err) { - plugin_kill(plugin, LOG_UNUSUAL, - "%s", err); - /* plugin_kill frees plugin */ - return io_close(NULL); - } - } while (success); + && wallet != NULL); + + new_bytes = jsonrpc_newly_read(plugin->json_in, &new_bytes_len); + if (new_bytes_len) { + log_io(plugin->log, LOG_IO_IN, NULL, "", + new_bytes, new_bytes_len); + } + + /* Parse until we get incomplete JSON */ + for (;;) { + const char *error; + const jsmntok_t *idtok; + struct plugin_destroyed *pd; + + error = jsonrpc_io_parse(tmpctx, plugin->json_in, + &toks, &buffer); + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + /* Incomplete? */ + if (!toks) + break; + + idtok = json_get_member(buffer, toks, "id"); + + /* We can be called extremely early, or as db hook, or for + * fake "terminated" request. */ + if (want_transaction) + db_begin_transaction(wallet->db); + + pd = plugin_detect_destruction(plugin); + if (!idtok) { + /* A Notification is a Request object without an "id" + * member. A Request object that is a Notification + * signifies the Client's lack of interest in the + * corresponding Response object, and as such no + * Response object needs to be returned to the + * client. The Server MUST NOT reply to a + * Notification, including those that are within a + * batch request. + * + * https://www.jsonrpc.org/specification#notification + */ + error = plugin_notification_handle(plugin, buffer, toks); + } else { + /* When a rpc call is made, the Server MUST reply with + * a Response, except for in the case of + * Notifications. The Response is expressed as a + * single JSON Object, with the following members: + * + * - jsonrpc: A String specifying the version of the + * JSON-RPC protocol. MUST be exactly "2.0". + * + * - result: This member is REQUIRED on success. This + * member MUST NOT exist if there was an error + * invoking the method. The value of this member is + * determined by the method invoked on the Server. + * + * - error: This member is REQUIRED on error. This + * member MUST NOT exist if there was no error + * triggered during invocation. + * + * - id: This member is REQUIRED. It MUST be the same + * as the value of the id member in the Request + * Object. If there was an error in detecting the id + * in the Request object (e.g. Parse error/Invalid + * Request), it MUST be Null. Either the result + * member or error member MUST be included, but both + * members MUST NOT be included. + * + * https://www.jsonrpc.org/specification#response_object + */ + plugin_response_handle(plugin, buffer, toks, idtok); + error = NULL; + } + if (want_transaction) + db_commit_transaction(wallet->db); + + /* If it's destroyed, conn is already freed! */ + if (was_plugin_destroyed(pd)) + return io_close(NULL); + + if (error) { + plugin_kill(plugin, LOG_UNUSUAL, "%s", error); + /* plugin_kill frees plugin */ + return io_close(NULL); + } + + jsonrpc_io_parse_done(plugin->json_in); } /* Now read more from the connection */ - return io_read_partial(plugin->stdout_conn, - plugin->buffer + plugin->used, - tal_count(plugin->buffer) - plugin->used, - &plugin->len_read, plugin_read_json, plugin); + return jsonrpc_io_read(plugin->stdout_conn, plugin->json_in, + plugin_read_json, plugin); } /* Mutual recursion */ @@ -940,6 +866,7 @@ static void plugin_conn_finish(struct io_conn *conn, struct plugin *plugin) struct io_plan *plugin_stdin_conn_init(struct io_conn *conn, struct plugin *plugin) { + plugin->stdin_conn = conn; /* We write to their stdin */ /* We don't have anything queued yet, wait for notification */ return io_wait(conn, plugin, plugin_write_json, plugin); @@ -949,10 +876,9 @@ struct io_plan *plugin_stdout_conn_init(struct io_conn *conn, struct plugin *plugin) { /* We read from their stdout */ + plugin->stdout_conn = conn; io_set_finish(conn, plugin_conn_finish, plugin); - return io_read_partial(conn, plugin->buffer, - tal_bytelen(plugin->buffer), &plugin->len_read, - plugin_read_json, plugin); + return plugin_read_json(conn, plugin); } static char *plugin_opt_check(struct plugin_opt *popt) @@ -2043,8 +1969,8 @@ static void plugin_set_timeout(struct plugin *p) time_from_sec(PLUGIN_STARTUP_TIMEOUT), plugin_manifest_timeout, p); } -} +} const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) { char **cmd; @@ -2063,14 +1989,12 @@ const char *plugin_send_getmanifest(struct plugin *p, const char *cmd_id) return tal_fmt(p, "opening pipe: %s", strerror(errno)); log_debug(p->plugins->log, "started(%u) %s", p->pid, p->cmd); - p->buffer = tal_arr(p, char, 64); - jsmn_init(&p->parser); - p->toks = toks_alloc(p); + p->json_in = jsonrpc_io_new(p); /* Create two connections, one read-only on top of p->stdout, and one * write-only on p->stdin */ - p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); - p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); + io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); + io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); req = jsonrpc_request_start(p, "getmanifest", cmd_id, p->log, NULL, plugin_manifest_cb, p); json_add_bool(req->stream, "allow-deprecated-apis", diff --git a/lightningd/plugin.h b/lightningd/plugin.h index 621f3b8d747d..3d9d4a3a88bc 100644 --- a/lightningd/plugin.h +++ b/lightningd/plugin.h @@ -76,10 +76,7 @@ struct plugin { bool dynamic; /* Stuff we read */ - char *buffer; - size_t used, len_read; - jsmn_parser parser; - jsmntok_t *toks; + struct jsonrpc_io *json_in; /* Our json_streams. Since multiple streams could start * returning data at once, we always service these in order, From 71e9f482d36a8636e552575b1679f33342d8f7a7 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 13 Oct 2025 18:13:58 +1030 Subject: [PATCH 14/14] libplugin: use jsonrpc_io logic for sync requests too. It's a little overkill, but it's clear. Signed-off-by: Rusty Russell --- common/jsonrpc_io.c | 26 ++++++ common/jsonrpc_io.h | 9 ++ plugins/bkpr/test/run-sql.c | 3 + plugins/libplugin.c | 179 +++++++++++++++--------------------- plugins/libplugin.h | 1 - 5 files changed, 112 insertions(+), 106 deletions(-) diff --git a/common/jsonrpc_io.c b/common/jsonrpc_io.c index 02cb208d71b0..2bb5ac55e7b8 100644 --- a/common/jsonrpc_io.c +++ b/common/jsonrpc_io.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include #define READ_CHUNKSIZE 64 @@ -135,3 +137,27 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn, &json_in->bytes_read, next, arg); } + +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd) +{ + int r; + + /* Make sure there's more room */ + membuf_prepare_space(&json_in->membuf, READ_CHUNKSIZE); + + /* Try to read more. */ + r = read(infd, + membuf_elems(&json_in->membuf) + + json_in->bytes_unparsed, + membuf_num_elems(&json_in->membuf) + - json_in->bytes_unparsed + + membuf_num_space(&json_in->membuf)); + if (r < 0) + return false; + if (r == 0) { + errno = 0; + return false; + } + json_in->bytes_read = r; + return true; +} diff --git a/common/jsonrpc_io.h b/common/jsonrpc_io.h index 9032aecc48d2..fd0babb9d375 100644 --- a/common/jsonrpc_io.h +++ b/common/jsonrpc_io.h @@ -43,6 +43,15 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn, const char *jsonrpc_newly_read(struct jsonrpc_io *json_in, size_t *len); +/** + * jsonrpc_sync_read: read from fd into buffer. + * @json_in: buffer to read into. + * @infd: file descriptort to read. + * + * Returns false on error or EOF; for EOF errno will be 0. + */ +bool jsonrpc_sync_read(struct jsonrpc_io *json_in, int infd); + /** * jsonrpc_io_parse: try to parse more of the buffer. * @ctx: context to allocate error message off. diff --git a/plugins/bkpr/test/run-sql.c b/plugins/bkpr/test/run-sql.c index 9be785d203de..2fc758adad17 100644 --- a/plugins/bkpr/test/run-sql.c +++ b/plugins/bkpr/test/run-sql.c @@ -88,6 +88,9 @@ struct io_plan *jsonrpc_io_read_(struct io_conn *conn UNNEEDED, void *) UNNEEDED, void *arg UNNEEDED) { fprintf(stderr, "jsonrpc_io_read_ called!\n"); abort(); } +/* Generated stub for jsonrpc_sync_read */ +bool jsonrpc_sync_read(struct jsonrpc_io *json_in UNNEEDED, int infd UNNEEDED) +{ fprintf(stderr, "jsonrpc_sync_read called!\n"); abort(); } /* Generated stub for last_fee_state */ enum htlc_state last_fee_state(enum side opener UNNEEDED) { fprintf(stderr, "last_fee_state called!\n"); abort(); } diff --git a/plugins/libplugin.c b/plugins/libplugin.c index a887a3fa4f23..1fd57a8a1119 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -32,11 +32,6 @@ struct plugin_timer { void *cb_arg; }; -struct rpc_conn { - int fd; - MEMBUF(char) mb; -}; - /* We can have more than one of these pending at once. */ struct jstream { struct list_node list; @@ -94,7 +89,7 @@ struct plugin { /* To write to lightningd */ struct list_head js_list; - /* Asynchronous RPC interaction */ + /* Asynchronous RPC interaction. */ struct io_conn *io_rpc_conn; struct list_head rpc_js_list; struct jsonrpc_io *jsonrpc_in; @@ -103,8 +98,9 @@ struct plugin { STRMAP(struct out_req *) out_reqs; u64 next_outreq_id; - /* Synchronous RPC interaction */ - struct rpc_conn *rpc_conn; + /* Synchronous RPC interaction: sync_io is NULL if they didn't want it. */ + int sync_fd; + struct jsonrpc_io *sync_io; /* Plugin information details */ enum plugin_restartability restartability; @@ -557,32 +553,6 @@ struct json_out *json_out_obj(const tal_t *ctx, return jout; } -static int read_json_from_rpc(struct plugin *p) -{ - char *end; - - /* We rely on the double-\n marker which only terminates JSON top - * levels. Thanks lightningd! */ - while ((end = memmem(membuf_elems(&p->rpc_conn->mb), - membuf_num_elems(&p->rpc_conn->mb), "\n\n", 2)) - == NULL) { - ssize_t r; - - /* Make sure we've room for at least READ_CHUNKSIZE. */ - membuf_prepare_space(&p->rpc_conn->mb, READ_CHUNKSIZE); - r = read(p->rpc_conn->fd, membuf_space(&p->rpc_conn->mb), - membuf_num_space(&p->rpc_conn->mb)); - /* lightningd goes away, we go away. */ - if (r == 0) - exit(0); - if (r < 0) - plugin_err(p, "Reading JSON input: %s", strerror(errno)); - membuf_added(&p->rpc_conn->mb, r); - } - - return end + 2 - membuf_elems(&p->rpc_conn->mb); -} - /* This closes a JSON response and writes it out. */ static void finish_and_send_json(int fd, struct json_out *jout) { @@ -742,40 +712,63 @@ void command_set_usage(struct command *cmd, const char *usage TAKES) cmd->methodname); } -/* Reads rpc reply and returns tokens, setting contents to 'error' or -- * 'result' (depending on *error). */ -static jsmntok_t *read_rpc_reply(const tal_t *ctx, - struct plugin *plugin, - const jsmntok_t **contents, - bool *error, - int *reqlen) +static const char *read_one_json_sync(struct plugin *p, const jsmntok_t **toks) { - jsmntok_t *toks; + for (;;) { + const char *buf, *error; - do { - *reqlen = read_json_from_rpc(plugin); + error = jsonrpc_io_parse(tmpctx, p->sync_io, toks, &buf); + if (error) + plugin_err(p, "Parsing sync lightningd: %s", error); + if (*toks) + return buf; - toks = json_parse_simple(ctx, - membuf_elems(&plugin->rpc_conn->mb), - *reqlen); - if (!toks) - plugin_err(plugin, "Malformed JSON reply '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); + /* lightningd goes away, we go away. */ + if (!jsonrpc_sync_read(p->sync_io, p->sync_fd)) { + if (errno == 0) + exit(0); + else + plugin_err(p, "Reading sync lightningd: %s", + strerror(errno)); + } + } +} + +/* Reads rpc reply and returns result tokens */ +static const jsmntok_t *read_sync_rpc_reply(const tal_t *ctx, + struct plugin *plugin, + const char *method, + const char **final_buffer) +{ + const jsmntok_t *errtok, *resulttok, *toks; + const char *buffer; + + for (;;) { + buffer = read_one_json_sync(plugin, &toks); /* FIXME: Don't simply ignore notifications here! */ - } while (!json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "id")); + if (json_get_member(buffer, toks, "id")) + break; + jsonrpc_io_parse_done(plugin->sync_io); + } - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, "error"); - if (*contents) - *error = true; - else { - *contents = json_get_member(membuf_elems(&plugin->rpc_conn->mb), toks, - "result"); - if (!*contents) - plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", - *reqlen, membuf_elems(&plugin->rpc_conn->mb)); - *error = false; + errtok = json_get_member(buffer, toks, "error"); + if (errtok) { + plugin_err(plugin, "Got error result to %s: '%.*s'", + method, + json_tok_full_len(toks), + json_tok_full(buffer, toks)); } + resulttok = json_get_member(buffer, toks, "result"); + if (!resulttok) { + plugin_err(plugin, "JSON reply with no 'result' nor 'error'? '%.*s'", + json_tok_full_len(toks), + json_tok_full(buffer, toks)); + } + + /* Make the returned pointers valid tal object */ + json_dup_contents(ctx, buffer, resulttok, final_buffer, &toks); + jsonrpc_io_parse_done(plugin->sync_io); + return toks; } @@ -786,13 +779,8 @@ static const jsmntok_t *sync_req(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - bool error; - jsmntok_t *toks; - const jsmntok_t *contents; - int reqlen; struct json_out *jout = json_out_new(tmpctx); const char *id = json_id(tmpctx, plugin, "init/", method); - size_t num_toks; json_out_start(jout, NULL, '{'); json_out_addstr(jout, "jsonrpc", "2.0"); @@ -810,23 +798,15 @@ static const jsmntok_t *sync_req(const tal_t *ctx, /* If we're past init, we may need a new fd (the old one * is being used for async comms). */ - if (plugin->rpc_conn->fd == -1) - plugin->rpc_conn->fd = rpc_open(plugin); - - finish_and_send_json(plugin->rpc_conn->fd, jout); - - toks = read_rpc_reply(ctx, plugin, &contents, &error, &reqlen); - if (error) - plugin_err(plugin, "Got error reply to %s: '%.*s'", - method, reqlen, membuf_elems(&plugin->rpc_conn->mb)); + if (plugin->sync_fd == -1) { + plugin->sync_fd = rpc_open(plugin); + if (!plugin->sync_io) + plugin->sync_io = jsonrpc_io_new(plugin); + } - *resp = membuf_consume(&plugin->rpc_conn->mb, reqlen); + finish_and_send_json(plugin->sync_fd, jout); - /* Make the returned pointer the valid tal object of minimal length */ - num_toks = json_next(contents) - contents; - memmove(toks, contents, num_toks * sizeof(*toks)); - tal_resize(&toks, num_toks); - return toks; + return read_sync_rpc_reply(ctx, plugin, method, resp); } const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, @@ -835,7 +815,6 @@ const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx, const struct json_out *params TAKES, const char **resp) { - return sync_req(ctx, cmd->plugin, method, params, resp); } @@ -1534,7 +1513,6 @@ static struct command_result *handle_init(struct command *cmd, size_t i; char *dir, *network; struct plugin *p = cmd->plugin; - bool with_rpc; const char *err; configtok = json_get_member(buf, params, "configuration"); @@ -1561,17 +1539,10 @@ static struct command_result *handle_init(struct command *cmd, /* Only attempt to connect if the plugin has configured the rpc_conn * already, if that's not the case we were told to run without an RPC * connection, so don't even log an error. */ - if (p->rpc_conn != NULL) { - p->rpc_conn->fd = rpc_open(p); - if (p->rpc_conn->fd == -1) - with_rpc = false; - else - with_rpc = true; - - membuf_init(&p->rpc_conn->mb, tal_arr(p, char, READ_CHUNKSIZE), - READ_CHUNKSIZE, membuf_tal_resize); - } else - with_rpc = false; + if (p->sync_io) + p->sync_fd = rpc_open(p); + else + p->sync_fd = -1; opttok = json_get_member(buf, params, "options"); json_for_each_obj(i, t, opttok) { @@ -1595,11 +1566,12 @@ static struct command_result *handle_init(struct command *cmd, disable)); } - if (with_rpc) { + /* Now set up async. */ + if (p->sync_fd != -1) { struct out_req *req; struct command *aux_cmd = aux_command(cmd); - io_new_conn(p, p->rpc_conn->fd, rpc_conn_init, p); + io_new_conn(p, p->sync_fd, rpc_conn_init, p); /* In case they intercept rpc_command, we can't do this sync. */ req = jsonrpc_request_start(aux_cmd, "listconfigs", get_beglist, plugin_broken_cb, NULL); @@ -1607,7 +1579,7 @@ static struct command_result *handle_init(struct command *cmd, send_outreq(req); /* We will open a new one if we want to be sync. */ - p->rpc_conn->fd = -1; + p->sync_fd = -1; } return command_success(cmd, json_out_obj(cmd, NULL, NULL)); @@ -2367,13 +2339,10 @@ static struct plugin *new_plugin(const tal_t *ctx, p->beglist = NULL; p->desired_features = tal_steal(p, features); - if (init_rpc) { - /* Sync RPC FIXME: maybe go full async ? */ - p->rpc_conn = tal(p, struct rpc_conn); - } else { - p->rpc_conn = NULL; - } - + if (init_rpc) + p->sync_io = jsonrpc_io_new(p); + else + p->sync_io = NULL; p->init = init; p->manifested = p->initialized = p->exiting = false; p->restartability = restartability; diff --git a/plugins/libplugin.h b/plugins/libplugin.h index 550429950995..20254b46983d 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -5,7 +5,6 @@ #include #include -#include #include #include #include