feat(data-warehouse): Added incremental syncs for postgres #23145
Conversation
```python
class IncrementalField(TypedDict):
    label: str  # Label shown in the UI
    type: IncrementalFieldType  # Field type shown in the UI
    field: str  # Actual DB field accessed
    field_type: IncrementalFieldType  # Actual DB type of the field
```
We have the extra fields for sources where the incremental field is actually masked by an `ast.ExpressionField`, e.g. in Stripe tables we expose a `created_at` `datetime`, but the underlying data and the Stripe API actually use `created`, an `integer`.
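For illustration, here's what such a mapping might look like for the Stripe example above. A minimal sketch with assumed enum members (`DateTime`, `Integer` on `IncrementalFieldType`) and illustrative values, not the actual source map:

```python
# Sketch with illustrative values only, assuming IncrementalFieldType
# exposes DateTime and Integer members. The UI-facing label/type diverge
# from the field/field_type used against the underlying Stripe data.
stripe_created: IncrementalField = {
    "label": "created_at",                       # label shown in the UI
    "type": IncrementalFieldType.DateTime,       # type shown in the UI
    "field": "created",                          # actual field accessed
    "field_type": IncrementalFieldType.Integer,  # unix timestamp upstream
}
```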
Size Change: +197 B (+0.02%) · Total Size: 1.06 MB
Super excited for this! Properly supported incremental syncs have been a long time coming.
Added some comments that are mostly nits, and I saw some of your comments that noted necessary cleanup. Otherwise, ready to try it out.
```diff
 except Exception as e:
-    logger.exception(f"Could not clean up data import folder: {job.folder_path}", exc_info=e)
+    logger.exception(f"Could not clean up data import folder: {job.folder_path()}", exc_info=e)
```
What was the reason for shifting these from a property to a function?
It needed to be wrapped in a `sync_to_async` call in a test because calling the property was failing due to async issues. I couldn't figure out how to do the wrapping as a property, so I just changed it to a function instead.
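For context, the asymmetry looks like this when calling from an async test. A minimal sketch assuming asgiref's `sync_to_async` and a `job` object like the one in the diff:

```python
from asgiref.sync import sync_to_async

# With a method, the bound callable can be handed to sync_to_async directly:
path = await sync_to_async(job.folder_path)()

# With a property, `job.folder_path` already executes the getter in the
# async context, so the attribute access itself must be deferred:
path = await sync_to_async(lambda: job.folder_path)()
```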
```python
    for name, field_type in filter_postgres_incremental_fields(columns)
]
elif source.source_type == ExternalDataSource.Type.SNOWFLAKE:
    # TODO(@Gilbert09): Move all this into a util and replace elsewhere
```
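As a rough illustration of what a filter like this does (a hypothetical sketch, not the PR's actual implementation; the type-mapping table and enum members here are assumptions):

```python
# Hypothetical mapping from raw Postgres column types to the incremental
# field types this PR supports (date, timestamp, and integer variants).
POSTGRES_INCREMENTAL_TYPES = {
    "date": IncrementalFieldType.Date,
    "timestamp": IncrementalFieldType.Timestamp,
    "integer": IncrementalFieldType.Integer,
    "bigint": IncrementalFieldType.Integer,
    "smallint": IncrementalFieldType.Integer,
}

def filter_postgres_incremental_fields(columns):
    # Keep only (name, type) pairs whose type can drive an incremental sync.
    return [
        (name, POSTGRES_INCREMENTAL_TYPES[col_type.lower()])
        for name, col_type in columns
        if col_type.lower() in POSTGRES_INCREMENTAL_TYPES
    ]
```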
👍
* Added incremental syncs for postgres
* Tests
* Fixed mypy
* Added extra protection on the sync form
* Set the correct write disposition for SQL
* PR fixes
* Fixed tests
* Fixed tests
* Fixed tests
Firstly, sorry for the huge PR - this kinda snowballed a fair amount
Problem
Changes
* Supports `date`, `timestamp`, and `int` fields as the incremental field
* Moved sync settings onto `ExternalDataSchema` and added logic there for triggering refreshes etc.
* Added `sync_type_payload` onto the schema object. We'll be storing `incremental_field` and `incremental_field_type` here.
* Added a `reset_pipeline` value to the `source`'s `job_inputs`, which then gets reset at the end of the sync. This is to tell DLT to do a full reset of the table (see the sketch below).

Screen.Recording.2024-06-21.at.13.56.26.mov
Screen.Recording.2024-06-21.at.13.57.29.mov
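Since the sync runs on DLT, the primitive the `reset_pipeline` flag interacts with looks roughly like this. A minimal sketch with assumed table/column names and a hypothetical `fetch_rows_since` helper, not the PR's actual pipeline code:

```python
import dlt

@dlt.resource(name="orders", write_disposition="append")
def orders(created_at=dlt.sources.incremental("created_at")):
    # dlt persists created_at.last_value between runs, so an incremental
    # sync only yields rows newer than the stored cursor. A full reset
    # drops this state and rewrites the table from scratch.
    yield from fetch_rows_since(created_at.last_value)  # hypothetical helper
```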
Does this work well for both Cloud and self-hosted?
Yup
How did you test this code?