diff --git a/cumulus_etl/formats/deltalake.py b/cumulus_etl/formats/deltalake.py index 6125cec..63173b7 100644 --- a/cumulus_etl/formats/deltalake.py +++ b/cumulus_etl/formats/deltalake.py @@ -196,22 +196,30 @@ def _get_update_condition(schema: pyspark.sql.types.StructType) -> str | None: if not has_last_updated_field: return None - # OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide FHIR schema), - # so we want to conditionally update rows based on the timestamp. + # OK, the field exists (which is typical for any FHIR resource tables, as we provide a wide + # FHIR schema), so we want to conditionally update rows based on the timestamp. # - # We skip the update row if both the table and the update have a lastUpdated value and the update's value - # is in the past. But err on the side of caution if anything is missing, by taking the update. + # We skip the update row if both the table and the update have a lastUpdated value and the + # update's value is in the past. But err on the side of caution if anything is missing, + # by taking the update. # - # This uses less-than instead of less-than-or-equal just to avoid needless churn. - # If we eventually decide that sub-second updates are a real concern, we can make it <= and - # additionally compare versionId. But I don't know how you extracted both versions so quickly. :) + # This uses less-than-or-equal instead of less-than when comparing the date, because + # sometimes the ETL will upload different content for the same resource as we update the + # ETL (for example, we allow-list yet another extension - we still want to re-upload the + # content with the new extension but same lastUpdated value). This does cause some needless + # churn on the delta lake side, but we'll have to live with that. + # + # If we eventually decide that sub-second updates are a real concern, we can additionally + # compare versionId. But I don't know how you extracted both versions so quickly. :) # # The cast-as-timestamp does not seem to noticeably slow us down. - # If it becomes an issue, we could always actually convert this string column to a real date/time column. + # If it becomes an issue, we could always actually convert this string column to a real + # date/time column. return ( "table.meta.lastUpdated is null or " "updates.meta.lastUpdated is null or " - "CAST(table.meta.lastUpdated AS TIMESTAMP) < CAST(updates.meta.lastUpdated AS TIMESTAMP)" + "CAST(table.meta.lastUpdated AS TIMESTAMP) <= " + "CAST(updates.meta.lastUpdated AS TIMESTAMP)" ) @staticmethod diff --git a/tests/formats/test_deltalake.py b/tests/formats/test_deltalake.py index 1404cfa..e87bdb3 100644 --- a/tests/formats/test_deltalake.py +++ b/tests/formats/test_deltalake.py @@ -157,8 +157,8 @@ def test_last_updated_support(self): [ {"id": "past", "meta": {"lastUpdated": now}, "value": 2}, {"id": "past-with-offset", "meta": {"lastUpdated": now}, "value": 2}, - {"id": "now", "meta": {"lastUpdated": now}, "value": 1}, - {"id": "now-without-zed", "meta": {"lastUpdated": now_without_zed}, "value": 1}, + {"id": "now", "meta": {"lastUpdated": now}, "value": 2}, + {"id": "now-without-zed", "meta": {"lastUpdated": now}, "value": 2}, {"id": "future", "meta": {"lastUpdated": future}, "value": 1}, { "id": "future-with-offset",