diff --git a/airbyte-integrations/connectors/source-notion/metadata.yaml b/airbyte-integrations/connectors/source-notion/metadata.yaml index 75b375ca4b30..aa83c6b45e2c 100644 --- a/airbyte-integrations/connectors/source-notion/metadata.yaml +++ b/airbyte-integrations/connectors/source-notion/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687 - dockerImageTag: 2.0.5 + dockerImageTag: 2.0.6 dockerRepository: airbyte/source-notion documentationUrl: https://docs.airbyte.com/integrations/sources/notion githubIssueLabel: source-notion diff --git a/airbyte-integrations/connectors/source-notion/source_notion/streams.py b/airbyte-integrations/connectors/source-notion/source_notion/streams.py index 09b91e7cfed3..402bf5e47117 100644 --- a/airbyte-integrations/connectors/source-notion/source_notion/streams.py +++ b/airbyte-integrations/connectors/source-notion/source_notion/streams.py @@ -61,8 +61,16 @@ def availability_strategy(self) -> HttpAvailabilityStrategy: return NotionAvailabilityStrategy() @property - def retry_factor(self) -> float: - return 8 + def retry_factor(self) -> int: + return 5 + + @property + def max_retries(self) -> int: + return 7 + + @property + def max_time(self) -> int: + return 60 * 11 @staticmethod def check_invalid_start_cursor(response: requests.Response): diff --git a/airbyte-integrations/connectors/source-notion/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-notion/unit_tests/test_incremental_streams.py index 2b0376116bd1..d34afd131fc0 100644 --- a/airbyte-integrations/connectors/source-notion/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-notion/unit_tests/test_incremental_streams.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import re import time from unittest.mock import MagicMock, patch @@ -219,54 +220,74 @@ def test_invalid_start_cursor(parent, requests_mock, caplog): inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} with patch.object(stream, "backoff_time", return_value=0.1): list(stream.read_records(**inputs)) - assert search_endpoint.call_count == 6 + assert search_endpoint.call_count == 8 assert f"Skipping stream pages, error message: {error_message}" in caplog.messages @mark.parametrize( - "status_code,error_code,error_message,expected_backoff_time", + "status_code,error_code,error_message, expected_backoff_time", [ - (400, "validation_error", "The start_cursor provided is invalid: wrong_start_cursor", 10), - (429, "rate_limited", "Rate Limited", 5), # Assuming retry-after header value is 5 - ( - 500, - "internal_server_error", - "Internal server error", - 128, - ), # Using retry_factor of 8, the final backoff time should be 128 seconds + (400, "validation_error", "The start_cursor provided is invalid: wrong_start_cursor", [10, 10, 10, 10, 10, 10, 10]), + (429, "rate_limited", "Rate Limited", [5, 5, 5, 5, 5, 5, 5]), # Retry-header is set to 5 seconds for test + (500, "internal_server_error", "Internal server error", [5, 10, 20, 40, 80, 5, 10]), ], ) def test_retry_logic(status_code, error_code, error_message, expected_backoff_time, parent, requests_mock, caplog): stream = parent - exception_info = None + # Set up a generator that alternates between error and success responses, to check the reset of backoff time between failures + mock_responses = ( + [ + { + "status_code": status_code, + "response": {"object": "error", "status": status_code, "code": error_code, "message": error_message}, + } + for _ in range(5) + ] + + [{"status_code": 200, "response": {"object": "list", "results": [], "has_more": True, "next_cursor": "dummy_cursor"}}] + + [ + { + "status_code": status_code, + "response": {"object": "error", "status": status_code, "code": error_code, "message": error_message}, + } + for _ in range(2) + ] + + [{"status_code": 200, "response": {"object": "list", "results": [], "has_more": False, "next_cursor": None}}] + ) + + def response_callback(request, context): + # Get the next response from the mock_responses list + response = mock_responses.pop(0) + context.status_code = response["status_code"] + return response["response"] + + # Mock the time.sleep function to avoid waiting during tests with patch.object(time, "sleep", return_value=None): search_endpoint = requests_mock.post( "https://api.notion.com/v1/search", - status_code=status_code, - json={"object": "error", "status": status_code, "code": error_code, "message": error_message}, + json=response_callback, headers={"retry-after": "5"}, ) - inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} - + inputs = {"sync_mode": SyncMode.full_refresh, "cursor_field": [], "stream_state": {}} try: list(stream.read_records(**inputs)) - except (UserDefinedBackoffException, DefaultBackoffException) as error: - exception_info = error - - # For 429 errors, assert the backoff time matches retry-header value - if status_code == 429: - assert exception_info.backoff == expected_backoff_time - - # For 500 cases, assert the backoff time in the penultimate log message - # is 128 (given a retry_factor of 8) to ensure exponential backoff is applied - if status_code == 500: - log_messages = [record.message for record in caplog.records] - expected_log_message = f"Waiting {expected_backoff_time} seconds then retrying..." - assert expected_log_message in log_messages[-2] - # For all test cases, assert the endpoint was hit 6 times - assert search_endpoint.call_count == 6 + except (UserDefinedBackoffException, DefaultBackoffException) as e: + return e + + # Check that the endpoint was called the expected number of times + assert search_endpoint.call_count == 9 + + # Additional assertions to check reset of backoff time + # Find the backoff times from the message logs to compare against expected backoff times + log_messages = [record.message for record in caplog.records] + backoff_times = [ + round(float(re.search(r"(\d+(\.\d+)?) seconds", msg).group(1))) + for msg in log_messages + if any(word in msg for word in ["Sleeping", "Waiting"]) + ] + + assert backoff_times == expected_backoff_time, f"Unexpected backoff times: {backoff_times}" # Tests for Comments stream diff --git a/docs/integrations/sources/notion.md b/docs/integrations/sources/notion.md index 2c2e06acb2b4..9371ad6097a2 100644 --- a/docs/integrations/sources/notion.md +++ b/docs/integrations/sources/notion.md @@ -112,6 +112,7 @@ The connector is restricted by Notion [request limits](https://developers.notion | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------- | +| 2.0.6 | 2023-10-25 | [31825](https://github.com/airbytehq/airbyte/pull/31825) | Increase max_retries on retryable errors | | 2.0.5 | 2023-10-23 | [31742](https://github.com/airbytehq/airbyte/pull/31742) | Add 'synced_block' property to Blocks schema | | 2.0.4 | 2023-10-19 | [31625](https://github.com/airbytehq/airbyte/pull/31625) | Fix check_connection method | | 2.0.3 | 2023-10-19 | [31612](https://github.com/airbytehq/airbyte/pull/31612) | Add exponential backoff for 500 errors |