Merge Records Based on Individual Record's Last Modified Timestamp #2145
Comments
Hey @trin94, if you just merge all incoming data on the primary key, would this not work? Or do batches sometimes come that have rows with last_modified timestamps that are older than the one in the db?
Hi @sh-rp, unfortunately, yes, that is possible. In our use case (@trin94's and mine), we sometimes receive a JSONL file whose modification_date is newer than that of all previous files. Some records in the file may have a LastModified date that is newer than what's currently in the database for their primary key (PK), and these need to be loaded. However, other records in the same file might not be the most recent version for their PK, because the latest version is already in the database (with a newer LastModified timestamp). Those records in the file need to be ignored.
@frank-engelen I see. Off the top of my head I don't see a way to do this with the current dlt implementation, except for extending the destination implementation you are using and modifying the SQL in the merge job. That can be done but is not documented, so you'd need to figure it out yourself. We have consulting partners who could help you with this kind of thing if you need support; you can find the list at https://dlthub.com/partners/consulting
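Just to sketch the idea: the SQL such a customized merge step would have to produce for postgres is essentially a conditional upsert, where a row coming from the staged load only replaces the target row when its last_modified is newer. The table and column names below (events, events_staging, id, payload, last_modified) are placeholders, and executing the statement through sql_client here is only meant to illustrate the shape of the SQL, not how dlt's merge job is actually wired:

```python
import dlt

# Placeholder table/column names: a real customized merge job would operate on dlt's
# staging dataset and generated identifiers. This only shows the conditional-upsert shape.
CONDITIONAL_UPSERT = """
INSERT INTO events AS t (id, payload, last_modified)
SELECT id, payload, last_modified FROM events_staging
ON CONFLICT (id) DO UPDATE
SET payload = EXCLUDED.payload,
    last_modified = EXCLUDED.last_modified
WHERE EXCLUDED.last_modified > t.last_modified
"""

pipeline = dlt.pipeline("events_pipeline", destination="postgres", dataset_name="events_data")
with pipeline.sql_client() as client:
    # rows that conflict on id but are not newer simply match the WHERE clause as false
    # and are skipped, which is exactly the per-record behavior described above
    client.execute_sql(CONDITIONAL_UPSERT)
```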
@frank-engelen if your batches are not extremely big, you could try to access the destination from the extract step in a dlt resource. We do not recommend filtering out data this way, but it may work in your case:

```python
import dlt
# assert_load_info is a helper from dlt's test suite; it only checks that the load succeeded


def test_access_pipeline_in_resource() -> None:
    pipeline = dlt.pipeline("test_access_pipeline_in_resource", destination="duckdb")

    @dlt.resource(name="user_comments")
    def comments(user_id: str):
        current_pipeline = dlt.current.pipeline()
        # find the last comment id for the given user_id by looking in the destination
        max_id: int = 0
        # on the first pipeline run, the user_comments table does not yet exist, so do not check at all
        # alternatively, catch DatabaseUndefinedRelation, which is raised when an unknown table is selected
        if not current_pipeline.first_run:
            with current_pipeline.sql_client() as client:
                # we may get the last user comment id or None, which we replace with 0
                max_id = (
                    client.execute_sql(
                        "SELECT MAX(_id) FROM user_comments WHERE user_id=?", user_id
                    )[0][0]
                    or 0
                )
        # use max_id to filter our results
        yield from [
            {"_id": i, "value": l, "user_id": user_id}
            for i, l in zip([1, 2, 3], ["A", "B", "C"])
            if i > max_id
        ]

    info = pipeline.run(comments("USER_A"))
    assert_load_info(info)
    assert pipeline.last_trace.last_normalize_info.row_counts["user_comments"] == 3
    info = pipeline.run(comments("USER_A"))
    # no more data for USER_A
    assert_load_info(info, 0)
    info = pipeline.run(comments("USER_B"))
    assert_load_info(info)
    assert pipeline.last_trace.last_normalize_info.row_counts["user_comments"] == 3
```
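Adapted to your per-record LastModified case, the same idea might look roughly like the sketch below. The table name records, the column names id and last_modified, and the shape of the incoming batch are assumptions for illustration, not something dlt prescribes:

```python
import dlt


@dlt.resource(name="records", write_disposition="merge", primary_key="id")
def records(batch):
    # batch: an iterable of dicts parsed from one JSONL file,
    # each with hypothetical "id" and "last_modified" keys
    current_pipeline = dlt.current.pipeline()
    latest: dict = {}
    # on the first run the table does not exist yet, so there is nothing to compare against
    if not current_pipeline.first_run:
        with current_pipeline.sql_client() as client:
            # with merge write disposition there is one row per primary key in the destination;
            # reading all keys assumes the table stays reasonably small, otherwise
            # restrict the query to the keys present in the current batch
            rows = client.execute_sql("SELECT id, last_modified FROM records")
            latest = {pk: ts for pk, ts in rows}
    for record in batch:
        # yield a record only if the destination has no row for this key yet,
        # or the incoming last_modified is strictly newer than the stored one
        stored = latest.get(record["id"])
        if stored is None or record["last_modified"] > stored:
            yield record
```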
Thanks for looking into this 🙂 I can understand that this isn't viable as a general feature, considering the number and diversity of destinations. For now we will change our strategy. Extending the merge job of our destination (postgres) sounds like a good approach to do this in dlt for our case; we'll look into that when we have room for such topics. Thanks for all your work in creating and building dlt, it's hugely appreciated.
Alright! Then I will close this :)
Feature description
We would like to incrementally update individual rows based on a `LastModified` timestamp at row level.

Are you a dlt user?
Yes, I run dlt in production.
Use case
We use the filesystem source to incrementally load data into a database, based on each file's modification_date.
Now, we want to add another condition to filter out outdated data.
Our data is stored in JSONL files, each containing 1-n individual records. Each record has a unique primary key and an individual `LastModified` timestamp. We would like to update each row in the database only if a new record has a more recent `LastModified` timestamp for the same primary key.

We tried implementing this via the `dlt.sources.incremental` functionality, but as far as we understand, it tracks a `LastModified` value for the entire table, not for each record as we would need. Since we receive data in batches and cannot control the order of updates to individual rows, this is not sufficient.
Proposed solution
No response
Related issues
No response