Skip to content

Commit

Permalink
Merge pull request rabbitmq#10712 from rabbitmq/ik-10535-fix-duplicat…
Browse files Browse the repository at this point in the history
…ion-ss-partition-name-error

FIX rabbitmq#10535. Super Streams: check for duplicate partitions names
  • Loading branch information
michaelklishin authored Mar 11, 2024
2 parents 243064b + 8d3a3f3 commit 0d3a45c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
21 changes: 21 additions & 0 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
maybe
ok ?= validate_super_stream_partitions(Partitions),
ok ?= case rabbit_vhost_limit:would_exceed_queue_limit(length(Partitions), VirtualHost) of
false ->
ok;
Expand All @@ -704,6 +705,26 @@ validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
ok ?= check_already_existing_queue(VirtualHost, Partitions)
end.

validate_super_stream_partitions(Partitions) ->
case erlang:length(Partitions) == sets:size(sets:from_list(Partitions)) of
true ->
case lists:dropwhile(fun(Partition) ->
case rabbit_stream_utils:enforce_correct_name(Partition) of
{ok, _} -> true;
_ -> false
end
end, Partitions) of
[] ->
ok;
InvalidPartitions -> {error, {validation_failed,
{rabbit_misc:format("~ts is not a correct partition names",
[InvalidPartitions])}}}
end;
_ -> {error, {validation_failed,
{rabbit_misc:format("Duplicate partition names found ~ts",
[Partitions])}}}
end.

exchange_exists(VirtualHost, Name) ->
case rabbit_stream_utils:enforce_correct_name(Name) of
{ok, CorrectName} ->
Expand Down
24 changes: 23 additions & 1 deletion deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ groups() ->
connection_should_be_closed_on_token_expiry,
should_receive_metadata_update_after_update_secret,
store_offset_requires_read_access,
offset_lag_calculation
offset_lag_calculation,
test_super_stream_duplicate_partitions
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
Expand Down Expand Up @@ -395,6 +396,27 @@ test_super_stream_creation_deletion(Config) ->
closed = wait_for_socket_close(T, S, 10),
ok.

test_super_stream_duplicate_partitions(Config) ->
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
test_peer_properties(T, S, C),
test_authenticate(T, S, C),

Ss = atom_to_binary(?FUNCTION_NAME, utf8),
Partitions = [<<"same-name">>, <<"same-name">>],
SsCreationFrame = request({create_super_stream, Ss, Partitions, [<<"1">>, <<"2">>], #{}}),
ok = T:send(S, SsCreationFrame),
{Cmd1, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
Cmd1),

test_close(T, S, C),
closed = wait_for_socket_close(T, S, 10),
ok.

test_metadata(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Expand Down

0 comments on commit 0d3a45c

Please sign in to comment.