diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
index 27ed58a1..60cc412d 100644
--- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
@@ -179,14 +179,13 @@ def _get_raw_log_size():
     return os.path.getsize(tracking_log_path)
 
 
-def _get_raw_log_stream(_, start_bytes, end_bytes):
+def _get_raw_log_stream(_, start_bytes, chunk_size):
     """
     Return raw event json parsed from current fixtures
     """
     tracking_log_path = _get_tracking_log_file_path()
     with open(tracking_log_path, "rb") as current:
-        current.seek(start_bytes)
-        yield current.read(end_bytes - start_bytes)
+        yield current.read()
 
 
 @pytest.mark.parametrize("command_opts", command_options())
@@ -421,7 +420,7 @@ def test_get_chunks():
     fake_source.download_object_range_as_stream.return_value = "abc"
 
     # Check that we got the expected return value
-    assert _get_chunks(fake_source, "", 0, 1) == "abc"
+    assert _get_chunks(fake_source, "") == "abc"
 
     # Check that we broke out of the retry loop as expected
     assert fake_source.download_object_range_as_stream.call_count == 1
@@ -431,7 +430,7 @@
     # Speed up our test, don't wait for the sleep
     with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
         with pytest.raises(Exception) as e:
-            _get_chunks(fake_source_err, "", 0, 1)
+            _get_chunks(fake_source_err, "")
 
     # Make sure we're getting the error we expect
     assert "boom" in str(e)
diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py
index 6ccccec7..4b8b7750 100644
--- a/event_routing_backends/management/commands/transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/transform_tracking_logs.py
@@ -18,7 +18,7 @@
 CHUNK_SIZE = 1024 * 1024 * 2
 
 
-def _get_chunks(source, file, start_byte, end_byte):
+def _get_chunks(source, file):
     """
     Fetch a chunk from the upstream source, retry 3 times if necessary.
     """
@@ -35,8 +35,8 @@
         try:
             chunks = source.download_object_range_as_stream(
                 file,
-                start_bytes=start_byte,
-                end_bytes=end_byte
+                start_bytes=0,
+                chunk_size=CHUNK_SIZE
             )
             break
         # Catching all exceptions here because there's no telling what all
@@ -72,29 +72,22 @@
     # Download the file as a stream of characters to save on memory
     print(f"Streaming file {file}...")
 
-    last_successful_byte = 0
     line = ""
-    while last_successful_byte < int(file.size):
-        end_byte = last_successful_byte + CHUNK_SIZE
+    chunks = _get_chunks(source, file)
 
-        end_byte = min(end_byte, file.size)
+    for chunk in chunks:
+        chunk = chunk.decode('utf-8')
 
-        chunks = _get_chunks(source, file, last_successful_byte, end_byte)
+        # Loop through this chunk, if we find a newline it's time to process
+        # otherwise just keep appending.
+        for char in chunk:
+            if char == "\n" and line:
+                sender.transform_and_queue(line)
+                line = ""
+            else:
+                line += char
 
-        for chunk in chunks:
-            chunk = chunk.decode('utf-8')
-
-            # Loop through this chunk, if we find a newline it's time to process
-            # otherwise just keep appending.
-            for char in chunk:
-                if char == "\n" and line:
-                    sender.transform_and_queue(line)
-                    line = ""
-                else:
-                    line += char
-
-        last_successful_byte = end_byte
 
     # Sometimes the file doesn't end with a newline, we try to use
     # any remaining bytes as a final line.
     if line:
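
Note on the core API change: the old code issued one ranged request per chunk (start_bytes/end_bytes) and tracked last_successful_byte itself; the new code asks Apache Libcloud to stream the whole object in CHUNK_SIZE pieces via a single call. A minimal sketch of that streaming call follows, assuming a Libcloud storage driver; the provider, credentials, container, and object names are hypothetical placeholders, not values from this PR.

# A minimal sketch, assuming a Libcloud storage driver; credentials,
# container, and object names below are hypothetical placeholders.
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

CHUNK_SIZE = 1024 * 1024 * 2

driver = get_driver(Provider.S3)("access-key", "secret-key")
obj = driver.get_object("tracking-logs", "tracking.log")

# With start_bytes=0 and a chunk_size, Libcloud yields the whole object
# as an iterator of bytes chunks, rather than requiring one ranged
# request per chunk as the old loop did.
for chunk in driver.download_object_range_as_stream(
    obj,
    start_bytes=0,
    chunk_size=CHUNK_SIZE,
):
    print(len(chunk))  # each chunk is at most CHUNK_SIZE bytes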
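
The -35,8 hunk shows only the try/break inside _get_chunks. Going by its docstring ("retry 3 times if necessary") and the test that patches sleep, the surrounding retry loop has roughly the shape sketched below; the names, attempt wiring, and backoff value are assumptions for illustration, not code from this PR.

# A standalone sketch of the retry shape described by _get_chunks'
# docstring; names and the backoff value are assumptions.
from time import sleep

def fetch_with_retries(fetch, tries=3, wait_seconds=1):
    for attempt in range(1, tries + 1):
        try:
            return fetch()
        # Catching Exception broadly, as the command does, because storage
        # drivers can raise many unrelated exception types.
        except Exception:
            if attempt == tries:
                raise
            sleep(wait_seconds)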
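
Finally, the rewritten loop in transform_tracking_logs buffers characters until it sees a newline because a chunk boundary can split a record in half. The same reassembly logic, extracted into a self-contained generator for illustration (iter_lines is a hypothetical name, not part of the command):

def iter_lines(chunks):
    """Reassemble newline-delimited records from an iterable of byte chunks."""
    line = ""
    for chunk in chunks:
        for char in chunk.decode("utf-8"):
            if char == "\n" and line:
                yield line
                line = ""
            else:
                line += char
    # Files don't always end with a newline; flush any remainder.
    if line:
        yield line

# A record split across a chunk boundary is stitched back together.
assert list(iter_lines([b'{"event": 1}\n{"eve', b'nt": 2}\n'])) == [
    '{"event": 1}',
    '{"event": 2}',
]

Note that decoding each chunk independently assumes a multi-byte UTF-8 sequence never straddles a chunk boundary; the command's own per-chunk decode relies on the same property.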