Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FivetranOperatorAsync hook and trigger to handle reschedule changes #25

Merged
merged 12 commits into from
Jun 7, 2023
64 changes: 63 additions & 1 deletion fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import time
from typing import Any, Dict, cast

import aiohttp
import pendulum
from aiohttp import ClientResponseError
from airflow.exceptions import AirflowException
from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -44,6 +46,7 @@ async def _do_api_call_async(self, endpoint_info, json=None):
request_func = session.get
elif method == "POST":
request_func = session.post
headers.update({"Content-Type": "application/json;version=2"})
elif method == "PATCH":
request_func = session.patch
headers.update({"Content-Type": "application/json;version=2"})
Expand Down Expand Up @@ -92,7 +95,7 @@ async def get_connector_async(self, connector_id):
resp = await self._do_api_call_async(("GET", endpoint))
return resp["data"]

async def get_sync_status_async(self, connector_id, previous_completed_at):
async def get_sync_status_async(self, connector_id, previous_completed_at, reschedule_wait_time=0):
"""
For sensor, return True if connector's 'succeeded_at' field has updated.

Expand All @@ -102,6 +105,9 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
:param previous_completed_at: The last time the connector ran, collected on Sensor
initialization.
:type previous_completed_at: pendulum.datetime.DateTime
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
:type reschedule_wait_time: int
"""
connector_details = await self.get_connector_async(connector_id)
succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
Expand All @@ -122,6 +128,16 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
sync_state = connector_details["status"]["sync_state"]
self.log.info('Connector "%s": sync_state = "%s"', connector_id, sync_state)

# if sync in rescheduled start, wait for time recommended by Fivetran
# or manually specified, then restart sync
if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual":
self.log.info('Connector is in "rescheduled" state and needs to be manually restarted')
self.pause_and_restart(
connector_id=connector_id,
reschedule_for=connector_details["status"]["rescheduled_for"],
reschedule_wait_time=reschedule_wait_time,
)

# Check if sync started by airflow has finished
# indicated by new 'succeeded_at' timestamp
if current_completed_at > previous_completed_at:
Expand All @@ -134,6 +150,52 @@ async def get_sync_status_async(self, connector_id, previous_completed_at):
job_status = "pending"
return job_status

def pause_and_restart(self, connector_id, reschedule_for, reschedule_wait_time):
"""
While a connector is syncing, if it falls into a reschedule state,
wait for a time either specified by the user of recommended by Fivetran,
Then restart a sync

:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param reschedule_for: From Fivetran API response, if schedule_type is manual,
then the connector expects triggering the event at the designated UTC time.
:type reschedule_for: str
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
:type reschedule_wait_time: int
"""
if reschedule_wait_time:
log_statement = f'Starting connector again in "{reschedule_wait_time}" seconds'
self.log.info(log_statement)
time.sleep(reschedule_wait_time)
else:
wait_time = (
self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC")
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
).seconds
if wait_time < 0:
raise ValueError(
f"Reschedule time {wait_time} configured in "
f"Fivetran connector has elapsed. Sync connector manually."
)
log_statement = f'Starting connector again in "{wait_time}" seconds'
self.log.info(log_statement)
time.sleep(wait_time)
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

self.log.info("Restarting connector now")
return self.start_fivetran_sync(connector_id)

def _parse_timestamp(self, api_time):
"""
Returns either the pendulum-parsed actual timestamp or a very out-of-date timestamp if not set.

:param api_time: timestamp format as returned by the Fivetran API.
:type api_time: str
:rtype: Pendulum.DateTime
"""
return pendulum.parse(api_time) if api_time is not None else pendulum.from_timestamp(-1)

