Skip to content

Commit

Permalink
Merge pull request #363 from smart-on-fhir/mikix/timestamp-equal
Browse files Browse the repository at this point in the history
fix: allow overwriting delta lake entries with same timestamp
  • Loading branch information
mikix authored Nov 19, 2024
2 parents a2edca0 + 4edad9c commit 468dcfb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
26 changes: 17 additions & 9 deletions cumulus_etl/formats/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,22 +196,30 @@ def _get_update_condition(schema: pyspark.sql.types.StructType) -> str | None:
if not has_last_updated_field:
return None

# OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide FHIR schema),
# so we want to conditionally update rows based on the timestamp.
# OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide
# FHIR schema), so we want to conditionally update rows based on the timestamp.
#
# We skip the update row if both the table and the update have a lastUpdated value and the update's value
# is in the past. But err on the side of caution if anything is missing, by taking the update.
# We skip the update row if both the table and the update have a lastUpdated value and the
# update's value is in the past. But err on the side of caution if anything is missing,
# by taking the update.
#
# This uses less-than instead of less-than-or-equal just to avoid needless churn.
# If we eventually decide that sub-second updates are a real concern, we can make it <= and
# additionally compare versionId. But I don't know how you extracted both versions so quickly. :)
# This uses less-than-or-equal instead of less-than when comparing the date, because
# sometimes the ETL will upload different content for the same resource as we update the
# ETL (for example, we allow-list yet another extension - we still want to re-upload the
# content with the new extension but same lastUpdated value). This does cause some needless
# churn on the delta lake side, but we'll have to live with that.
#
# If we eventually decide that sub-second updates are a real concern, we can additionally
# compare versionId. But I don't know how you extracted both versions so quickly. :)
#
# The cast-as-timestamp does not seem to noticeably slow us down.
# If it becomes an issue, we could always actually convert this string column to a real date/time column.
# If it becomes an issue, we could always actually convert this string column to a real
# date/time column.
return (
"table.meta.lastUpdated is null or "
"updates.meta.lastUpdated is null or "
"CAST(table.meta.lastUpdated AS TIMESTAMP) < CAST(updates.meta.lastUpdated AS TIMESTAMP)"
"CAST(table.meta.lastUpdated AS TIMESTAMP) <= "
"CAST(updates.meta.lastUpdated AS TIMESTAMP)"
)

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions tests/formats/test_deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def test_last_updated_support(self):
[
{"id": "past", "meta": {"lastUpdated": now}, "value": 2},
{"id": "past-with-offset", "meta": {"lastUpdated": now}, "value": 2},
{"id": "now", "meta": {"lastUpdated": now}, "value": 1},
{"id": "now-without-zed", "meta": {"lastUpdated": now_without_zed}, "value": 1},
{"id": "now", "meta": {"lastUpdated": now}, "value": 2},
{"id": "now-without-zed", "meta": {"lastUpdated": now}, "value": 2},
{"id": "future", "meta": {"lastUpdated": future}, "value": 1},
{
"id": "future-with-offset",
Expand Down

0 comments on commit 468dcfb

Please sign in to comment.