Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge sync and async operator #40

Merged
merged 5 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Fivetran Async Provider for Apache Airflow

This package provides an async operator, sensor and hook that integrates [Fivetran](https://fivetran.com) into Apache Airflow.
`FivetranSensorAsync` allows you to monitor a Fivetran sync job for completion before running downstream processes.
`FivetranOperatorAsync` submits a Fivetran sync job and polls for its status on the triggerer.
`FivetranSensor` allows you to monitor a Fivetran sync job for completion before running downstream processes.
`FivetranOperator` submits a Fivetran sync job and polls for its status on the triggerer.
Copy link
Contributor

@phanikumv phanikumv Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`FivetranOperator` submits a Fivetran sync job and polls for its status on the triggerer.
`FivetranOperator` submits a Fivetran sync job and polls for it's status on the triggerer.When deferrable param is set to False, it would not release the Airflow worker slot and waits for the job to complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phanikumv the FivetranOperator does not wait for the job to complete. It only submits and then we use a sensor to monitor for completion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right sorry , just confused with OSS operators :)

Since an async sensor or operator frees up worker slot while polling is happening on the triggerer,
they consume less resources when compared to traditional "sync" sensors and operators.

Expand Down Expand Up @@ -33,30 +33,30 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi

### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py)

`FivetranOperatorAsync` submits a Fivetran sync job and monitors it on trigger for completion.
`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.
It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.operators import FivetranOperatorAsync
from fivetran_provider_async.operators import FivetranOperator
```

### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py)

`FivetranSensorAsync` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.
`FivetranSensor` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.



You can use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran connectors.
You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors.

If used in this way,

`FivetranSensorAsync` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).
`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.sensors import FivetranSensorAsync
from fivetran_provider_async.sensors import FivetranSensor
```

## Examples
Expand Down
2 changes: 2 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
deferrable=False,
)

fivetran_sync_start >> fivetran_sync_wait
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync
from fivetran_provider_async.sensors import FivetranSensorAsync
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -19,17 +19,18 @@
)