async def get_last_sync_async(self, connector_id, xcom=""):
"""
Get the last time Fivetran connector completed a sync.
Expand Down
51 changes: 45 additions & 6 deletions fivetran_provider_async/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,52 @@ class FivetranOperatorAsync(FivetranOperator):
the sync job to start. You can find `connector_id` in the Settings page of the connector
you configured in the `Fivetran dashboard <https://fivetran.com/dashboard/connectors>`_.

:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure
the hook.
:param connector_id: ID of the Fivetran connector to sync, found on the
Connector settings page in the Fivetran Dashboard.
:param poll_frequency: Time in seconds that the job should wait in
between each tries
:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook.
:param fivetran_retry_limit: # of retries when encountering API errors
:param fivetran_retry_delay: Time to wait before retrying API request
:param run_name: Fivetran run name
:param timeout_seconds: Timeout in seconds
:param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page.
:param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule.
:param poll_frequency: Time in seconds that the job should wait in between each try.
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
"""

def __init__(
self,
connector_id: str,
run_name: Optional[str] = None,
timeout_seconds: Optional[int] = None,
fivetran_conn_id: str = "fivetran",
fivetran_retry_limit: int = 3,
fivetran_retry_delay: int = 1,
poll_frequency: int = 15,
schedule_type: str = "manual",
reschedule_wait_time: int = 0,
**kwargs,
):
self.connector_id = connector_id
self.fivetran_conn_id = fivetran_conn_id
self.run_name = run_name
self.timeout_seconds = timeout_seconds
self.fivetran_retry_limit = fivetran_retry_limit
self.fivetran_retry_delay = fivetran_retry_delay
self.poll_frequency = poll_frequency
self.schedule_type = schedule_type
self.reschedule_wait_time = reschedule_wait_time
super().__init__(
connector_id=self.connector_id,
run_name=self.run_name,
timeout_seconds=self.timeout_seconds,
fivetran_conn_id=self.fivetran_conn_id,
fivetran_retry_limit=self.fivetran_retry_limit,
fivetran_retry_delay=self.fivetran_retry_delay,
poll_frequency=self.poll_frequency,
schedule_type=self.schedule_type,
**kwargs,
)

def execute(self, context: Dict[str, Any]) -> None:
"""Start the sync using synchronous hook"""
hook = self._get_hook()
Expand All @@ -37,6 +75,7 @@ def execute(self, context: Dict[str, Any]) -> None:
fivetran_conn_id=self.fivetran_conn_id,
connector_id=self.connector_id,
poke_interval=self.poll_frequency,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
)
Expand Down
35 changes: 35 additions & 0 deletions fivetran_provider_async/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from airflow.exceptions import AirflowException
from airflow.utils.context import Context
from airflow.utils.decorators import apply_defaults
from fivetran_provider.sensors.fivetran import FivetranSensor

from fivetran_provider_async.triggers import FivetranTrigger
Expand All @@ -26,8 +27,41 @@ class FivetranSensorAsync(FivetranSensor):
between each tries
:param fivetran_retry_limit: # of retries when encountering API errors
:param fivetran_retry_delay: Time to wait before retrying API request
:param reschedule_wait_time: Optional, if connector is in reset state
number of seconds to wait before restarting, else Fivetran suggestion used
"""

@apply_defaults
def __init__(
self,
connector_id: str,
fivetran_conn_id: str = "fivetran",
poke_interval: int = 60,
fivetran_retry_limit: int = 3,
fivetran_retry_delay: int = 1,
xcom: str = "",
reschedule_wait_time: int = 0,
**kwargs: Any,
) -> None:
self.fivetran_conn_id = fivetran_conn_id
self.connector_id = connector_id
self.poke_interval = poke_interval
self.previous_completed_at = None
self.fivetran_retry_limit = fivetran_retry_limit
self.fivetran_retry_delay = fivetran_retry_delay
self.hook = None
self.xcom = xcom
self.reschedule_wait_time = reschedule_wait_time
super().__init__(
connector_id=self.connector_id,
fivetran_conn_id=self.fivetran_conn_id,
poke_interval=self.poke_interval,
fivetran_retry_limit=self.fivetran_retry_limit,
fivetran_retry_delay=self.fivetran_retry_delay,
xcom=self.xcom,
**kwargs,
)

def execute(self, context: Dict[str, Any]) -> None:
"""Check for the target_status and defers using the trigger"""
self.defer(
Expand All @@ -39,6 +73,7 @@ def execute(self, context: Dict[str, Any]) -> None:
previous_completed_at=self.previous_completed_at,
xcom=self.xcom,
poke_interval=self.poke_interval,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
)
Expand Down
9 changes: 8 additions & 1 deletion fivetran_provider_async/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class FivetranTrigger(BaseTrigger):
:param xcom: If used, FivetranSensorAsync receives timestamp of previously
completed sync
:param poke_interval: polling period in seconds to check for the status
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
"""

