Skip to content

Commit

Permalink
fix: Retry getting bulk tracking logs (#330)
Browse files Browse the repository at this point in the history
* fix: Retry getting bulk tracking logs
* chore: Bump version to 5.5.5
  • Loading branch information
bmtcril authored Aug 4, 2023
1 parent 984d89e commit 8cfe9f3
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 6 deletions.
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events.
"""

__version__ = '5.5.4'
__version__ = '5.5.5'
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import event_routing_backends.management.commands.transform_tracking_logs as transform_tracking_logs
from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
from event_routing_backends.management.commands.transform_tracking_logs import (
_get_chunks,
get_dest_config_from_options,
get_libcloud_drivers,
get_source_config_from_options,
Expand Down Expand Up @@ -394,3 +395,30 @@ def test_get_dest_config_lrs():
assert config is None
assert container is None
assert prefix is None


def test_get_chunks():
    """
    Tests the retry functionality of the _get_chunks function.

    Covers both the happy path (first attempt succeeds, no retries) and the
    failure path (every attempt raises, the error propagates after the
    configured number of retries).
    """
    # Happy path: the stream is returned on the first successful attempt.
    fake_source = MagicMock()
    fake_source.download_object_range_as_stream.return_value = "abc"

    # Check that we got the expected return value
    assert _get_chunks(fake_source, "", 0, 1) == "abc"
    # Check that we broke out of the retry loop as expected
    assert fake_source.download_object_range_as_stream.call_count == 1

    # Failure path: every download attempt raises.
    fake_source_err = MagicMock()
    fake_source_err.download_object_range_as_stream.side_effect = Exception("boom")

    # Speed up our test, don't wait for the sleep
    with patch("event_routing_backends.management.commands.transform_tracking_logs.sleep"):
        with pytest.raises(Exception) as excinfo:
            _get_chunks(fake_source_err, "", 0, 1)

    # Make sure we're getting the error we expect. Inspect the exception
    # object itself (excinfo.value), not the ExceptionInfo wrapper, per the
    # pytest documentation on asserting about raised exceptions.
    assert "boom" in str(excinfo.value)

    # Make sure we got the correct (default, 3) number of retries before
    # giving up — matches EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES.
    assert fake_source_err.download_object_range_as_stream.call_count == 3
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import os
from io import BytesIO
from textwrap import dedent
from time import sleep

from django.conf import settings
from django.core.management.base import BaseCommand
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider
Expand All @@ -16,6 +18,40 @@
CHUNK_SIZE = 1024 * 1024 * 2


def _get_chunks(source, file, start_byte, end_byte):
    """
    Fetch a chunk from the upstream source, retrying if necessary.

    Often an upstream provider like S3 will fail occasionally on big jobs. This
    tries to handle any of those cases gracefully: each failed attempt is
    logged, and after the configured number of attempts the last error is
    re-raised to the caller.

    Arguments:
        source: libcloud storage driver to download from
        file: the storage object to fetch a byte range of
        start_byte: first byte of the requested range
        end_byte: last byte of the requested range

    Returns:
        The stream returned by download_object_range_as_stream, or None if the
        retry loop somehow completes without a successful download or a raise.

    Raises:
        Exception: whatever the libcloud provider raised on the final attempt.
    """
    chunks = None
    # Both knobs are configurable; defaults match settings/common.py (3 tries,
    # 1 second between tries).
    num_retries = getattr(settings, 'EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES', 3)
    retry_countdown = getattr(settings, 'EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN', 1)

    # Skipping coverage here because it wants to test a branch that will never
    # be hit (for -> return)
    for try_number in range(1, num_retries+1):  # pragma: no cover
        try:
            chunks = source.download_object_range_as_stream(
                file,
                start_bytes=start_byte,
                end_bytes=end_byte
            )
            break
        # Catching all exceptions here because there's no telling what all
        # the possible errors from different libcloud providers are.
        except Exception as e:  # pylint: disable=broad-exception-caught
            print(e)
            if try_number == num_retries:
                print(f"Try {try_number}: Error occurred downloading, giving up.")
                raise
            # Report the actual configured wait instead of a hard-coded
            # "1 second" — the sleep below uses retry_countdown.
            print(
                f"Try {try_number}: Error occurred downloading source file chunk. "
                f"Trying again in {retry_countdown} second(s)."
            )
            sleep(retry_countdown)

    return chunks


def transform_tracking_logs(
source,
source_container,
Expand Down Expand Up @@ -45,11 +81,8 @@ def transform_tracking_logs(
if end_byte > file.size:
end_byte = file.size

chunks = source.download_object_range_as_stream(
file,
start_bytes=last_successful_byte,
end_bytes=end_byte
)
chunks = _get_chunks(source, file, last_successful_byte, end_byte)

for chunk in chunks:
chunk = chunk.decode('utf-8')

Expand Down
2 changes: 2 additions & 0 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def plugin_settings(settings):
settings.XAPI_EVENT_LOGGING_ENABLED = True
settings.EVENT_ROUTING_BACKEND_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3
settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1

# .. setting_name: XAPI_AGENT_IFI_TYPE
# .. setting_default: 'external_id'
Expand Down

0 comments on commit 8cfe9f3

Please sign in to comment.