with dag:
fivetran_async_op = FivetranOperatorAsync(
fivetran_async_op = FivetranOperator(
task_id="fivetran_async_op",
connector_id="bronzing_largely",
)

fivetran_sync_op = FivetranOperator(
task_id="fivetran_sync_op",
connector_id="bronzing_largely",
deferrable=False,
)

fivetran_async_sensor = FivetranSensorAsync(
fivetran_async_sensor = FivetranSensor(
task_id="fivetran_async_sensor",
connector_id="bronzing_largely",
poke_interval=5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
deferrable=False,
)

"""
Expand Down
4 changes: 4 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_bqml.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,30 @@ def ml_branch(ds, **kwargs):
task_id="linkedin-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
deferrable=False,
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=5,
deferrable=False,
)

twitter_sync = FivetranOperator(
task_id="twitter-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
deferrable=False,
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=5,
deferrable=False,
)

dbt_run = SSHOperator(
Expand Down
4 changes: 4 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,27 @@
linkedin_sync = FivetranOperator(
task_id="linkedin-ads-sync",
connector_id="{{ var.value.linkedin_connector_id }}",
deferrable=False,
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=600,
deferrable=False,
)

twitter_sync = FivetranOperator(
task_id="twitter-ads-sync",
connector_id="{{ var.value.twitter_connector_id }}",
deferrable=False,
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=600,
deferrable=False,
)

dbt_run = SSHOperator(
Expand Down
2 changes: 2 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
task_id="fivetran-operator",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
)

delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))
Expand All @@ -35,6 +36,7 @@
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
deferrable=False,
)

fivetran_operator >> delay_task >> fivetran_sensor
138 changes: 47 additions & 91 deletions fivetran_provider_async/operators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from typing import Any, Dict, Optional

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.utils.context import Context
from airflow.utils.decorators import apply_defaults

from fivetran_provider_async.hooks import FivetranHook
from fivetran_provider_async.triggers import FivetranTrigger
Expand All @@ -23,71 +24,6 @@ def get_link(self, operator, dttm):


class FivetranOperator(BaseOperator):
"""
`FivetranOperator` starts a Fivetran sync job.

`FivetranOperator` requires that you specify the `connector_id` of the sync job to
start. You can find `connector_id` in the Settings page of the connector you
configured in the `Fivetran dashboard <https://fivetran.com/dashboard/connectors>`_.
Note that when a Fivetran sync job is controlled via an Operator, it is no longer
run on the schedule as managed by Fivetran. In other words, it is now scheduled only
from Airflow. This can be changed with the schedule_type parameter.

:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure
the hook.
:type fivetran_conn_id: Optional[str]
:param fivetran_retry_limit: # of retries when encountering API errors
:type fivetran_retry_limit: Optional[int]
:param fivetran_retry_delay: Time to wait before retrying API request
:type fivetran_retry_delay: int
:param connector_id: ID of the Fivetran connector to sync, found on the
Connector settings page.
:type connector_id: str
:param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule.
Set to "auto" to keep connector on Fivetran schedule.
:type schedule_type: str
"""

operator_extra_links = (RegistryLink(),)

# Define which fields get jinjaified
template_fields = ["connector_id"]

@apply_defaults
def __init__(
self,
connector_id: str,
run_name: Optional[str] = None,
timeout_seconds: Optional[int] = None,
fivetran_conn_id: str = "fivetran",
fivetran_retry_limit: int = 3,
fivetran_retry_delay: int = 1,
poll_frequency: int = 15,
schedule_type: str = "manual",
**kwargs,
):
super().__init__(**kwargs)
self.fivetran_conn_id = fivetran_conn_id
self.fivetran_retry_limit = fivetran_retry_limit
self.fivetran_retry_delay = fivetran_retry_delay
self.connector_id = connector_id
self.poll_frequency = poll_frequency
self.schedule_type = schedule_type

def _get_hook(self) -> FivetranHook:
return FivetranHook(
self.fivetran_conn_id,
retry_limit=self.fivetran_retry_limit,
retry_delay=self.fivetran_retry_delay,
)

def execute(self, context):
hook = self._get_hook()
hook.prep_connector(self.connector_id, self.schedule_type)
return hook.start_fivetran_sync(self.connector_id)


class FivetranOperatorAsync(FivetranOperator):
"""
`FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
airflow trigger.`FivetranOperatorAsync` requires that you specify the `connector_id` of
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -104,8 +40,13 @@ class FivetranOperatorAsync(FivetranOperator):
:param poll_frequency: Time in seconds that the job should wait in between each try.
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
:param deferrable: Run operator in deferrable mode
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
"""

operator_extra_links = (RegistryLink(),)

template_fields = ["connector_id"]

def __init__(
self,
connector_id: str,
Expand All @@ -117,6 +58,7 @@ def __init__(
poll_frequency: int = 15,
schedule_type: str = "manual",
reschedule_wait_time: int = 0,
deferrable: bool = True,
**kwargs,
):
self.connector_id = connector_id
Expand All @@ -128,35 +70,36 @@ def __init__(
self.poll_frequency = poll_frequency
self.schedule_type = schedule_type
self.reschedule_wait_time = reschedule_wait_time
super().__init__(
connector_id=self.connector_id,
run_name=self.run_name,
timeout_seconds=self.timeout_seconds,
fivetran_conn_id=self.fivetran_conn_id,
fivetran_retry_limit=self.fivetran_retry_limit,
fivetran_retry_delay=self.fivetran_retry_delay,
poll_frequency=self.poll_frequency,
schedule_type=self.schedule_type,
**kwargs,
)
self.deferrable = deferrable
super().__init__(**kwargs)

def execute(self, context: Dict[str, Any]) -> None:
def execute(self, context: Dict[str, Any]) -> None | str:
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
"""Start the sync using synchronous hook"""
hook = self._get_hook()
hook.prep_connector(self.connector_id, self.schedule_type)
hook.start_fivetran_sync(self.connector_id)

# Defer and poll the sync status on the Triggerer
self.defer(
timeout=self.execution_timeout,
trigger=FivetranTrigger(
task_id=self.task_id,
fivetran_conn_id=self.fivetran_conn_id,
connector_id=self.connector_id,
poke_interval=self.poll_frequency,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
last_sync = hook.start_fivetran_sync(self.connector_id)

if not self.deferrable:
return last_sync
else:
# Defer and poll the sync status on the Triggerer
phanikumv marked this conversation as resolved.
Show resolved Hide resolved
self.defer(
timeout=self.execution_timeout,
trigger=FivetranTrigger(
task_id=self.task_id,
fivetran_conn_id=self.fivetran_conn_id,
connector_id=self.connector_id,
poke_interval=self.poll_frequency,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
)

def _get_hook(self) -> FivetranHook:
return FivetranHook(
self.fivetran_conn_id,
retry_limit=self.fivetran_retry_limit,
retry_delay=self.fivetran_retry_delay,
)

def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None:
Expand Down Expand Up @@ -242,3 +185,16 @@ def get_openlineage_facets_on_start(self):

def get_openlineage_facets_on_complete(self, task_instance):
return self.get_openlineage_facets_on_start()


class FivetranOperatorAsync(FivetranOperator):
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, *args, **kwargs):
import warnings

super().__init__(*args, **kwargs)

warnings.warn(
"FivetranOperatorAsync has been deprecated. Please use `FivetranOperator`.",
DeprecationWarning,
stacklevel=2,
)
Loading