Skip to content

Commit

Permalink
Add ETL timestamp to completion table, make it non-unique
Browse files Browse the repository at this point in the history
This allows us to add an entry to the completion table for every
ETL run - which might help debugging.
  • Loading branch information
mikix committed Oct 28, 2024
1 parent 98e2b8c commit 5a43957
Show file tree
Hide file tree
Showing 35 changed files with 56 additions and 37 deletions.
8 changes: 7 additions & 1 deletion cumulus_etl/completion/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ def completion_format_args() -> dict:
"""Returns kwargs to pass to the Format class initializer of your choice"""
return {
"dbname": COMPLETION_TABLE,
"uniqueness_fields": {"table_name", "group_name"},
# These fields altogether basically guarantee that we never collide.
# (i.e. that every 'merge' is really an 'insert')
# That's intentional - we want this table to be a bit of a historical log.
# (We couldn't have no uniqueness fields -- delta lake doesn't like that.)
"uniqueness_fields": {"table_name", "group_name", "export_time", "etl_time"},
}


Expand Down Expand Up @@ -47,6 +51,8 @@ def completion_schema() -> pyarrow.Schema:
pyarrow.field("export_time", pyarrow.string()),
pyarrow.field("export_url", pyarrow.string()),
pyarrow.field("etl_version", pyarrow.string()),
# See note above for why this isn't a pyarrow.timestamp() field.
pyarrow.field("etl_time", pyarrow.string()),
]
)

Expand Down
5 changes: 3 additions & 2 deletions cumulus_etl/etl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(
self._output_format = output_format
self.dir_errors = dir_errors
self.client = client
self.timestamp = common.timestamp_filename(timestamp)
self.timestamp = timestamp
self.hostname = gethostname()
self.comment = comment or ""
self.batch_size = batch_size
Expand All @@ -67,7 +67,8 @@ def path_config(self) -> str:
return os.path.join(self.dir_job_config(), "job_config.json")

def dir_job_config(self) -> str:
path = self._output_root.joinpath(f"JobConfig/{self.timestamp}")
timestamp_dir = common.timestamp_filename(self.timestamp)
path = self._output_root.joinpath(f"JobConfig/{timestamp_dir}")
self._output_root.makedirs(path)
return path

Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/etl/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def _update_completion_table(self) -> None:
"export_time": self.task_config.export_datetime.isoformat(),
"export_url": self.task_config.export_url,
"etl_version": cumulus_etl.__version__,
"etl_time": self.task_config.timestamp.isoformat(),
}
for output in self.outputs
if not output.get_name(self).startswith("etl__")
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
1 change: 1 addition & 0 deletions tests/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def setUp(self) -> None:
"ndjson",
"ndjson",
client,
timestamp=common.datetime_now(),
batch_size=5,
dir_errors=self.errors_dir,
export_group_name="test-group",
Expand Down
13 changes: 11 additions & 2 deletions tests/etl/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ async def test_encounter_completion(self):
self.assertFalse(comp_enc_format.update_existing)

self.assertEqual("etl__completion", comp_format.dbname)
self.assertEqual({"table_name", "group_name"}, comp_format.uniqueness_fields)
self.assertEqual(
{"table_name", "group_name", "export_time", "etl_time"}, comp_format.uniqueness_fields
)
self.assertTrue(comp_format.update_existing)

self.assertEqual(2, comp_enc_format.write_records.call_count)
Expand Down Expand Up @@ -240,6 +242,7 @@ async def test_encounter_completion(self):
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
"etl_time": "2021-09-14T21:23:45+00:00",
}
],
comp_batch.rows,
Expand Down Expand Up @@ -276,13 +279,15 @@ async def test_medication_completion(self):
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
"etl_time": "2021-09-14T21:23:45+00:00",
},
{
"table_name": "medicationrequest",
"group_name": "test-group",
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
"etl_time": "2021-09-14T21:23:45+00:00",
},
],
comp_batch.rows,
Expand Down Expand Up @@ -330,6 +335,7 @@ async def test_allow_empty_group(self):
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
"etl_time": "2021-09-14T21:23:45+00:00",
}
],
comp_format.write_records.call_args[0][0].rows,
Expand Down Expand Up @@ -420,7 +426,10 @@ async def test_external_medications(self, mock_download):
uniqueness_fields=None,
update_existing=True,
),
mock.call(dbname="etl__completion", uniqueness_fields={"group_name", "table_name"}),
mock.call(
dbname="etl__completion",
uniqueness_fields={"group_name", "table_name", "export_time", "etl_time"},
),
],
self.create_formatter_mock.call_args_list,
)
Expand Down
1 change: 1 addition & 0 deletions tests/loaders/ndjson/test_bulk_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@ async def test_successful_etl_bulk_export(self):
"export_time": "2015-02-07T13:28:17+02:00",
"export_url": f"{self.fhir_url}/$export?_type=Patient",
"etl_version": "1.0.0+test",
"etl_time": "2021-09-14T21:23:45+00:00",
},
common.read_json(f"{tmpdir}/output/etl__completion/etl__completion.000.ndjson"),
)
Expand Down

0 comments on commit 5a43957

Please sign in to comment.