fix: allow overwriting delta lake entries with same timestamp #363

Merged on Nov 19, 2024 (1 commit)
26 changes: 17 additions & 9 deletions cumulus_etl/formats/deltalake.py
@@ -196,22 +196,30 @@ def _get_update_condition(schema: pyspark.sql.types.StructType) -> str | None:
         if not has_last_updated_field:
             return None

-        # OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide FHIR schema),
-        # so we want to conditionally update rows based on the timestamp.
+        # OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide
+        # FHIR schema), so we want to conditionally update rows based on the timestamp.
         #
-        # We skip the update row if both the table and the update have a lastUpdated value and the update's value
-        # is in the past. But err on the side of caution if anything is missing, by taking the update.
+        # We skip the update row if both the table and the update have a lastUpdated value and the
+        # update's value is in the past. But err on the side of caution if anything is missing,
+        # by taking the update.
         #
-        # This uses less-than instead of less-than-or-equal just to avoid needless churn.
-        # If we eventually decide that sub-second updates are a real concern, we can make it <= and
-        # additionally compare versionId. But I don't know how you extracted both versions so quickly. :)
+        # This uses less-than-or-equal instead of less-than when comparing the date, because
+        # sometimes the ETL will upload different content for the same resource as we update the
+        # ETL (for example, we allow-list yet another extension - we still want to re-upload the
+        # content with the new extension but same lastUpdated value). This does cause some needless
+        # churn on the delta lake side, but we'll have to live with that.
+        #
+        # If we eventually decide that sub-second updates are a real concern, we can additionally
+        # compare versionId. But I don't know how you extracted both versions so quickly. :)

Review comment on lines +212 to +213
Contributor: yeah i don't think we're going to have this problem any time soon
Contributor Author: But correctness Maaaatt!

         #
         # The cast-as-timestamp does not seem to noticeably slow us down.
-        # If it becomes an issue, we could always actually convert this string column to a real date/time column.
+        # If it becomes an issue, we could always actually convert this string column to a real
+        # date/time column.
         return (
             "table.meta.lastUpdated is null or "
             "updates.meta.lastUpdated is null or "
-            "CAST(table.meta.lastUpdated AS TIMESTAMP) < CAST(updates.meta.lastUpdated AS TIMESTAMP)"
+            "CAST(table.meta.lastUpdated AS TIMESTAMP) <= "
+            "CAST(updates.meta.lastUpdated AS TIMESTAMP)"
         )

     @staticmethod
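
For readers who want to reason about the new condition without a Spark session, here is a minimal plain-Python sketch of the same rule: take the update if either timestamp is missing, or if the table's timestamp is less than or equal to the update's. The names `parse_ts` and `should_take_update` are hypothetical and not part of cumulus_etl, and treating an offset-less timestamp as UTC is an assumption of this sketch, not a statement about how Spark casts such strings.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp string into an aware datetime.

    Assumption of this sketch: a value with no offset is treated as UTC.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def should_take_update(table_ts: Optional[str], update_ts: Optional[str]) -> bool:
    """Mirror the SQL condition: null-or-null-or-(table <= update)."""
    if table_ts is None or update_ts is None:
        # Err on the side of caution when anything is missing.
        return True
    # Less-than-or-equal: an update with the same timestamp overwrites the row.
    return parse_ts(table_ts) <= parse_ts(update_ts)
```

With the previous strict less-than comparison, the equal-timestamp case would have returned False, which is the behavior this PR changes.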
4 changes: 2 additions & 2 deletions tests/formats/test_deltalake.py
@@ -157,8 +157,8 @@ def test_last_updated_support(self):
 [
     {"id": "past", "meta": {"lastUpdated": now}, "value": 2},
     {"id": "past-with-offset", "meta": {"lastUpdated": now}, "value": 2},
-    {"id": "now", "meta": {"lastUpdated": now}, "value": 1},
-    {"id": "now-without-zed", "meta": {"lastUpdated": now_without_zed}, "value": 1},
+    {"id": "now", "meta": {"lastUpdated": now}, "value": 2},
+    {"id": "now-without-zed", "meta": {"lastUpdated": now}, "value": 2},
     {"id": "future", "meta": {"lastUpdated": future}, "value": 1},
     {
         "id": "future-with-offset",
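
The changed expectations can be reproduced with a small in-memory upsert. This is only a sketch under stated assumptions: `last_updated` and `upsert` are hypothetical helpers, the timestamps are made-up sample values, offset-less timestamps are assumed to be UTC, and the real test goes through Delta Lake rather than Python dicts.

```python
from datetime import datetime, timezone
from typing import Optional


def last_updated(row: dict) -> Optional[datetime]:
    """Return the row's meta.lastUpdated as an aware datetime, or None if absent."""
    raw = row.get("meta", {}).get("lastUpdated")
    if raw is None:
        return None
    parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    # Assumption of this sketch: a timestamp without an offset is UTC.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def upsert(table: dict, updates: list) -> dict:
    """Merge updates into a table keyed by id, using the <= timestamp rule."""
    merged = dict(table)
    for update in updates:
        existing = merged.get(update["id"])
        if existing is not None:
            old, new = last_updated(existing), last_updated(update)
            if old is not None and new is not None and old > new:
                continue  # the table row is strictly newer, so skip the update
        merged[update["id"]] = update
    return merged


# Made-up sample timestamps standing in for the test's past/now/future values.
past, now, future = "2020-01-01T00:00:00Z", "2021-01-01T00:00:00Z", "2022-01-01T00:00:00Z"
now_without_zed = "2021-01-01T00:00:00"

table = {
    "past": {"id": "past", "meta": {"lastUpdated": past}, "value": 1},
    "now": {"id": "now", "meta": {"lastUpdated": now}, "value": 1},
    "now-without-zed": {
        "id": "now-without-zed",
        "meta": {"lastUpdated": now_without_zed},
        "value": 1,
    },
    "future": {"id": "future", "meta": {"lastUpdated": future}, "value": 1},
}
updates = [{"id": key, "meta": {"lastUpdated": now}, "value": 2} for key in table]
result = upsert(table, updates)
```

In this sketch the "now" and "now-without-zed" rows end up with value 2, matching the updated expectations in the test, while "future" keeps value 1.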