Skip to content

Commit

Permalink
Source Notion: increase max_retry attempts on retryable errors (#31825)
Browse files Browse the repository at this point in the history
Co-authored-by: ChristoGrab <[email protected]>
  • Loading branch information
ChristoGrab and ChristoGrab authored Oct 26, 2023
1 parent 8f40b4b commit 467b869
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687
dockerImageTag: 2.0.5
dockerImageTag: 2.0.6
dockerRepository: airbyte/source-notion
documentationUrl: https://docs.airbyte.com/integrations/sources/notion
githubIssueLabel: source-notion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,16 @@ def availability_strategy(self) -> HttpAvailabilityStrategy:
return NotionAvailabilityStrategy()

@property
def retry_factor(self) -> float:
return 8
def retry_factor(self) -> int:
return 5

@property
def max_retries(self) -> int:
return 7

@property
def max_time(self) -> int:
return 60 * 11

@staticmethod
def check_invalid_start_cursor(response: requests.Response):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import re
import time
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -219,54 +220,74 @@ def test_invalid_start_cursor(parent, requests_mock, caplog):
inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}
with patch.object(stream, "backoff_time", return_value=0.1):
list(stream.read_records(**inputs))
assert search_endpoint.call_count == 6
assert search_endpoint.call_count == 8
assert f"Skipping stream pages, error message: {error_message}" in caplog.messages


@mark.parametrize(
"status_code,error_code,error_message,expected_backoff_time",
"status_code,error_code,error_message, expected_backoff_time",
[
(400, "validation_error", "The start_cursor provided is invalid: wrong_start_cursor", 10),
(429, "rate_limited", "Rate Limited", 5), # Assuming retry-after header value is 5
(
500,
"internal_server_error",
"Internal server error",
128,
), # Using retry_factor of 8, the final backoff time should be 128 seconds
(400, "validation_error", "The start_cursor provided is invalid: wrong_start_cursor", [10, 10, 10, 10, 10, 10, 10]),
(429, "rate_limited", "Rate Limited", [5, 5, 5, 5, 5, 5, 5]), # Retry-header is set to 5 seconds for test
(500, "internal_server_error", "Internal server error", [5, 10, 20, 40, 80, 5, 10]),
],
)
def test_retry_logic(status_code, error_code, error_message, expected_backoff_time, parent, requests_mock, caplog):
stream = parent
exception_info = None

# Set up a generator that alternates between error and success responses, to check the reset of backoff time between failures
mock_responses = (
[
{
"status_code": status_code,
"response": {"object": "error", "status": status_code, "code": error_code, "message": error_message},
}
for _ in range(5)
]
+ [{"status_code": 200, "response": {"object": "list", "results": [], "has_more": True, "next_cursor": "dummy_cursor"}}]
+ [
{
"status_code": status_code,
"response": {"object": "error", "status": status_code, "code": error_code, "message": error_message},
}
for _ in range(2)
]
+ [{"status_code": 200, "response": {"object": "list", "results": [], "has_more": False, "next_cursor": None}}]
)

def response_callback(request, context):
# Get the next response from the mock_responses list
response = mock_responses.pop(0)
context.status_code = response["status_code"]
return response["response"]

# Mock the time.sleep function to avoid waiting during tests
with patch.object(time, "sleep", return_value=None):
search_endpoint = requests_mock.post(
"https://api.notion.com/v1/search",
status_code=status_code,
json={"object": "error", "status": status_code, "code": error_code, "message": error_message},
json=response_callback,
headers={"retry-after": "5"},
)

inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}

inputs = {"sync_mode": SyncMode.full_refresh, "cursor_field": [], "stream_state": {}}
try:
list(stream.read_records(**inputs))
except (UserDefinedBackoffException, DefaultBackoffException) as error:
exception_info = error

# For 429 errors, assert the backoff time matches retry-header value
if status_code == 429:
assert exception_info.backoff == expected_backoff_time

# For 500 cases, assert the backoff time in the penultimate log message
# is 128 (given a retry_factor of 8) to ensure exponential backoff is applied
if status_code == 500:
log_messages = [record.message for record in caplog.records]
expected_log_message = f"Waiting {expected_backoff_time} seconds then retrying..."
assert expected_log_message in log_messages[-2]
# For all test cases, assert the endpoint was hit 6 times
assert search_endpoint.call_count == 6
except (UserDefinedBackoffException, DefaultBackoffException) as e:
return e

# Check that the endpoint was called the expected number of times
assert search_endpoint.call_count == 9

# Additional assertions to check reset of backoff time
# Find the backoff times from the message logs to compare against expected backoff times
log_messages = [record.message for record in caplog.records]
backoff_times = [
round(float(re.search(r"(\d+(\.\d+)?) seconds", msg).group(1)))
for msg in log_messages
if any(word in msg for word in ["Sleeping", "Waiting"])
]

assert backoff_times == expected_backoff_time, f"Unexpected backoff times: {backoff_times}"


# Tests for Comments stream
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/notion.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The connector is restricted by Notion [request limits](https://developers.notion

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------- |
| 2.0.6 | 2023-10-25 | [31825](https://github.com/airbytehq/airbyte/pull/31825) | Increase max_retries on retryable errors |
| 2.0.5 | 2023-10-23 | [31742](https://github.com/airbytehq/airbyte/pull/31742) | Add 'synced_block' property to Blocks schema |
| 2.0.4 | 2023-10-19 | [31625](https://github.com/airbytehq/airbyte/pull/31625) | Fix check_connection method |
| 2.0.3 | 2023-10-19 | [31612](https://github.com/airbytehq/airbyte/pull/31612) | Add exponential backoff for 500 errors |
Expand Down

0 comments on commit 467b869

Please sign in to comment.