diff --git a/fivetran_provider_async/hooks.py b/fivetran_provider_async/hooks.py index d281add..ff47ef3 100644 --- a/fivetran_provider_async/hooks.py +++ b/fivetran_provider_async/hooks.py @@ -1,7 +1,9 @@ import asyncio +import time from typing import Any, Dict, cast import aiohttp +import pendulum from aiohttp import ClientResponseError from airflow.exceptions import AirflowException from asgiref.sync import sync_to_async @@ -44,6 +46,7 @@ async def _do_api_call_async(self, endpoint_info, json=None): request_func = session.get elif method == "POST": request_func = session.post + headers.update({"Content-Type": "application/json;version=2"}) elif method == "PATCH": request_func = session.patch headers.update({"Content-Type": "application/json;version=2"}) @@ -92,7 +95,7 @@ async def get_connector_async(self, connector_id): resp = await self._do_api_call_async(("GET", endpoint)) return resp["data"] - async def get_sync_status_async(self, connector_id, previous_completed_at): + async def get_sync_status_async(self, connector_id, previous_completed_at, reschedule_wait_time=0): """ For sensor, return True if connector's 'succeeded_at' field has updated. @@ -102,6 +105,9 @@ async def get_sync_status_async(self, connector_id, previous_completed_at): :param previous_completed_at: The last time the connector ran, collected on Sensor initialization. :type previous_completed_at: pendulum.datetime.DateTime + :param reschedule_wait_time: Optional, if connector is in reset state, + number of seconds to wait before restarting the sync. 
+ :type reschedule_wait_time: int """ connector_details = await self.get_connector_async(connector_id) succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) @@ -122,6 +128,16 @@ async def get_sync_status_async(self, connector_id, previous_completed_at): sync_state = connector_details["status"]["sync_state"] self.log.info('Connector "%s": sync_state = "%s"', connector_id, sync_state) + # if sync is in rescheduled state, wait for the time recommended by Fivetran + # or manually specified, then restart the sync + if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": + self.log.info('Connector is in "rescheduled" state and needs to be manually restarted') + self.pause_and_restart( + connector_id=connector_id, + reschedule_for=connector_details["status"]["rescheduled_for"], + reschedule_wait_time=reschedule_wait_time, + ) + # Check if sync started by airflow has finished # indicated by new 'succeeded_at' timestamp if current_completed_at > previous_completed_at: @@ -134,6 +150,52 @@ async def get_sync_status_async(self, connector_id, previous_completed_at): job_status = "pending" return job_status + def pause_and_restart(self, connector_id, reschedule_for, reschedule_wait_time): + """ + While a connector is syncing, if it falls into a rescheduled state, + wait for a time either specified by the user or recommended by Fivetran, + then restart the sync + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param reschedule_for: From Fivetran API response, if schedule_type is manual, + then the connector expects triggering the event at the designated UTC time. + :type reschedule_for: str + :param reschedule_wait_time: Optional, if connector is in reset state, + number of seconds to wait before restarting the sync. 
+ :type reschedule_wait_time: int + """ + if reschedule_wait_time: + log_statement = f'Starting connector again in "{reschedule_wait_time}" seconds' + self.log.info(log_statement) + time.sleep(reschedule_wait_time) + else: + wait_time = ( + self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC") + ).seconds + if wait_time < 0: + raise ValueError( + f"Reschedule time {wait_time} configured in " + f"Fivetran connector has elapsed. Sync connector manually." + ) + log_statement = f'Starting connector again in "{wait_time}" seconds' + self.log.info(log_statement) + time.sleep(wait_time) + + self.log.info("Restarting connector now") + return self.start_fivetran_sync(connector_id) + + def _parse_timestamp(self, api_time): + """ + Returns either the pendulum-parsed actual timestamp or a very out-of-date timestamp if not set. + + :param api_time: timestamp format as returned by the Fivetran API. + :type api_time: str + :rtype: Pendulum.DateTime + """ + return pendulum.parse(api_time) if api_time is not None else pendulum.from_timestamp(-1) + async def get_last_sync_async(self, connector_id, xcom=""): """ Get the last time Fivetran connector completed a sync. diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 7f3e1fc..eecffd8 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -15,14 +15,52 @@ class FivetranOperatorAsync(FivetranOperator): the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. - :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure - the hook. - :param connector_id: ID of the Fivetran connector to sync, found on the - Connector settings page in the Fivetran Dashboard. - :param poll_frequency: Time in seconds that the job should wait in - between each tries + :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. 
+ :param fivetran_retry_limit: # of retries when encountering API errors + :param fivetran_retry_delay: Time to wait before retrying API request + :param run_name: Fivetran run name + :param timeout_seconds: Timeout in seconds + :param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page. + :param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule. + :param poll_frequency: Time in seconds that the job should wait in between each try. + :param reschedule_wait_time: Optional, if connector is in reset state, + number of seconds to wait before restarting the sync. """ + def __init__( + self, + connector_id: str, + run_name: Optional[str] = None, + timeout_seconds: Optional[int] = None, + fivetran_conn_id: str = "fivetran", + fivetran_retry_limit: int = 3, + fivetran_retry_delay: int = 1, + poll_frequency: int = 15, + schedule_type: str = "manual", + reschedule_wait_time: int = 0, + **kwargs, + ): + self.connector_id = connector_id + self.fivetran_conn_id = fivetran_conn_id + self.run_name = run_name + self.timeout_seconds = timeout_seconds + self.fivetran_retry_limit = fivetran_retry_limit + self.fivetran_retry_delay = fivetran_retry_delay + self.poll_frequency = poll_frequency + self.schedule_type = schedule_type + self.reschedule_wait_time = reschedule_wait_time + super().__init__( + connector_id=self.connector_id, + run_name=self.run_name, + timeout_seconds=self.timeout_seconds, + fivetran_conn_id=self.fivetran_conn_id, + fivetran_retry_limit=self.fivetran_retry_limit, + fivetran_retry_delay=self.fivetran_retry_delay, + poll_frequency=self.poll_frequency, + schedule_type=self.schedule_type, + **kwargs, + ) + def execute(self, context: Dict[str, Any]) -> None: """Start the sync using synchronous hook""" hook = self._get_hook() @@ -37,6 +75,7 @@ def execute(self, context: Dict[str, Any]) -> None: fivetran_conn_id=self.fivetran_conn_id, connector_id=self.connector_id, 
poke_interval=self.poll_frequency, + reschedule_wait_time=self.reschedule_wait_time, ), method_name="execute_complete", ) diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index c47e646..7585764 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -2,6 +2,7 @@ from airflow.exceptions import AirflowException from airflow.utils.context import Context +from airflow.utils.decorators import apply_defaults from fivetran_provider.sensors.fivetran import FivetranSensor from fivetran_provider_async.triggers import FivetranTrigger @@ -26,8 +27,41 @@ class FivetranSensorAsync(FivetranSensor): between each tries :param fivetran_retry_limit: # of retries when encountering API errors :param fivetran_retry_delay: Time to wait before retrying API request + :param reschedule_wait_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used """ + @apply_defaults + def __init__( + self, + connector_id: str, + fivetran_conn_id: str = "fivetran", + poke_interval: int = 60, + fivetran_retry_limit: int = 3, + fivetran_retry_delay: int = 1, + xcom: str = "", + reschedule_wait_time: int = 0, + **kwargs: Any, + ) -> None: + self.fivetran_conn_id = fivetran_conn_id + self.connector_id = connector_id + self.poke_interval = poke_interval + self.previous_completed_at = None + self.fivetran_retry_limit = fivetran_retry_limit + self.fivetran_retry_delay = fivetran_retry_delay + self.hook = None + self.xcom = xcom + self.reschedule_wait_time = reschedule_wait_time + super().__init__( + connector_id=self.connector_id, + fivetran_conn_id=self.fivetran_conn_id, + poke_interval=self.poke_interval, + fivetran_retry_limit=self.fivetran_retry_limit, + fivetran_retry_delay=self.fivetran_retry_delay, + xcom=self.xcom, + **kwargs, + ) + def execute(self, context: Dict[str, Any]) -> None: """Check for the target_status and defers using the trigger""" self.defer( @@ -39,6 
+73,7 @@ def execute(self, context: Dict[str, Any]) -> None: previous_completed_at=self.previous_completed_at, xcom=self.xcom, poke_interval=self.poke_interval, + reschedule_wait_time=self.reschedule_wait_time, ), method_name="execute_complete", ) diff --git a/fivetran_provider_async/triggers.py b/fivetran_provider_async/triggers.py index 23c4c64..7192f9c 100644 --- a/fivetran_provider_async/triggers.py +++ b/fivetran_provider_async/triggers.py @@ -21,6 +21,8 @@ class FivetranTrigger(BaseTrigger): :param xcom: If used, FivetranSensorAsync receives timestamp of previously completed sync :param poke_interval: polling period in seconds to check for the status + :param reschedule_wait_time: Optional, if connector is in reset state, + number of seconds to wait before restarting the sync. """ def __init__( @@ -31,6 +33,7 @@ def __init__( previous_completed_at: pendulum.DateTime | None = None, xcom: str = "", poke_interval: float = 4.0, + reschedule_wait_time: int = 0, ): super().__init__() self.task_id = task_id @@ -39,6 +42,7 @@ def __init__( self.previous_completed_at = previous_completed_at self.xcom = xcom self.poke_interval = poke_interval + self.reschedule_wait_time = reschedule_wait_time def serialize(self) -> Tuple[str, Dict[str, Any]]: """Serializes FivetranTrigger arguments and classpath.""" @@ -51,6 +55,7 @@ def serialize(self) -> Tuple[str, Dict[str, Any]]: "fivetran_conn_id": self.fivetran_conn_id, "previous_completed_at": self.previous_completed_at, "xcom": self.xcom, + "reschedule_wait_time": self.reschedule_wait_time, }, ) @@ -64,7 +69,9 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override] if self.previous_completed_at is None: self.previous_completed_at = await hook.get_last_sync_async(self.connector_id, self.xcom) while True: - res = await hook.get_sync_status_async(self.connector_id, self.previous_completed_at) + res = await hook.get_sync_status_async( + self.connector_id, self.previous_completed_at, 
self.reschedule_wait_time + ) if res == "success": self.previous_completed_at = await hook.get_last_sync_async(self.connector_id) msg = "Fivetran connector %s finished syncing at %s" % ( diff --git a/tests/common/static.py b/tests/common/static.py index 1c4820a..5766b5d 100644 --- a/tests/common/static.py +++ b/tests/common/static.py @@ -1,3 +1,5 @@ +import pendulum + LOGIN = "login" PASSWORD = "password" @@ -35,6 +37,76 @@ }, } +MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE = { + "code": "Success", + "data": { + "id": "interchangeable_revenge", + "paused": False, + "group_id": "rarer_gradient", + "service": "google_sheets", + "service_version": 1, + "schema": "google_sheets.fivetran_google_sheets_spotify", + "connected_by": "mournful_shalt", + "created_at": "2021-03-05T22:58:56.238875Z", + "succeeded_at": "2021-03-23T20:55:12.670390Z", + "failed_at": "2021-03-22T20:55:12.670390Z", + "sync_frequency": 360, + "schedule_type": "manual", + "status": { + "setup_state": "connected", + "sync_state": "rescheduled", + "update_state": "on_schedule", + "is_historical_sync": False, + "tasks": [], + "warnings": [], + "rescheduled_for": "2021-03-05T22:59:56.238875Z", + }, + "config": { + "latest_version": "1", + "sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...", + "named_range": "fivetran_test_range", + "authorization_method": "User OAuth", + "service_version": "1", + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + +MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR = { + "code": "Success", + "data": { + "id": "interchangeable_revenge", + "paused": False, + "group_id": "rarer_gradient", + "service": "google_sheets", + "service_version": 1, + "schema": "google_sheets.fivetran_google_sheets_spotify", + "connected_by": "mournful_shalt", + "created_at": "2021-03-05T22:58:56.238875Z", + "succeeded_at": "2021-03-23T20:55:12.670390Z", + "failed_at": "2021-03-22T20:55:12.670390Z", + "sync_frequency": 360, + "schedule_type": "manual", + 
"status": { + "setup_state": "connected", + "sync_state": "rescheduled", + "update_state": "on_schedule", + "is_historical_sync": False, + "tasks": [], + "warnings": [], + "rescheduled_for": str(pendulum.now(tz="UTC").add(minutes=1)), + }, + "config": { + "latest_version": "1", + "sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...", + "named_range": "fivetran_test_range", + "authorization_method": "User OAuth", + "service_version": "1", + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD_SHEETS = { "code": "Success", "data": { diff --git a/tests/hooks/test_fivetran.py b/tests/hooks/test_fivetran.py index 6a4ed8f..89a22d2 100644 --- a/tests/hooks/test_fivetran.py +++ b/tests/hooks/test_fivetran.py @@ -7,7 +7,13 @@ from airflow.exceptions import AirflowException from fivetran_provider_async.hooks import FivetranHookAsync -from tests.common.static import LOGIN, MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, PASSWORD +from tests.common.static import ( + LOGIN, + MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, + MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE, + MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR, + PASSWORD, +) @pytest.mark.asyncio @@ -54,11 +60,109 @@ async def test_fivetran_hook_get_sync_status_async( hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS result = await hook.get_sync_status_async( - connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + reschedule_wait_time=60, ) assert result == expected_result +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # 
current_completed_at < previous_completed_at + "pending", + ), + ], +) +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") +async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_error_for_wait_time( + mock_api_call_async_response, mock_previous_completed_at, expected_result +): + """Tests that get_sync_status_async method return error with rescheduled_for in Fivetran API response + along with schedule_type as manual and negative wait time.""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE + with pytest.raises(ValueError, match="Sync connector manually."): + await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], +) +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") +async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode( + mock_start_fivetran_sync, mock_api_call_async_response, mock_previous_completed_at, expected_result +): + """Tests that get_sync_status_async method return success or pending depending on whether + current_completed_at > previous_completed_at with reschedule_time specified by user and + schedule_type as manual in API response.""" + mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = 
MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE + result = await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + reschedule_wait_time=10, + ) + + assert result == expected_result + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], +) +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") +async def test_fivetran_hook_get_sync_status_async_with_reschedule_for_and_schedule_type_manual( + mock_start_fivetran_sync, mock_api_call_async_response, mock_previous_completed_at, expected_result +): + """Tests that get_sync_status_async method return success or pending depending on whether + current_completed_at > previous_completed_at with reschedule_for in Fivetran API response + along with schedule_type as manual.""" + mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR + result = await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + ) + + assert result == expected_result + + @pytest.mark.asyncio @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") async def test_fivetran_hook_get_sync_status_async_exception(mock_api_call_async_response): @@ -74,6 +178,21 @@ async def test_fivetran_hook_get_sync_status_async_exception(mock_api_call_async assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value) 
+@pytest.mark.asyncio +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") +@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") +async def test_fivetran_hook_pause_and_restart(mock_api_call_async_response, mock_start_fivetran_sync): + """Tests the pause_and_restart method for manual mode with reschedule wait time set.""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_start_fivetran_sync.return_value = True + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + + result = hook.pause_and_restart( + connector_id="interchangeable_revenge", reschedule_for="manual", reschedule_wait_time=60 + ) + assert result is True + + @pytest.mark.asyncio @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") async def test_fivetran_hook_get_last_sync_async_no_xcom(mock_api_call_async_response): diff --git a/tests/operators/test_fivetran.py b/tests/operators/test_fivetran.py index 941f1c2..537c456 100644 --- a/tests/operators/test_fivetran.py +++ b/tests/operators/test_fivetran.py @@ -48,6 +48,29 @@ def test_fivetran_op_async_execute_success(self, m): with pytest.raises(TaskDeferred): task.execute(context) + @requests_mock.mock() + def test_fivetran_op_async_execute_success_reschedule_wait_time_and_manual_mode(self, m): + """Tests that task gets deferred after job submission with reschedule wait time and manual mode.""" + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, + ) + + m.post( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, + ) + + task = FivetranOperatorAsync( + task_id="fivetran_op_async", + fivetran_conn_id="conn_fivetran", + connector_id="interchangeable_revenge", + reschedule_wait_time=60, + schedule_type="manual", + ) + with pytest.raises(TaskDeferred): + task.execute(context) + def 
test_fivetran_op_async_execute_complete_error(self): """Tests that execute_complete method raises exception in case of error""" task = FivetranOperatorAsync( diff --git a/tests/sensors/test_fivetran.py b/tests/sensors/test_fivetran.py index a293f7c..64e14e2 100644 --- a/tests/sensors/test_fivetran.py +++ b/tests/sensors/test_fivetran.py @@ -33,6 +33,21 @@ def test_fivetran_sensor_async(): assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" +def test_fivetran_sensor_async_with_response_wait_time(): + """Asserts that a task is deferred and a FivetranTrigger will be fired + when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" + task = FivetranSensorAsync( + task_id=TASK_ID, + fivetran_conn_id="fivetran_default", + connector_id="test_connector", + poke_interval=5, + reschedule_wait_time=60, + ) + with pytest.raises(TaskDeferred) as exc: + task.execute(context) + assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" + + def test_fivetran_sensor_async_execute_failure(context): """Tests that an AirflowException is raised in case of error event""" task = FivetranSensorAsync( diff --git a/tests/triggers/test_fivetran.py b/tests/triggers/test_fivetran.py index b298db5..1b33825 100644 --- a/tests/triggers/test_fivetran.py +++ b/tests/triggers/test_fivetran.py @@ -68,6 +68,7 @@ def test_fivetran_trigger_serialization(): "previous_completed_at": PREV_COMPLETED_AT, "xcom": "", "task_id": "fivetran_sync_task", + "reschedule_wait_time": 0, }