fix: non-partitioned assets
cbini committed Jan 6, 2025
1 parent 466c001 commit a8bae70
Showing 6 changed files with 43 additions and 26 deletions.
39 changes: 25 additions & 14 deletions src/teamster/libraries/couchdrop/sensors.py
@@ -46,6 +46,8 @@ def build_couchdrop_sftp_sensor(
         define_asset_job(
             name=(
                 f"{base_job_name}_{partitions_def.get_serializable_unique_identifier()}"
+                if partitions_def is not None
+                else base_job_name
             ),
             selection=list(keys),
         )
@@ -97,7 +99,6 @@ def _sensor(context: SensorEvaluationContext, ssh_couchdrop: SSHResource):
         for a in asset_selection:
             asset_identifier = a.key.to_python_identifier()
             metadata = a.metadata_by_key[a.key]
-            partitions_def = _check.not_none(value=a.partitions_def)

             max_st_mtime = cursor_st_mtime = cursor.get(asset_identifier, 0)

@@ -131,28 +132,38 @@ def _sensor(context: SensorEvaluationContext, ssh_couchdrop: SSHResource):
                     partition_key = None

                 context.log.info(f"{asset_identifier}\n{f.filename}: {partition_key}")
-                run_request_kwargs.append(
-                    {
-                        "asset_key": a.key,
-                        "job_name": (
-                            f"{base_job_name}_"
-                            f"{partitions_def.get_serializable_unique_identifier()}"
-                        ),
-                        "partition_key": partition_key,
-                    }
-                )
+
+                if a.partitions_def is not None:
+                    run_request_kwargs.append(
+                        {
+                            "asset_key": a.key,
+                            "job_name": (
+                                f"{base_job_name}_"
+                                + a.partitions_def.get_serializable_unique_identifier()
+                            ),
+                            "partition_key": partition_key,
+                        }
+                    )
+                else:
+                    run_request_kwargs.append(
+                        {
+                            "asset_key": a.key,
+                            "job_name": base_job_name,
+                            "partition_key": None,
+                        }
+                    )

             cursor[asset_identifier] = max_st_mtime

         if run_request_kwargs:
-            for (job_name, parition_key), group in groupby(
+            for (job_name, partition_key), group in groupby(
                 iterable=run_request_kwargs, key=itemgetter("job_name", "partition_key")
             ):
                 run_requests.append(
                     RunRequest(
-                        run_key=f"{job_name}_{parition_key}_{now_timestamp}",
+                        run_key=f"{job_name}_{partition_key}_{now_timestamp}",
                         job_name=job_name,
-                        partition_key=parition_key,
+                        partition_key=partition_key,
                         asset_selection=[g["asset_key"] for g in group],
                     )
                 )
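Taken together, the couchdrop changes stop assuming that every asset carries a partitions definition: partitioned assets still target a job suffixed with their partitions-definition identifier, while non-partitioned assets now fall back to the bare base job name and a null partition key. The snippet below is a minimal, self-contained sketch of that branch in isolation; FakePartitionsDef, FakeAsset, and job_name_for are hypothetical stand-ins invented for illustration, not names from the repository or from Dagster.

from dataclasses import dataclass


@dataclass
class FakePartitionsDef:
    """Stand-in for a Dagster PartitionsDefinition (hypothetical)."""

    identifier: str

    def get_serializable_unique_identifier(self) -> str:
        return self.identifier


@dataclass
class FakeAsset:
    """Stand-in for a Dagster AssetsDefinition (hypothetical)."""

    partitions_def: FakePartitionsDef | None


def job_name_for(asset: FakeAsset, base_job_name: str) -> str:
    # Partitioned assets target a job suffixed with their partitions-definition
    # identifier; non-partitioned assets fall back to the bare base job name,
    # which is the case this commit adds.
    if asset.partitions_def is not None:
        return (
            base_job_name
            + "_"
            + asset.partitions_def.get_serializable_unique_identifier()
        )
    return base_job_name


print(job_name_for(FakeAsset(FakePartitionsDef("abc123")), "couchdrop_sftp_asset_job"))
# couchdrop_sftp_asset_job_abc123
print(job_name_for(FakeAsset(None), "couchdrop_sftp_asset_job"))
# couchdrop_sftp_asset_job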
6 changes: 3 additions & 3 deletions src/teamster/libraries/iready/sensors.py
@@ -122,14 +122,14 @@ def _sensor(context: SensorEvaluationContext, ssh_iready: SSHResource):

             cursor[asset_identifier] = now_timestamp

-        for (job_name, parition_key), group in groupby(
+        for (job_name, partition_key), group in groupby(
             iterable=run_request_kwargs, key=itemgetter("job_name", "partition_key")
         ):
             run_requests.append(
                 RunRequest(
-                    run_key=f"{job_name}_{parition_key}_{now_timestamp}",
+                    run_key=f"{job_name}_{partition_key}_{now_timestamp}",
                     job_name=job_name,
-                    partition_key=parition_key,
+                    partition_key=partition_key,
                     asset_selection=[g["asset_key"] for g in group],
                 )
             )
6 changes: 3 additions & 3 deletions src/teamster/libraries/powerschool/sis/sensors.py
@@ -353,14 +353,14 @@ def _sensor(

         item_getter = itemgetter("job_name", "partition_key")

-        for (job_name, parition_key), group in groupby(
+        for (job_name, partition_key), group in groupby(
             iterable=sorted(run_request_kwargs, key=item_getter), key=item_getter
         ):
             run_requests.append(
                 RunRequest(
-                    run_key=f"{job_name}_{parition_key}_{now_timestamp}",
+                    run_key=f"{job_name}_{partition_key}_{now_timestamp}",
                     job_name=job_name,
-                    partition_key=parition_key,
+                    partition_key=partition_key,
                     asset_selection=[g["asset_key"] for g in group],
                     tags={MAX_RUNTIME_SECONDS_TAG: (60 * 5)},
                 )
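This hunk, the iready hunk above, and the renlearn and titan hunks below make the same mechanical fix: the misspelled parition_key loop variable becomes partition_key. The grouping they share coalesces run-request kwargs by (job_name, partition_key) so that a single RunRequest covers every asset key matching that combination. Here is a standard-library-only sketch of the pattern, with plain dicts standing in for Dagster's RunRequest and invented sample data; the explicit sort mirrors the PowerSchool variant above, since groupby only merges adjacent items.

from itertools import groupby
from operator import itemgetter

# Hypothetical sample kwargs; the sensors build one entry per matched file.
run_request_kwargs = [
    {"asset_key": "asset_a", "job_name": "job_x", "partition_key": "2025-01-06"},
    {"asset_key": "asset_b", "job_name": "job_x", "partition_key": "2025-01-06"},
    {"asset_key": "asset_c", "job_name": "job_y", "partition_key": None},
]

item_getter = itemgetter("job_name", "partition_key")

# Sort first so equal (job_name, partition_key) pairs are adjacent; str() keeps
# None partition keys comparable with string ones.
sorted_kwargs = sorted(
    run_request_kwargs, key=lambda k: (k["job_name"], str(k["partition_key"]))
)

run_requests = []
for (job_name, partition_key), group in groupby(sorted_kwargs, key=item_getter):
    # One grouped request per (job_name, partition_key), listing every asset
    # key that matched that combination.
    run_requests.append(
        {
            "job_name": job_name,
            "partition_key": partition_key,
            "asset_selection": [g["asset_key"] for g in group],
        }
    )

print(run_requests)

Running this prints two grouped entries: one for job_x covering asset_a and asset_b under the 2025-01-06 partition key, and one for job_y with no partition key.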
6 changes: 3 additions & 3 deletions src/teamster/libraries/renlearn/sensors.py
@@ -104,14 +104,14 @@ def _sensor(context: SensorEvaluationContext, ssh_renlearn: SSHResource):

             cursor[asset_identifier] = now_timestamp

-        for (job_name, parition_key), group in groupby(
+        for (job_name, partition_key), group in groupby(
             iterable=run_request_kwargs, key=itemgetter("job_name", "partition_key")
         ):
             run_requests.append(
                 RunRequest(
-                    run_key=f"{job_name}_{parition_key}_{now_timestamp}",
+                    run_key=f"{job_name}_{partition_key}_{now_timestamp}",
                     job_name=job_name,
-                    partition_key=parition_key,
+                    partition_key=partition_key,
                     asset_selection=[g["asset_key"] for g in group],
                 )
             )
6 changes: 3 additions & 3 deletions src/teamster/libraries/titan/sensors.py
@@ -109,14 +109,14 @@ def _sensor(context: SensorEvaluationContext, ssh_titan: SSHResource):

             cursor[asset_identifier] = now_timestamp

-        for (job_name, parition_key), group in groupby(
+        for (job_name, partition_key), group in groupby(
             iterable=run_request_kwargs, key=itemgetter("job_name", "partition_key")
         ):
             run_requests.append(
                 RunRequest(
-                    run_key=f"{job_name}_{parition_key}_{now_timestamp}",
+                    run_key=f"{job_name}_{partition_key}_{now_timestamp}",
                     job_name=job_name,
-                    partition_key=parition_key,
+                    partition_key=partition_key,
                     asset_selection=[g["asset_key"] for g in group],
                 )
             )
6 changes: 6 additions & 0 deletions tests/sensors/sftp/test_sftp_couchdrop_sensors.py
@@ -42,3 +42,9 @@ def test_couchdrop_sftp_sensor_kippnewark():
     )

     _test_sensor(sftp_sensor=couchdrop_sftp_sensor)
+
+
+def test_couchdrop_sftp_sensor_kipptaf():
+    from teamster.code_locations.kipptaf.couchdrop.sensors import couchdrop_sftp_sensor
+
+    _test_sensor(sftp_sensor=couchdrop_sftp_sensor)
