Skip to content

Commit

Permalink
schedule new fivetran jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Nov 15, 2023
1 parent 7aeb22e commit 18f87b6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 46 deletions.
69 changes: 38 additions & 31 deletions src/teamster/kipptaf/fivetran/jobs.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,22 @@
import pathlib
import re

from dagster import AssetSelection, RunConfig, config_from_files, define_asset_job, job
from dagster import RunConfig, config_from_files, job

from teamster.core.fivetran.jobs import (
build_fivetran_start_resync_job,
build_fivetran_start_sync_job,
)
from teamster.core.fivetran.jobs import build_fivetran_start_resync_job
from teamster.core.fivetran.ops import SyncConfig, fivetran_start_sync_op
from teamster.kipptaf import CODE_LOCATION, fivetran

fivetran_materialization_jobs = []
fivetran_start_sync_jobs = []
fivetran_start_resync_jobs = []
for asset in fivetran.assets:
connector_name = list(asset.keys)[0].path[1]
connector_id = re.match(
pattern=r"fivetran_sync_(\w+)",
string=asset.op.name,
).group(1)

fivetran_materialization_jobs.append(
define_asset_job(
name=(f"{CODE_LOCATION}_{connector_name}_fivetran_asset_job"),
selection=AssetSelection.keys(*list(asset.keys)),
)
)

fivetran_start_sync_jobs.append(
build_fivetran_start_sync_job(
code_location=CODE_LOCATION,
connector_id=connector_id,
connector_name=connector_name,
)
)
for asset in fivetran.assets:
op_name_match = re.match(pattern=r"fivetran_sync_(\w+)", string=asset.op.name)

fivetran_start_resync_jobs.append(
build_fivetran_start_resync_job(
code_location=CODE_LOCATION,
connector_id=connector_id,
connector_name=connector_name,
connector_id=op_name_match.group(1),
connector_name=list(asset.keys)[0].path[1],
)
)

Expand Down Expand Up @@ -70,9 +47,39 @@ def kipptaf_fivetran_start_syncs_job():
fivetran_sync_op_aliased()


@job(
config=RunConfig(
ops={
"adp_workforce_now": SyncConfig(
connector_id="sameness_cunning", yield_materializations=False
)
}
),
)
def kipptaf_fivetran_adp_workforce_now_start_sync_job():
fivetran_sync_op_aliased = fivetran_start_sync_op.alias("adp_workforce_now")

fivetran_sync_op_aliased()


@job(
config=RunConfig(
ops={
"illuminate": SyncConfig(
connector_id="jinx_credulous", yield_materializations=False
)
}
),
)
def kipptaf_fivetran_illuminate_start_sync_job():
fivetran_sync_op_aliased = fivetran_start_sync_op.alias("illuminate")

fivetran_sync_op_aliased()


__all__ = [
*fivetran_materialization_jobs,
*fivetran_start_sync_jobs,
*fivetran_start_resync_jobs,
kipptaf_fivetran_adp_workforce_now_start_sync_job,
kipptaf_fivetran_illuminate_start_sync_job,
kipptaf_fivetran_start_syncs_job,
]
39 changes: 24 additions & 15 deletions src/teamster/kipptaf/fivetran/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from teamster.kipptaf import LOCAL_TIMEZONE
from teamster.kipptaf.fivetran.jobs import (
fivetran_start_resync_jobs,
fivetran_start_sync_jobs,
kipptaf_fivetran_adp_workforce_now_start_sync_job,
kipptaf_fivetran_illuminate_start_sync_job,
kipptaf_fivetran_start_syncs_job,
)

adp_wfn_resync_schedule = ScheduleDefinition(
Expand All @@ -16,20 +18,27 @@
][0],
)

kipptaf_fivetran_start_syncs_schedule = ScheduleDefinition(
cron_schedule="0 * * * *",
execution_timezone=LOCAL_TIMEZONE.name,
job=kipptaf_fivetran_start_syncs_job,
)

kipptaf_fivetran_adp_workforce_now_start_sync_schedule = ScheduleDefinition(
cron_schedule="0 0-19 * * *",
execution_timezone=LOCAL_TIMEZONE.name,
job=kipptaf_fivetran_adp_workforce_now_start_sync_job,
)

kipptaf_fivetran_illuminate_start_sync_schedule = ScheduleDefinition(
cron_schedule="5 * * * *",
execution_timezone=LOCAL_TIMEZONE.name,
job=kipptaf_fivetran_illuminate_start_sync_job,
)

__all__ = [
adp_wfn_resync_schedule,
kipptaf_fivetran_adp_workforce_now_start_sync_schedule,
kipptaf_fivetran_illuminate_start_sync_schedule,
kipptaf_fivetran_start_syncs_schedule,
]

for job in fivetran_start_sync_jobs:
if job.name == "kipptaf_adp_workforce_now_fivetran_start_sync_job":
cron_schedule = "0 0-19 * * *"
elif job.name == "kipptaf_illuminate_fivetran_start_sync_job":
cron_schedule = "5 * * * *"
else:
cron_schedule = "0 * * * *"

__all__.append(
ScheduleDefinition(
cron_schedule=cron_schedule, execution_timezone=LOCAL_TIMEZONE.name, job=job
)
)

0 comments on commit 18f87b6

Please sign in to comment.