From 605b6f567e9f0f41b52086c8197332caf6a29b54 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 27 Jul 2023 17:01:01 +0530 Subject: [PATCH 1/5] Merge sync and async operator --- README.md | 18 +-- .../example_dags/example_fivetran.py | 2 + .../example_dags/example_fivetran_async.py | 9 +- .../example_dags/example_fivetran_bigquery.py | 2 + .../example_dags/example_fivetran_bqml.py | 4 + .../example_dags/example_fivetran_dbt.py | 4 + .../example_dags/example_fivetran_xcom.py | 2 + fivetran_provider_async/operators.py | 138 ++++++----------- fivetran_provider_async/sensors.py | 142 ++++++------------ tests/operators/test_fivetran.py | 50 +----- tests/sensors/test_fivetran.py | 36 +---- 11 files changed, 135 insertions(+), 272 deletions(-) diff --git a/README.md b/README.md index ca4a565..af315f0 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Fivetran Async Provider for Apache Airflow This package provides an async operator, sensor and hook that integrates [Fivetran](https://fivetran.com) into Apache Airflow. -`FivetranSensorAsync` allows you to monitor a Fivetran sync job for completion before running downstream processes. -`FivetranOperatorAsync` submits a Fivetran sync job and polls for its status on the triggerer. +`FivetranSensor` allows you to monitor a Fivetran sync job for completion before running downstream processes. +`FivetranOperator` submits a Fivetran sync job and polls for its status on the triggerer. Since an async sensor or operator frees up worker slot while polling is happening on the triggerer, they consume less resources when compared to traditional "sync" sensors and operators. @@ -33,30 +33,30 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi ### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py) -`FivetranOperatorAsync` submits a Fivetran sync job and monitors it on trigger for completion. +`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion. It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). Import into your DAG via: ```python -from fivetran_provider_async.operators import FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator ``` ### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py) -`FivetranSensorAsync` monitors a Fivetran sync job for completion. -Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. +`FivetranSensor` monitors a Fivetran sync job for completion. +Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. -You can use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran connectors. +You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors. If used in this way, -`FivetranSensorAsync` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). +`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). Import into your DAG via: ```python -from fivetran_provider_async.sensors import FivetranSensorAsync +from fivetran_provider_async.sensors import FivetranSensor ``` ## Examples diff --git a/fivetran_provider_async/example_dags/example_fivetran.py b/fivetran_provider_async/example_dags/example_fivetran.py index 6c1c8cc..c5dde05 100644 --- a/fivetran_provider_async/example_dags/example_fivetran.py +++ b/fivetran_provider_async/example_dags/example_fivetran.py @@ -22,6 +22,7 @@ task_id="fivetran-task", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", + deferrable=False, ) fivetran_sync_wait = FivetranSensor( @@ -29,6 +30,7 @@ fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, + deferrable=False, ) fivetran_sync_start >> fivetran_sync_wait diff --git a/fivetran_provider_async/example_dags/example_fivetran_async.py b/fivetran_provider_async/example_dags/example_fivetran_async.py index 63a0c8a..4f12b75 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_async.py +++ b/fivetran_provider_async/example_dags/example_fivetran_async.py @@ -2,8 +2,8 @@ from airflow import DAG -from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync -from fivetran_provider_async.sensors import FivetranSensorAsync +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor default_args = { "owner": "Airflow", @@ -19,7 +19,7 @@ ) with dag: - fivetran_async_op = FivetranOperatorAsync( + fivetran_async_op = FivetranOperator( task_id="fivetran_async_op", connector_id="bronzing_largely", ) @@ -27,9 +27,10 @@ fivetran_sync_op = FivetranOperator( task_id="fivetran_sync_op", connector_id="bronzing_largely", + deferrable=False, ) - fivetran_async_sensor = FivetranSensorAsync( + fivetran_async_sensor = FivetranSensor( task_id="fivetran_async_sensor", connector_id="bronzing_largely", poke_interval=5, diff --git a/fivetran_provider_async/example_dags/example_fivetran_bigquery.py b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py index 3c7e601..269741e 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_bigquery.py +++ b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py @@ -49,6 +49,7 @@ task_id="fivetran-task", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", + deferrable=False, ) fivetran_sync_wait = FivetranSensor( @@ -56,6 +57,7 @@ fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, + deferrable=False, ) """ diff --git a/fivetran_provider_async/example_dags/example_fivetran_bqml.py b/fivetran_provider_async/example_dags/example_fivetran_bqml.py index 1ba602c..5a3e126 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_bqml.py +++ b/fivetran_provider_async/example_dags/example_fivetran_bqml.py @@ -72,6 +72,7 @@ def ml_branch(ds, **kwargs): task_id="linkedin-sync", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", + deferrable=False, ) linkedin_sensor = FivetranSensor( @@ -79,12 +80,14 @@ def ml_branch(ds, **kwargs): fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", poke_interval=5, + deferrable=False, ) twitter_sync = FivetranOperator( task_id="twitter-sync", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", + deferrable=False, ) twitter_sensor = FivetranSensor( @@ -92,6 +95,7 @@ def ml_branch(ds, **kwargs): fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", poke_interval=5, + deferrable=False, ) dbt_run = SSHOperator( diff --git a/fivetran_provider_async/example_dags/example_fivetran_dbt.py b/fivetran_provider_async/example_dags/example_fivetran_dbt.py index 4933b17..577c4e7 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_dbt.py +++ b/fivetran_provider_async/example_dags/example_fivetran_dbt.py @@ -20,23 +20,27 @@ linkedin_sync = FivetranOperator( task_id="linkedin-ads-sync", connector_id="{{ var.value.linkedin_connector_id }}", + deferrable=False, ) linkedin_sensor = FivetranSensor( task_id="linkedin-sensor", connector_id="{{ var.value.linkedin_connector_id }}", poke_interval=600, + deferrable=False, ) twitter_sync = FivetranOperator( task_id="twitter-ads-sync", connector_id="{{ var.value.twitter_connector_id }}", + deferrable=False, ) twitter_sensor = FivetranSensor( task_id="twitter-sensor", connector_id="{{ var.value.twitter_connector_id }}", poke_interval=600, + deferrable=False, ) dbt_run = SSHOperator( diff --git a/fivetran_provider_async/example_dags/example_fivetran_xcom.py b/fivetran_provider_async/example_dags/example_fivetran_xcom.py index 4946e82..32798af 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_xcom.py +++ b/fivetran_provider_async/example_dags/example_fivetran_xcom.py @@ -25,6 +25,7 @@ task_id="fivetran-operator", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", + deferrable=False, ) delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60)) @@ -35,6 +36,7 @@ connector_id="{{ var.value.connector_id }}", poke_interval=5, xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", + deferrable=False, ) fivetran_operator >> delay_task >> fivetran_sensor diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 21c51d5..980c9da 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -1,9 +1,10 @@ +from __future__ import annotations + from typing import Any, Dict, Optional from airflow.exceptions import AirflowException from airflow.models import BaseOperator, BaseOperatorLink from airflow.utils.context import Context -from airflow.utils.decorators import apply_defaults from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -23,71 +24,6 @@ def get_link(self, operator, dttm): class FivetranOperator(BaseOperator): - """ - `FivetranOperator` starts a Fivetran sync job. - - `FivetranOperator` requires that you specify the `connector_id` of the sync job to - start. You can find `connector_id` in the Settings page of the connector you - configured in the `Fivetran dashboard `_. - Note that when a Fivetran sync job is controlled via an Operator, it is no longer - run on the schedule as managed by Fivetran. In other words, it is now scheduled only - from Airflow. This can be changed with the schedule_type parameter. - - :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure - the hook. - :type fivetran_conn_id: Optional[str] - :param fivetran_retry_limit: # of retries when encountering API errors - :type fivetran_retry_limit: Optional[int] - :param fivetran_retry_delay: Time to wait before retrying API request - :type fivetran_retry_delay: int - :param connector_id: ID of the Fivetran connector to sync, found on the - Connector settings page. - :type connector_id: str - :param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule. - Set to "auto" to keep connector on Fivetran schedule. - :type schedule_type: str - """ - - operator_extra_links = (RegistryLink(),) - - # Define which fields get jinjaified - template_fields = ["connector_id"] - - @apply_defaults - def __init__( - self, - connector_id: str, - run_name: Optional[str] = None, - timeout_seconds: Optional[int] = None, - fivetran_conn_id: str = "fivetran", - fivetran_retry_limit: int = 3, - fivetran_retry_delay: int = 1, - poll_frequency: int = 15, - schedule_type: str = "manual", - **kwargs, - ): - super().__init__(**kwargs) - self.fivetran_conn_id = fivetran_conn_id - self.fivetran_retry_limit = fivetran_retry_limit - self.fivetran_retry_delay = fivetran_retry_delay - self.connector_id = connector_id - self.poll_frequency = poll_frequency - self.schedule_type = schedule_type - - def _get_hook(self) -> FivetranHook: - return FivetranHook( - self.fivetran_conn_id, - retry_limit=self.fivetran_retry_limit, - retry_delay=self.fivetran_retry_delay, - ) - - def execute(self, context): - hook = self._get_hook() - hook.prep_connector(self.connector_id, self.schedule_type) - return hook.start_fivetran_sync(self.connector_id) - - -class FivetranOperatorAsync(FivetranOperator): """ `FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the airflow trigger.`FivetranOperatorAsync` requires that you specify the `connector_id` of @@ -104,8 +40,13 @@ class FivetranOperatorAsync(FivetranOperator): :param poll_frequency: Time in seconds that the job should wait in between each try. :param reschedule_wait_time: Optional, if connector is in reset state, number of seconds to wait before restarting the sync. + :param deferrable: Run operator in deferrable mode """ + operator_extra_links = (RegistryLink(),) + + template_fields = ["connector_id"] + def __init__( self, connector_id: str, @@ -117,6 +58,7 @@ def __init__( poll_frequency: int = 15, schedule_type: str = "manual", reschedule_wait_time: int = 0, + deferrable: bool = True, **kwargs, ): self.connector_id = connector_id @@ -128,35 +70,36 @@ def __init__( self.poll_frequency = poll_frequency self.schedule_type = schedule_type self.reschedule_wait_time = reschedule_wait_time - super().__init__( - connector_id=self.connector_id, - run_name=self.run_name, - timeout_seconds=self.timeout_seconds, - fivetran_conn_id=self.fivetran_conn_id, - fivetran_retry_limit=self.fivetran_retry_limit, - fivetran_retry_delay=self.fivetran_retry_delay, - poll_frequency=self.poll_frequency, - schedule_type=self.schedule_type, - **kwargs, - ) + self.deferrable = deferrable + super().__init__(**kwargs) - def execute(self, context: Dict[str, Any]) -> None: + def execute(self, context: Dict[str, Any]) -> None | str: """Start the sync using synchronous hook""" hook = self._get_hook() hook.prep_connector(self.connector_id, self.schedule_type) - hook.start_fivetran_sync(self.connector_id) - - # Defer and poll the sync status on the Triggerer - self.defer( - timeout=self.execution_timeout, - trigger=FivetranTrigger( - task_id=self.task_id, - fivetran_conn_id=self.fivetran_conn_id, - connector_id=self.connector_id, - poke_interval=self.poll_frequency, - reschedule_wait_time=self.reschedule_wait_time, - ), - method_name="execute_complete", + last_sync = hook.start_fivetran_sync(self.connector_id) + + if not self.deferrable: + return last_sync + else: + # Defer and poll the sync status on the Triggerer + self.defer( + timeout=self.execution_timeout, + trigger=FivetranTrigger( + task_id=self.task_id, + fivetran_conn_id=self.fivetran_conn_id, + connector_id=self.connector_id, + poke_interval=self.poll_frequency, + reschedule_wait_time=self.reschedule_wait_time, + ), + method_name="execute_complete", + ) + + def _get_hook(self) -> FivetranHook: + return FivetranHook( + self.fivetran_conn_id, + retry_limit=self.fivetran_retry_limit, + retry_delay=self.fivetran_retry_delay, ) def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: @@ -242,3 +185,16 @@ def get_openlineage_facets_on_start(self): def get_openlineage_facets_on_complete(self, task_instance): return self.get_openlineage_facets_on_start() + + +class FivetranOperatorAsync(FivetranOperator): + def __init__(self, *args, **kwargs): + import warnings + + super().__init__(*args, **kwargs) + + warnings.warn( + "FivetranOperatorAsync has been deprecated. Please use `FivetranOperator`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index fe11876..829bc59 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -3,7 +3,6 @@ from airflow.exceptions import AirflowException from airflow.sensors.base import BaseSensorOperator from airflow.utils.context import Context -from airflow.utils.decorators import apply_defaults from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -11,45 +10,30 @@ class FivetranSensor(BaseSensorOperator): """ - `FivetranSensor` monitors a Fivetran sync job for completion. - - Monitoring with `FivetranSensor` allows you to trigger downstream processes only + `FivetranSensorAsync` asynchronously monitors a Fivetran sync job for completion. + Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. You can - use multiple instances of `FivetranSensor` to monitor multiple Fivetran - connectors. Note, it is possible to monitor a sync that is scheduled and managed - from Fivetran; in other words, you can use `FivetranSensor` without using - `FivetranOperator`. If used in this way, your DAG will wait until the sync job - starts on its Fivetran-controlled schedule and then completes. `FivetranSensor` - requires that you specify the `connector_id` of the sync job to start. You can - find `connector_id` in the Settings page of the connector you configured in the + use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran + connectors. `FivetranSensorAsync` requires that you specify the `connector_id` of the sync + job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. - :type fivetran_conn_id: str :param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page in the Fivetran Dashboard. - :type connector_id: str :param poke_interval: Time in seconds that the job should wait in between each tries - :type poke_interval: int :param fivetran_retry_limit: # of retries when encountering API errors - :type fivetran_retry_limit: Optional[int] :param fivetran_retry_delay: Time to wait before retrying API request - :type fivetran_retry_delay: int - :param xcom: If used, FivetranSensor receives timestamp of previously - completed sync from FivetranOperator via XCOM - :type xcom: str - :param reschedule_time: Optional, if connector is in reset state + :param reschedule_wait_time: Optional, if connector is in reset state number of seconds to wait before restarting, else Fivetran suggestion used - :type reschedule_time: int + :param deferrable: Run sensor in deferrable mode """ - # Define which fields get jinjaified template_fields = ["connector_id", "xcom"] - @apply_defaults def __init__( self, connector_id: str, @@ -58,10 +42,11 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", + reschedule_wait_time: int = 0, reschedule_time: int = 0, + deferrable: bool = True, **kwargs: Any, ) -> None: - super().__init__(**kwargs) self.fivetran_conn_id = fivetran_conn_id self.connector_id = connector_id self.poke_interval = poke_interval @@ -70,7 +55,29 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom + self.reschedule_wait_time = reschedule_wait_time self.reschedule_time = reschedule_time + self.deferrable = deferrable + super().__init__(**kwargs) + + def execute(self, context: Dict[str, Any]) -> None: + """Check for the target_status and defers using the trigger""" + if not self.deferrable: + super().execute(context=context) + else: + self.defer( + timeout=self.execution_timeout, + trigger=FivetranTrigger( + task_id=self.task_id, + fivetran_conn_id=self.fivetran_conn_id, + connector_id=self.connector_id, + previous_completed_at=self.previous_completed_at, + xcom=self.xcom, + poke_interval=self.poke_interval, + reschedule_wait_time=self.reschedule_wait_time, + ), + method_name="execute_complete", + ) def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -87,77 +94,6 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom) return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) - -class FivetranSensorAsync(FivetranSensor): - """ - `FivetranSensorAsync` asynchronously monitors a Fivetran sync job for completion. - Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only - when the Fivetran sync jobs have completed, ensuring data consistency. You can - use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran - connectors. `FivetranSensorAsync` requires that you specify the `connector_id` of the sync - job to start. You can find `connector_id` in the Settings page of the connector you configured in the - `Fivetran dashboard `_. - - - :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure - the hook. - :param connector_id: ID of the Fivetran connector to sync, found on the - Connector settings page in the Fivetran Dashboard. - :param poke_interval: Time in seconds that the job should wait in - between each tries - :param fivetran_retry_limit: # of retries when encountering API errors - :param fivetran_retry_delay: Time to wait before retrying API request - :param reschedule_wait_time: Optional, if connector is in reset state - number of seconds to wait before restarting, else Fivetran suggestion used - """ - - @apply_defaults - def __init__( - self, - connector_id: str, - fivetran_conn_id: str = "fivetran", - poke_interval: int = 60, - fivetran_retry_limit: int = 3, - fivetran_retry_delay: int = 1, - xcom: str = "", - reschedule_wait_time: int = 0, - **kwargs: Any, - ) -> None: - self.fivetran_conn_id = fivetran_conn_id - self.connector_id = connector_id - self.poke_interval = poke_interval - self.previous_completed_at = None - self.fivetran_retry_limit = fivetran_retry_limit - self.fivetran_retry_delay = fivetran_retry_delay - self.hook = None - self.xcom = xcom - self.reschedule_wait_time = reschedule_wait_time - super().__init__( - connector_id=self.connector_id, - fivetran_conn_id=self.fivetran_conn_id, - poke_interval=self.poke_interval, - fivetran_retry_limit=self.fivetran_retry_limit, - fivetran_retry_delay=self.fivetran_retry_delay, - xcom=self.xcom, - **kwargs, - ) - - def execute(self, context: Dict[str, Any]) -> None: - """Check for the target_status and defers using the trigger""" - self.defer( - timeout=self.execution_timeout, - trigger=FivetranTrigger( - task_id=self.task_id, - fivetran_conn_id=self.fivetran_conn_id, - connector_id=self.connector_id, - previous_completed_at=self.previous_completed_at, - xcom=self.xcom, - poke_interval=self.poke_interval, - reschedule_wait_time=self.reschedule_wait_time, - ), - method_name="execute_complete", - ) - def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: """ Callback for when the trigger fires - returns immediately. @@ -172,3 +108,19 @@ def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = self.log.info( event["message"], ) + + +class FivetranSensorAsync(FivetranSensor): + # Define which fields get jinjaified + template_fields = ["connector_id", "xcom"] + + def __init__(self, *args, **kwargs: Any) -> None: + import warnings + + super().__init__(*args, **kwargs) + + warnings.warn( + "FivetranSensorAsync has been deprecated. Please use `FivetranSensor`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/tests/operators/test_fivetran.py b/tests/operators/test_fivetran.py index ddba1f7..46dedb3 100644 --- a/tests/operators/test_fivetran.py +++ b/tests/operators/test_fivetran.py @@ -6,7 +6,7 @@ import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator from tests.common.static import ( MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD_SHEETS, MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS, @@ -64,7 +64,7 @@ def context(): @mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranOperatorAsync(unittest.TestCase): +class TestFivetranOperator(unittest.TestCase): @requests_mock.mock() def test_fivetran_op_async_execute_success(self, m): """Tests that task gets deferred after job submission""" @@ -78,7 +78,7 @@ def test_fivetran_op_async_execute_success(self, m): json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, ) - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -99,7 +99,7 @@ def test_fivetran_op_async_execute_success_reschedule_wait_time_and_manual_mode( json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, ) - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -111,7 +111,7 @@ def test_fivetran_op_async_execute_success_reschedule_wait_time_and_manual_mode( def test_fivetran_op_async_execute_complete_error(self): """Tests that execute_complete method raises exception in case of error""" - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -123,7 +123,7 @@ def test_fivetran_op_async_execute_complete_error(self): def test_fivetran_op_async_execute_complete_success(self): """Tests that execute_complete method returns expected result and that it prints expected log""" - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -172,7 +172,7 @@ def test_fivetran_operator_get_openlineage_facets_on_start(self, m): json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS, ) - operator = FivetranOperatorAsync( + operator = FivetranOperator( task_id="fivetran-task", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -185,39 +185,3 @@ def test_fivetran_operator_get_openlineage_facets_on_start(self, m): assert schema_field.name == "column_1_dest" assert schema_field.type == "VARCHAR(256)" assert schema_field.description is None - - -# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranOperator(unittest.TestCase): - """ - Test functions for Fivetran Operator. - - Mocks responses from Fivetran API. - """ - - @requests_mock.mock() - def test_fivetran_operator(self, m): - m.get( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - m.patch( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - m.post( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - - operator = FivetranOperator( - task_id="fivetran-task", - fivetran_conn_id="conn_fivetran", - connector_id="interchangeable_revenge", - ) - - result = operator.execute({}) - log.info(result) - - assert result is not None diff --git a/tests/sensors/test_fivetran.py b/tests/sensors/test_fivetran.py index 74079e8..998f639 100644 --- a/tests/sensors/test_fivetran.py +++ b/tests/sensors/test_fivetran.py @@ -1,12 +1,10 @@ import logging -import unittest from unittest import mock import pytest -import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.sensors import FivetranSensor, FivetranSensorAsync +from fivetran_provider_async.sensors import FivetranSensor from fivetran_provider_async.triggers import FivetranTrigger TASK_ID = "fivetran_sensor_check" @@ -57,11 +55,11 @@ def context(): yield context -class TestFivetranSensorAsync: +class TestFivetranSensor: def test_fivetran_sensor_async(self): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed.""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -74,7 +72,7 @@ def test_fivetran_sensor_async(self): def test_fivetran_sensor_async_with_response_wait_time(self): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -87,7 +85,7 @@ def test_fivetran_sensor_async_with_response_wait_time(self): def test_fivetran_sensor_async_execute_failure(self, context): """Tests that an AirflowException is raised in case of error event""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -101,7 +99,7 @@ def test_fivetran_sensor_async_execute_failure(self, context): def test_fivetran_sensor_async_execute_complete(self): """Asserts that logging occurs as expected""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -112,25 +110,3 @@ def test_fivetran_sensor_async_execute_complete(self): context=None, event={"status": "success", "message": "Fivetran connector finished syncing"} ) mock_log_info.assert_called_with("Fivetran connector finished syncing") - - -# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranSensor(unittest.TestCase): - """ - Test functions for Fivetran Operator. - - Mocks responses from Fivetran API. - """ - - @mock.patch.object(FivetranSensor, "poke", "returned_sync_status") - @requests_mock.mock() - def test_del(self, m): - sensor = FivetranSensor( - task_id="my_fivetran_sensor", - fivetran_conn_id="conn_fivetran", - connector_id="interchangeable_revenge", - ) - - log.info(sensor.poke) - assert sensor.poke == "returned_sync_status" From 39bfa167a7e9e1368d537de632bb8f63a4db0b05 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 4 Aug 2023 16:05:16 +0530 Subject: [PATCH 2/5] Apply review suggestions --- .../example_dags/example_fivetran_bigquery.py | 2 -- .../example_dags/example_fivetran_bqml.py | 4 --- .../example_dags/example_fivetran_dbt.py | 4 --- .../example_dags/example_fivetran_xcom.py | 2 -- fivetran_provider_async/operators.py | 18 +++++++------ fivetran_provider_async/sensors.py | 25 +++++++++++-------- 6 files changed, 26 insertions(+), 29 deletions(-) diff --git a/fivetran_provider_async/example_dags/example_fivetran_bigquery.py b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py index 269741e..3c7e601 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_bigquery.py +++ b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py @@ -49,7 +49,6 @@ task_id="fivetran-task", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", - deferrable=False, ) fivetran_sync_wait = FivetranSensor( @@ -57,7 +56,6 @@ fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, - deferrable=False, ) """ diff --git a/fivetran_provider_async/example_dags/example_fivetran_bqml.py b/fivetran_provider_async/example_dags/example_fivetran_bqml.py index 5a3e126..1ba602c 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_bqml.py +++ b/fivetran_provider_async/example_dags/example_fivetran_bqml.py @@ -72,7 +72,6 @@ def ml_branch(ds, **kwargs): task_id="linkedin-sync", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", - deferrable=False, ) linkedin_sensor = FivetranSensor( @@ -80,14 +79,12 @@ def ml_branch(ds, **kwargs): fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", poke_interval=5, - deferrable=False, ) twitter_sync = FivetranOperator( task_id="twitter-sync", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", - deferrable=False, ) twitter_sensor = FivetranSensor( @@ -95,7 +92,6 @@ def ml_branch(ds, **kwargs): fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", poke_interval=5, - deferrable=False, ) dbt_run = SSHOperator( diff --git a/fivetran_provider_async/example_dags/example_fivetran_dbt.py b/fivetran_provider_async/example_dags/example_fivetran_dbt.py index 577c4e7..4933b17 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_dbt.py +++ b/fivetran_provider_async/example_dags/example_fivetran_dbt.py @@ -20,27 +20,23 @@ linkedin_sync = FivetranOperator( task_id="linkedin-ads-sync", connector_id="{{ var.value.linkedin_connector_id }}", - deferrable=False, ) linkedin_sensor = FivetranSensor( task_id="linkedin-sensor", connector_id="{{ var.value.linkedin_connector_id }}", poke_interval=600, - deferrable=False, ) twitter_sync = FivetranOperator( task_id="twitter-ads-sync", connector_id="{{ var.value.twitter_connector_id }}", - deferrable=False, ) twitter_sensor = FivetranSensor( task_id="twitter-sensor", connector_id="{{ var.value.twitter_connector_id }}", poke_interval=600, - deferrable=False, ) dbt_run = SSHOperator( diff --git a/fivetran_provider_async/example_dags/example_fivetran_xcom.py b/fivetran_provider_async/example_dags/example_fivetran_xcom.py index 32798af..4946e82 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_xcom.py +++ b/fivetran_provider_async/example_dags/example_fivetran_xcom.py @@ -25,7 +25,6 @@ task_id="fivetran-operator", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", - deferrable=False, ) delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60)) @@ -36,7 +35,6 @@ connector_id="{{ var.value.connector_id }}", poke_interval=5, xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", - deferrable=False, ) fivetran_operator >> delay_task >> fivetran_sensor diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 980c9da..e00187b 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -1,10 +1,12 @@ from __future__ import annotations -from typing import Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional from airflow.exceptions import AirflowException from airflow.models import BaseOperator, BaseOperatorLink -from airflow.utils.context import Context + +if TYPE_CHECKING: + from airflow.utils.context import Context from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -25,8 +27,8 @@ def get_link(self, operator, dttm): class FivetranOperator(BaseOperator): """ - `FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the - airflow trigger.`FivetranOperatorAsync` requires that you specify the `connector_id` of + `FivetranOperator` submits a Fivetran sync job , and polls for its status on the + airflow trigger.`FivetranOperator` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. @@ -40,7 +42,7 @@ class FivetranOperator(BaseOperator): :param poll_frequency: Time in seconds that the job should wait in between each try. :param reschedule_wait_time: Optional, if connector is in reset state, number of seconds to wait before restarting the sync. - :param deferrable: Run operator in deferrable mode + :param deferrable: Run operator in deferrable mode. Default is True. """ operator_extra_links = (RegistryLink(),) @@ -73,7 +75,7 @@ def __init__( self.deferrable = deferrable super().__init__(**kwargs) - def execute(self, context: Dict[str, Any]) -> None | str: + def execute(self, context: Context) -> None | str: """Start the sync using synchronous hook""" hook = self._get_hook() hook.prep_connector(self.connector_id, self.schedule_type) @@ -102,7 +104,7 @@ def _get_hook(self) -> FivetranHook: retry_delay=self.fivetran_retry_delay, ) - def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: + def execute_complete(self, context: Context, event: Optional[Dict[Any, Any]] = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was @@ -188,6 +190,8 @@ def get_openlineage_facets_on_complete(self, task_instance): class FivetranOperatorAsync(FivetranOperator): + """This operator has been deprecated. Please use `FivetranOperator`.""" + def __init__(self, *args, **kwargs): import warnings diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index 829bc59..52abdc0 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -1,8 +1,12 @@ -from typing import Any, Dict, Optional +from __future__ import annotations + +from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException from airflow.sensors.base import BaseSensorOperator -from airflow.utils.context import Context + +if TYPE_CHECKING: + from airflow.utils.context import Context from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -10,11 +14,11 @@ class FivetranSensor(BaseSensorOperator): """ - `FivetranSensorAsync` asynchronously monitors a Fivetran sync job for completion. - Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only + `FivetranSensor` asynchronously monitors a Fivetran sync job for completion. + Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. You can - use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran - connectors. `FivetranSensorAsync` requires that you specify the `connector_id` of the sync + use multiple instances of `FivetranSensor` to monitor multiple Fivetran + connectors. `FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. @@ -29,7 +33,7 @@ class FivetranSensor(BaseSensorOperator): :param fivetran_retry_delay: Time to wait before retrying API request :param reschedule_wait_time: Optional, if connector is in reset state number of seconds to wait before restarting, else Fivetran suggestion used - :param deferrable: Run sensor in deferrable mode + :param deferrable: Run sensor in deferrable mode. default is True. """ template_fields = ["connector_id", "xcom"] @@ -60,7 +64,7 @@ def __init__( self.deferrable = deferrable super().__init__(**kwargs) - def execute(self, context: Dict[str, Any]) -> None: + def execute(self, context: Context) -> None: """Check for the target_status and defers using the trigger""" if not self.deferrable: super().execute(context=context) @@ -94,7 +98,7 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom) return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) - def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: + def execute_complete(self, context: Context, event: dict[Any, Any] | None = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was @@ -111,7 +115,8 @@ def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = class FivetranSensorAsync(FivetranSensor): - # Define which fields get jinjaified + """This sensor has been deprecated. Please use `FivetranSensor`.""" + template_fields = ["connector_id", "xcom"] def __init__(self, *args, **kwargs: Any) -> None: From 825bc38943b1a4ada052efd6b3be3f314afe243d Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 4 Aug 2023 18:37:23 +0530 Subject: [PATCH 3/5] Update docstring --- fivetran_provider_async/operators.py | 2 ++ fivetran_provider_async/sensors.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index e00187b..68803ba 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -31,6 +31,8 @@ class FivetranOperator(BaseOperator): airflow trigger.`FivetranOperator` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. + If you do not want to run `FivetranOperator` in async mode you can set `deferrable` to + False in operator. :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. :param fivetran_retry_limit: # of retries when encountering API errors diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index 52abdc0..0019d61 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -21,6 +21,8 @@ class FivetranSensor(BaseSensorOperator): connectors. `FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. + If you do not want to run `FivetranSensor` in async mode you can set `deferrable` to + False in sensor. :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure From f984c2222b7b067b9b1ddcf63a5f82dbe946fc1d Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 5 Aug 2023 16:15:52 +0530 Subject: [PATCH 4/5] Apply review suggestions --- fivetran_provider_async/operators.py | 26 ++++++++++++++------------ fivetran_provider_async/sensors.py | 2 +- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 68803ba..39bf0ea 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -86,18 +86,20 @@ def execute(self, context: Context) -> None | str: if not self.deferrable: return last_sync else: - # Defer and poll the sync status on the Triggerer - self.defer( - timeout=self.execution_timeout, - trigger=FivetranTrigger( - task_id=self.task_id, - fivetran_conn_id=self.fivetran_conn_id, - connector_id=self.connector_id, - poke_interval=self.poll_frequency, - reschedule_wait_time=self.reschedule_wait_time, - ), - method_name="execute_complete", - ) + previous_completed_at = hook.get_last_sync(self.connector_id) + completed = hook.get_sync_status(self.connector_id, previous_completed_at) + if not completed: + self.defer( + timeout=self.execution_timeout, + trigger=FivetranTrigger( + task_id=self.task_id, + fivetran_conn_id=self.fivetran_conn_id, + connector_id=self.connector_id, + poke_interval=self.poll_frequency, + reschedule_wait_time=self.reschedule_wait_time, + ), + method_name="execute_complete", + ) def _get_hook(self) -> FivetranHook: return FivetranHook( diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index 0019d61..c9da767 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -70,7 +70,7 @@ def execute(self, context: Context) -> None: """Check for the target_status and defers using the trigger""" if not self.deferrable: super().execute(context=context) - else: + elif not self.poke(context): self.defer( timeout=self.execution_timeout, trigger=FivetranTrigger( From ba86dfe4ea5bdeef9c5cf784d4c56ff1a2578725 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 5 Aug 2023 16:26:07 +0530 Subject: [PATCH 5/5] Fix tests --- tests/sensors/test_fivetran.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/sensors/test_fivetran.py b/tests/sensors/test_fivetran.py index 998f639..01f6387 100644 --- a/tests/sensors/test_fivetran.py +++ b/tests/sensors/test_fivetran.py @@ -56,9 +56,11 @@ def context(): class TestFivetranSensor: - def test_fivetran_sensor_async(self): + @mock.patch("fivetran_provider_async.sensors.FivetranSensor.poke") + def test_fivetran_sensor_async(self, mock_poke): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed.""" + mock_poke.return_value = False task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", @@ -69,9 +71,11 @@ def test_fivetran_sensor_async(self): task.execute(context) assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" - def test_fivetran_sensor_async_with_response_wait_time(self): + @mock.patch("fivetran_provider_async.sensors.FivetranSensor.poke") + def test_fivetran_sensor_async_with_response_wait_time(self, mock_poke): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" + mock_poke.return_value = False task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default",