From 6c216aa2b5cd7b30127dffed99673c8f26921295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Mon, 6 May 2024 18:55:52 +0200 Subject: [PATCH] Read full message body at once when larger than 4MB When a message (much) larger than 4MB is read from an rdq file in `rabbit_msg_store:scan/6`, adjust the read size to the full message size instead of appending it together from 4MB chunks, which leaves a lot of garbage and memory fragmentation behind. --- deps/rabbit/src/rabbit_msg_store.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index ddc53963bec4..3a57cdd39fa0 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -1396,7 +1396,7 @@ scan_file_for_valid_messages(Path) -> {ok, Fd} -> {ok, FileSize} = file:position(Fd, eof), {ok, _} = file:position(Fd, bof), - Messages = scan(<<>>, Fd, 0, FileSize, #{}, []), + Messages = scan(<<>>, Fd, 0, FileSize, #{}, [], ?SCAN_BLOCK_SIZE), ok = file:close(Fd), case Messages of [] -> @@ -1412,8 +1412,8 @@ scan_file_for_valid_messages(Path) -> Reason}} end. -scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) -> - case file:read(Fd, ?SCAN_BLOCK_SIZE) of +scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc, ScanSize) -> + case file:read(Fd, ScanSize) of eof -> Acc; {ok, Data0} -> @@ -1448,10 +1448,11 @@ scan_data(<> = Data, %% This might be the start of a message. 
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Rest) < Size + 1, Size < FileSize - Offset -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); + ScanSize = max(?SCAN_BLOCK_SIZE, Size - byte_size(Rest) + 1), + scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc, ScanSize); scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc) when byte_size(Data) < 8 -> - scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc); + scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc, ?SCAN_BLOCK_SIZE); %% This is definitely not a message. Try the next byte. scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) -> scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).