fix: send chunk_size instead of manually handling it #459

Merged: 2 commits, Oct 15, 2024

@@ -179,14 +179,13 @@ def _get_raw_log_size():
     return os.path.getsize(tracking_log_path)


-def _get_raw_log_stream(_, start_bytes, end_bytes):
+def _get_raw_log_stream(_, start_bytes, chunk_size):
     """
     Return raw event json parsed from current fixtures
     """
     tracking_log_path = _get_tracking_log_file_path()
     with open(tracking_log_path, "rb") as current:
-        current.seek(start_bytes)
-        yield current.read(end_bytes - start_bytes)
+        yield current.read()


 @pytest.mark.parametrize("command_opts", command_options())

@@ -421,7 +420,7 @@ def test_get_chunks():
     fake_source.download_object_range_as_stream.return_value = "abc"

     # Check that we got the expected return value
-    assert _get_chunks(fake_source, "", 0, 1) == "abc"
+    assert _get_chunks(fake_source, "") == "abc"
     # Check that we broke out of the retry loop as expected
     assert fake_source.download_object_range_as_stream.call_count == 1

@@ -431,7 +430,7 @@
     # Speed up our test, don't wait for the sleep
     with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
         with pytest.raises(Exception) as e:
-            _get_chunks(fake_source_err, "", 0, 1)
+            _get_chunks(fake_source_err, "")

     # Make sure we're getting the error we expect
     assert "boom" in str(e)

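For context on these fakes: the tests stub the storage driver out entirely. A minimal sketch of that arrangement, using unittest.mock (the names mirror the tests, but this is an illustration of the pattern, not the repository's actual fixture code):

    from unittest.mock import MagicMock

    # A source whose download call succeeds immediately: _get_chunks
    # should return "abc" without entering the retry path.
    fake_source = MagicMock()
    fake_source.download_object_range_as_stream.return_value = "abc"

    # A source whose download call always raises: _get_chunks should
    # retry and ultimately surface the "boom" exception.
    fake_source_err = MagicMock()
    fake_source_err.download_object_range_as_stream.side_effect = Exception("boom")
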
@@ -18,7 +18,7 @@
 CHUNK_SIZE = 1024 * 1024 * 2


-def _get_chunks(source, file, start_byte, end_byte):
+def _get_chunks(source, file):
     """
     Fetch a chunk from the upstream source, retry 3 times if necessary.

@@ -35,8 +35,8 @@ def _get_chunks(source, file):
         try:
             chunks = source.download_object_range_as_stream(
                 file,
-                start_bytes=start_byte,
-                end_bytes=end_byte
+                start_bytes=0,
+                chunk_size=CHUNK_SIZE

Review thread on the chunk_size argument:

Contributor: I feel like I tried this at some point and it didn't work as I expected it to. Have you made sure that you're getting chunks of the expected sizes?

Ian2012 (Contributor, Author), Oct 15, 2024:
No, I didn't, and it didn't solve the problem.
Actually, it does; I was testing it with a different branch.

             )
             break
         # Catching all exceptions here because there's no telling what all
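The reviewer's chunk-size question can also be checked empirically. A minimal sketch reusing the call from the diff above, where `source` stands for an already-configured libcloud storage driver and `file` for an object fetched from it (both placeholders), keeping in mind that how strictly a driver honors chunk_size can vary:

    CHUNK_SIZE = 1024 * 1024 * 2

    # Stream the object and report the size of each chunk libcloud
    # yields; every chunk except possibly the last should arrive at
    # (or near) CHUNK_SIZE.
    chunks = source.download_object_range_as_stream(
        file,
        start_bytes=0,
        chunk_size=CHUNK_SIZE,
    )
    for i, chunk in enumerate(chunks):
        print(f"chunk {i}: {len(chunk)} bytes")
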
@@ -72,29 +72,22 @@ def transform_tracking_logs(
     # Download the file as a stream of characters to save on memory
     print(f"Streaming file {file}...")

-    last_successful_byte = 0
     line = ""

-    while last_successful_byte < int(file.size):
-        end_byte = last_successful_byte + CHUNK_SIZE
+    chunks = _get_chunks(source, file)

-        end_byte = min(end_byte, file.size)
+    for chunk in chunks:
+        chunk = chunk.decode('utf-8')

-        chunks = _get_chunks(source, file, last_successful_byte, end_byte)
+        # Loop through this chunk, if we find a newline it's time to process
+        # otherwise just keep appending.
+        for char in chunk:
+            if char == "\n" and line:
+                sender.transform_and_queue(line)
+                line = ""
+            else:
+                line += char

-        for chunk in chunks:
-            chunk = chunk.decode('utf-8')
-
-            # Loop through this chunk, if we find a newline it's time to process
-            # otherwise just keep appending.
-            for char in chunk:
-                if char == "\n" and line:
-                    sender.transform_and_queue(line)
-                    line = ""
-                else:
-                    line += char
-
-        last_successful_byte = end_byte
     # Sometimes the file doesn't end with a newline, we try to use
     # any remaining bytes as a final line.
     if line:
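
The character loop above reassembles lines without caring where chunk boundaries fall, which is what makes handing chunking over to libcloud safe. A self-contained sketch of the same technique, with a plain generator standing in for sender.transform_and_queue and blank-line handling simplified:

    def iter_lines(chunks):
        """Yield complete lines from an iterable of byte chunks,
        regardless of where the chunk boundaries happen to fall."""
        line = ""
        for chunk in chunks:
            for char in chunk.decode("utf-8"):
                if char == "\n":
                    if line:
                        yield line
                    line = ""
                else:
                    line += char
        # The file may not end with a newline; flush any remainder as a
        # final line, just as the command does.
        if line:
            yield line

    # A line split across a chunk boundary is still reassembled intact.
    chunks = [b'{"event": "a"}\n{"eve', b'nt": "b"}\n{"event": "c"}']
    assert list(iter_lines(chunks)) == [
        '{"event": "a"}', '{"event": "b"}', '{"event": "c"}',
    ]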