From 62bcde00bff27a6d72449c1af1b07eb3f28be1a5 Mon Sep 17 00:00:00 2001 From: Andreas Schultz Date: Tue, 26 Nov 2024 10:25:58 +0100 Subject: [PATCH] add stream purge with payload --- src/nats_stream.erl | 16 ++++++++++++---- test/nats_jetstream_SUITE.erl | 8 ++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/nats_stream.erl b/src/nats_stream.erl index 880ec0d..ad65795 100644 --- a/src/nats_stream.erl +++ b/src/nats_stream.erl @@ -35,7 +35,7 @@ get/2, get/3, update/3, delete/2, delete/3, - purge/2, purge/3, + purge/2, purge/3, purge/4, list/1, list/2, list/3, names/1, names/2, names/3, msg_get/3, msg_get/4, @@ -245,12 +245,20 @@ delete(Conn, Name, Opts) -> Other end. -purge(Conn, Name) -> +purge(Conn, Name) + when is_binary(Name) -> purge(Conn, Name, #{}). -purge(Conn, Name, Opts) -> +purge(Conn, Name, Opts0) + when is_binary(Name), is_map(Opts0) -> + Msg = maps:with([filter, seq, keep], Opts0), + Opts = maps:without([filter, seq, keep], Opts0), + purge(Conn, Name, Msg, Opts). + +purge(Conn, Name, Msg, Opts) + when is_binary(Name), is_map(Msg), is_map(Opts) -> Topic = make_js_api_topic(~"PURGE", Name, Opts), - case nats:request(Conn, Topic, <<>>, #{}) of + case nats:request(Conn, Topic, json_encode(Msg), #{}) of {ok, Response} -> unmarshal_response(?JS_API_V1_STREAM_PURGE_RESPONSE, Response); Other -> diff --git a/test/nats_jetstream_SUITE.erl b/test/nats_jetstream_SUITE.erl index 03bc3ac..5ab5723 100644 --- a/test/nats_jetstream_SUITE.erl +++ b/test/nats_jetstream_SUITE.erl @@ -256,6 +256,14 @@ kv(_Client, Con, _Config) -> ct:pal("KvPurgeR2: ~p", [KvPurgeR2]), ?assertMatch({ok, #{stream := _, seq := _}}, KvPurgeR2), + {deleted, _} = nats_kv:get(Con, ?KV_BUCKET, ?KV_KEY_2, BucketCfg), + + {ok, #{success := true, purged := 1}} = + nats_stream:purge(Con, <<"KV_", ?KV_BUCKET/binary>>, + #{filter => <<"$KV.", ?KV_BUCKET/binary, $., ?KV_KEY_2/binary>>}, #{}), + + {error, #{code := 404}} = nats_kv:get(Con, ?KV_BUCKET, ?KV_KEY_2, BucketCfg), + BucketDeleteR1 = nats_kv:delete_bucket(Con, ?KV_BUCKET), ct:pal("R1: ~p", [BucketDeleteR1]), ?assertMatch({ok, #{success := true}}, BucketDeleteR1),