diff --git a/src/teamster/kipptaf/fivetran/jobs.py b/src/teamster/kipptaf/fivetran/jobs.py index 0d26f1060d..2997729b2a 100644 --- a/src/teamster/kipptaf/fivetran/jobs.py +++ b/src/teamster/kipptaf/fivetran/jobs.py @@ -1,45 +1,22 @@ import pathlib import re -from dagster import AssetSelection, RunConfig, config_from_files, define_asset_job, job +from dagster import RunConfig, config_from_files, job -from teamster.core.fivetran.jobs import ( - build_fivetran_start_resync_job, - build_fivetran_start_sync_job, -) +from teamster.core.fivetran.jobs import build_fivetran_start_resync_job from teamster.core.fivetran.ops import SyncConfig, fivetran_start_sync_op from teamster.kipptaf import CODE_LOCATION, fivetran -fivetran_materialization_jobs = [] -fivetran_start_sync_jobs = [] fivetran_start_resync_jobs = [] -for asset in fivetran.assets: - connector_name = list(asset.keys)[0].path[1] - connector_id = re.match( - pattern=r"fivetran_sync_(\w+)", - string=asset.op.name, - ).group(1) - - fivetran_materialization_jobs.append( - define_asset_job( - name=(f"{CODE_LOCATION}_{connector_name}_fivetran_asset_job"), - selection=AssetSelection.keys(*list(asset.keys)), - ) - ) - fivetran_start_sync_jobs.append( - build_fivetran_start_sync_job( - code_location=CODE_LOCATION, - connector_id=connector_id, - connector_name=connector_name, - ) - ) +for asset in fivetran.assets: + op_name_match = re.match(pattern=r"fivetran_sync_(\w+)", string=asset.op.name) fivetran_start_resync_jobs.append( build_fivetran_start_resync_job( code_location=CODE_LOCATION, - connector_id=connector_id, - connector_name=connector_name, + connector_id=op_name_match.group(1), + connector_name=list(asset.keys)[0].path[1], ) ) @@ -70,9 +47,39 @@ def kipptaf_fivetran_start_syncs_job(): fivetran_sync_op_aliased() +@job( + config=RunConfig( + ops={ + "adp_workforce_now": SyncConfig( + connector_id="sameness_cunning", yield_materializations=False + ) + } + ), +) +def kipptaf_fivetran_adp_workforce_now_start_sync_job(): + fivetran_sync_op_aliased = fivetran_start_sync_op.alias("adp_workforce_now") + + fivetran_sync_op_aliased() + + +@job( + config=RunConfig( + ops={ + "illuminate": SyncConfig( + connector_id="jinx_credulous", yield_materializations=False + ) + } + ), +) +def kipptaf_fivetran_illuminate_start_sync_job(): + fivetran_sync_op_aliased = fivetran_start_sync_op.alias("illuminate") + + fivetran_sync_op_aliased() + + __all__ = [ - *fivetran_materialization_jobs, - *fivetran_start_sync_jobs, *fivetran_start_resync_jobs, + kipptaf_fivetran_adp_workforce_now_start_sync_job, + kipptaf_fivetran_illuminate_start_sync_job, kipptaf_fivetran_start_syncs_job, ] diff --git a/src/teamster/kipptaf/fivetran/schedules.py b/src/teamster/kipptaf/fivetran/schedules.py index 8f4dbe111b..c3faf558bc 100644 --- a/src/teamster/kipptaf/fivetran/schedules.py +++ b/src/teamster/kipptaf/fivetran/schedules.py @@ -3,7 +3,9 @@ from teamster.kipptaf import LOCAL_TIMEZONE from teamster.kipptaf.fivetran.jobs import ( fivetran_start_resync_jobs, - fivetran_start_sync_jobs, + kipptaf_fivetran_adp_workforce_now_start_sync_job, + kipptaf_fivetran_illuminate_start_sync_job, + kipptaf_fivetran_start_syncs_job, ) adp_wfn_resync_schedule = ScheduleDefinition( @@ -16,20 +18,27 @@ ][0], ) +kipptaf_fivetran_start_syncs_schedule = ScheduleDefinition( + cron_schedule="0 * * * *", + execution_timezone=LOCAL_TIMEZONE.name, + job=kipptaf_fivetran_start_syncs_job, +) + +kipptaf_fivetran_adp_workforce_now_start_sync_schedule = ScheduleDefinition( + cron_schedule="0 0-19 * * *", + execution_timezone=LOCAL_TIMEZONE.name, + job=kipptaf_fivetran_adp_workforce_now_start_sync_job, +) + +kipptaf_fivetran_illuminate_start_sync_schedule = ScheduleDefinition( + cron_schedule="5 * * * *", + execution_timezone=LOCAL_TIMEZONE.name, + job=kipptaf_fivetran_illuminate_start_sync_job, +) + __all__ = [ adp_wfn_resync_schedule, + kipptaf_fivetran_adp_workforce_now_start_sync_schedule, + kipptaf_fivetran_illuminate_start_sync_schedule, + kipptaf_fivetran_start_syncs_schedule, ] - -for job in fivetran_start_sync_jobs: - if job.name == "kipptaf_adp_workforce_now_fivetran_start_sync_job": - cron_schedule = "0 0-19 * * *" - elif job.name == "kipptaf_illuminate_fivetran_start_sync_job": - cron_schedule = "5 * * * *" - else: - cron_schedule = "0 * * * *" - - __all__.append( - ScheduleDefinition( - cron_schedule=cron_schedule, execution_timezone=LOCAL_TIMEZONE.name, job=job - ) - )