def __init__(
Expand All @@ -31,6 +33,7 @@ def __init__(
previous_completed_at: pendulum.DateTime | None = None,
xcom: str = "",
poke_interval: float = 4.0,
reschedule_wait_time: int = 0,
):
super().__init__()
self.task_id = task_id
Expand All @@ -39,6 +42,7 @@ def __init__(
self.previous_completed_at = previous_completed_at
self.xcom = xcom
self.poke_interval = poke_interval
self.reschedule_wait_time = reschedule_wait_time

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""Serializes FivetranTrigger arguments and classpath."""
Expand All @@ -51,6 +55,7 @@ def serialize(self) -> Tuple[str, Dict[str, Any]]:
"fivetran_conn_id": self.fivetran_conn_id,
"previous_completed_at": self.previous_completed_at,
"xcom": self.xcom,
"reschedule_wait_time": self.reschedule_wait_time,
},
)

Expand All @@ -64,7 +69,9 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
if self.previous_completed_at is None:
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id, self.xcom)
while True:
res = await hook.get_sync_status_async(self.connector_id, self.previous_completed_at)
res = await hook.get_sync_status_async(
self.connector_id, self.previous_completed_at, self.reschedule_wait_time
)
if res == "success":
self.previous_completed_at = await hook.get_last_sync_async(self.connector_id)
msg = "Fivetran connector %s finished syncing at %s" % (
Expand Down
72 changes: 72 additions & 0 deletions tests/common/static.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pendulum

LOGIN = "login"
PASSWORD = "password"

Expand Down Expand Up @@ -35,6 +37,76 @@
},
}

MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE = {
"code": "Success",
"data": {
"id": "interchangeable_revenge",
"paused": False,
"group_id": "rarer_gradient",
"service": "google_sheets",
"service_version": 1,
"schema": "google_sheets.fivetran_google_sheets_spotify",
"connected_by": "mournful_shalt",
"created_at": "2021-03-05T22:58:56.238875Z",
"succeeded_at": "2021-03-23T20:55:12.670390Z",
"failed_at": "2021-03-22T20:55:12.670390Z",
"sync_frequency": 360,
"schedule_type": "manual",
"status": {
"setup_state": "connected",
"sync_state": "rescheduled",
"update_state": "on_schedule",
"is_historical_sync": False,
"tasks": [],
"warnings": [],
"rescheduled_for": "2021-03-05T22:59:56.238875Z",
},
"config": {
"latest_version": "1",
"sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...",
"named_range": "fivetran_test_range",
"authorization_method": "User OAuth",
"service_version": "1",
"last_synced_changes__utc_": "2021-03-23 20:54",
},
},
}

MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR = {
"code": "Success",
"data": {
"id": "interchangeable_revenge",
"paused": False,
"group_id": "rarer_gradient",
"service": "google_sheets",
"service_version": 1,
"schema": "google_sheets.fivetran_google_sheets_spotify",
"connected_by": "mournful_shalt",
"created_at": "2021-03-05T22:58:56.238875Z",
"succeeded_at": "2021-03-23T20:55:12.670390Z",
"failed_at": "2021-03-22T20:55:12.670390Z",
"sync_frequency": 360,
"schedule_type": "manual",
"status": {
"setup_state": "connected",
"sync_state": "rescheduled",
"update_state": "on_schedule",
"is_historical_sync": False,
"tasks": [],
"warnings": [],
"rescheduled_for": str(pendulum.now(tz="UTC").add(minutes=1)),
},
"config": {
"latest_version": "1",
"sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...",
"named_range": "fivetran_test_range",
"authorization_method": "User OAuth",
"service_version": "1",
"last_synced_changes__utc_": "2021-03-23 20:54",
},
},
}

MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD_SHEETS = {
"code": "Success",
"data": {
Expand Down
Loading