Skip to content

Commit

Permalink
add stream purge with payload
Browse files Browse the repository at this point in the history
  • Loading branch information
RoadRunnr committed Nov 26, 2024
1 parent 593c5a0 commit 62bcde0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
16 changes: 12 additions & 4 deletions src/nats_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get/2, get/3,
update/3,
delete/2, delete/3,
purge/2, purge/3,
purge/2, purge/3, purge/4,
list/1, list/2, list/3,
names/1, names/2, names/3,
msg_get/3, msg_get/4,
Expand Down Expand Up @@ -245,12 +245,20 @@ delete(Conn, Name, Opts) ->
Other
end.

purge(Conn, Name) ->
purge(Conn, Name)
when is_binary(Name) ->
purge(Conn, Name, #{}).

purge(Conn, Name, Opts) ->
purge(Conn, Name, Opts0)
when is_binary(Name), is_map(Opts0) ->
Msg = maps:with([filter, seq, keep], Opts0),
Opts = maps:without([filter, seq, keep], Opts0),
purge(Conn, Name, Msg, Opts).

purge(Conn, Name, Msg, Opts)
when is_binary(Name), is_map(Msg), is_map(Opts) ->
Topic = make_js_api_topic(~"PURGE", Name, Opts),
case nats:request(Conn, Topic, <<>>, #{}) of
case nats:request(Conn, Topic, json_encode(Msg), #{}) of
{ok, Response} ->
unmarshal_response(?JS_API_V1_STREAM_PURGE_RESPONSE, Response);
Other ->
Expand Down
8 changes: 8 additions & 0 deletions test/nats_jetstream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ kv(_Client, Con, _Config) ->
ct:pal("KvPurgeR2: ~p", [KvPurgeR2]),
?assertMatch({ok, #{stream := _, seq := _}}, KvPurgeR2),

{deleted, _} = nats_kv:get(Con, ?KV_BUCKET, ?KV_KEY_2, BucketCfg),

{ok, #{success := true, purged := 1}} =
nats_stream:purge(Con, <<"KV_", ?KV_BUCKET/binary>>,
#{filter => <<"$KV.", ?KV_BUCKET/binary, $., ?KV_KEY_2/binary>>}, #{}),

{error, #{code := 404}} = nats_kv:get(Con, ?KV_BUCKET, ?KV_KEY_2, BucketCfg),

BucketDeleteR1 = nats_kv:delete_bucket(Con, ?KV_BUCKET),
ct:pal("R1: ~p", [BucketDeleteR1]),
?assertMatch({ok, #{success := true}}, BucketDeleteR1),
Expand Down

0 comments on commit 62bcde0

Please sign in to comment.