Skip to content

Commit

Permalink
Read full message body at once when larger than 4MB
Browse files Browse the repository at this point in the history
When a message (much) larger than 4MB is read from an rdq file in
`rabbit_msg_store:scan/6)` adjust the read size to the full message
size instead of appending it together from 4MB chunks, which leaves a
lot of garbage and memory fragmentation behind.
  • Loading branch information
gomoripeti committed May 6, 2024
1 parent a5e2add commit 6c216aa
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ scan_file_for_valid_messages(Path) ->
{ok, Fd} ->
{ok, FileSize} = file:position(Fd, eof),
{ok, _} = file:position(Fd, bof),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, [], ?SCAN_BLOCK_SIZE),
ok = file:close(Fd),
case Messages of
[] ->
Expand All @@ -1412,8 +1412,8 @@ scan_file_for_valid_messages(Path) ->
Reason}}
end.

scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc, ScanSize) ->
case file:read(Fd, ScanSize) of
eof ->
Acc;
{ok, Data0} ->
Expand Down Expand Up @@ -1448,10 +1448,11 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
ScanSize = max(?SCAN_BLOCK_SIZE, Size - byte_size(Rest) + 1),
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc, ScanSize);
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc, ?SCAN_BLOCK_SIZE);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
Expand Down

0 comments on commit 6c216aa

Please sign in to comment.