Implement dataflow expiration to limit temporal data retention #29587
Conversation
src/compute/src/render.rs (outdated)

```rust
// ... does not advance past the expiration time. Otherwise, we might write
// down incorrect data.
if let Some(timestamp) = self.expire_at.as_option().copied() {
    oks.expire_at(timestamp);
}
```
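For context, the check behind `expire_at` boils down to asserting that a stream's frontier never reaches the expiration time while the dataflow is live, with the empty frontier exempt because it signals shutdown. A minimal standalone sketch, modeling frontiers as `Option<u64>` (an illustration, not the actual operator code):

```rust
/// Sketch of the frontier check behind an `expire_at`-style operator.
/// A frontier is modeled as `Option<u64>`: `Some(t)` means times >= t may
/// still appear, `None` is the empty frontier (the dataflow shut down).
fn check_expiration(frontier: Option<u64>, expire_at: u64) {
    match frontier {
        // Empty frontier: the dataflow completed or shut down; nothing to check.
        None => {}
        // A live frontier at or past the expiration time means we might
        // write down data whose retractions were already dropped: panic.
        Some(time) => assert!(
            time < expire_at,
            "frontier {time} reached expiration time {expire_at}"
        ),
    }
}

fn main() {
    check_expiration(Some(100), 200); // ok: frontier below expiration
    check_expiration(None, 200);      // ok: dataflow already shut down
    // check_expiration(Some(250), 200); // would panic
}
```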
We could assign to `oks` to make sure that we're observing the frontier before any downstream operators.
This change would need to recreate the arrangement after the `inspect`, right?
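To illustrate the rebinding pattern under discussion, here is a toy timely dataflow where the inspected stream is bound back to `oks` so downstream operators consume the checked stream. This is a sketch assuming the `timely` crate and its basic `inspect`; the real code uses a frontier-aware variant and, as noted, may need to rebuild the arrangement afterwards:

```rust
use timely::dataflow::operators::{Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        let oks = (0..10u64).to_stream(scope);
        // Rebind `oks` so everything downstream consumes the inspected
        // stream; the real operator would check the frontier against the
        // expiration time here instead of printing.
        let oks = oks.inspect(|x| println!("observed: {x}"));
        oks.inspect(|_| { /* downstream operator sees the checked stream */ });
    });
}
```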
(force-pushed from 054b232 to b9a2074)
Some notes: […]
That's expected! Dataflows that are valid for all times, and indexes/materialized views/subscribes without an up-to, should have an empty frontier. The meet of the empty frontier with the expiration time should, however, set the until.
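To spell out the meet semantics referenced here, a minimal model with frontiers as `Option<u64>`, where `None` stands for the empty frontier (an illustration of the lattice operation, not Materialize's actual `Antichain` API):

```rust
/// Meet (greatest lower bound) of two frontiers, modeled as `Option<u64>`.
/// The empty frontier (`None`) is the top element of the lattice, so meeting
/// it with the expiration time yields the expiration time.
fn meet(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (None, x) | (x, None) => x,
        (Some(x), Some(y)) => Some(x.min(y)),
    }
}

fn main() {
    // An index without an up-to has an empty `until`; folding in the
    // expiration time makes the expiration the effective `until`.
    assert_eq!(meet(None, Some(1_000)), Some(1_000));
    // An explicit up-to earlier than the expiration time wins.
    assert_eq!(meet(Some(500), Some(1_000)), Some(500));
}
```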
Thanks for the reply! Yes, I meant the second part. I can see that […]. Did you test temporal filters in an index? For me, the data is not being dropped because […].
(force-pushed from 0eb513d to 6da5d66)
What index did you test this with?
I tested with a simple index and a materialized view. Had to move out the […].
(force-pushed from 5275555 to 1606dfd)
```diff
@@ -241,7 +254,7 @@ pub fn build_compute_dataflow<A: Allocate>(
     source.storage_metadata.clone(),
     dataflow.as_of.clone(),
     snapshot_mode,
-    dataflow.until.clone(),
+    until.clone(),
```
I'm not sure if this is correct. We are now passing the expiration time as the `until` to the storage source operators, which means that these operators are now free to produce all times up to the until and then jump directly to the empty frontier. `expire_stream_at` wouldn't panic in this case because it has an exception for the empty frontier, so we would show invalid data for times >= until. I think we still need to pass `dataflow.until` here, or not ignore the empty frontier in `expire_stream_at`.
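A toy model of the failure mode described above, with `until` semantics and frontiers modeled as `Option<u64>` as before (illustrative only): a source with an `until` may advance straight to the empty frontier, and a check that exempts the empty frontier then has nothing to panic on.

```rust
/// Toy `until` semantics for a source: once the next time would reach
/// `until`, the source may drop its capability and jump straight to the
/// empty frontier (`None`) instead of producing further updates.
fn next_frontier(current: u64, until: Option<u64>) -> Option<u64> {
    match until {
        Some(u) if current + 1 >= u => None,
        _ => Some(current + 1),
    }
}

/// Expiration check with the empty-frontier exemption discussed above.
fn expired(frontier: Option<u64>, expire_at: u64) -> bool {
    matches!(frontier, Some(t) if t >= expire_at)
}

fn main() {
    // Expiration time passed as the source's `until`:
    let f = next_frontier(199, Some(200));
    assert_eq!(f, None);       // the source jumped to the empty frontier...
    assert!(!expired(f, 200)); // ...so the exempting check never fires, and
    // results for times >= 200 would be shown as if they were final.
}
```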
Yes, the logic needs a rework wrt empty frontiers, as is also obvious from a CI failure.
As we discussed, this is needed in `mfp.evaluate()`.
I think Jan has a point: a source could produce all data at the correct point in time and then drop its capability. The downstream operator could see the data and the frontier advancement at the same time, which would make it hard to reason about what the expiration logic should do.

I think this is mitigated by only allowing the milliseconds timeline, which implies that we won't observe the forward-jumping behavior. But it's very hand-wavy.
Adds a metric to report the timestamp of replica expiration, and an approximate number of seconds that remain.

```
# HELP mz_dataflow_replica_expiration_remaining_seconds The remaining seconds until replica expiration. Can go negative, can lag behind.
# TYPE mz_dataflow_replica_expiration_remaining_seconds gauge
mz_dataflow_replica_expiration_remaining_seconds{worker_id="0"} 1727981.64199996
# HELP mz_dataflow_replica_expiration_timestamp_seconds The replica expiration timestamp in seconds since epoch.
# TYPE mz_dataflow_replica_expiration_timestamp_seconds gauge
mz_dataflow_replica_expiration_timestamp_seconds{worker_id="0"} 1730280383911
```

Signed-off-by: Moritz Hoffmann <[email protected]>
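The remaining-seconds gauge can be derived from the expiration timestamp and the current wall clock. A sketch of that computation (the function name and the milliseconds-since-epoch representation are assumptions, not the actual metrics code):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical computation behind the remaining-seconds gauge: the gap
/// between the expiration timestamp (ms since epoch) and the current time.
/// Goes negative once the expiration time has passed, and only updates when
/// the worker refreshes the metric, so it can lag behind.
fn expiration_remaining_seconds(expiration_ms: u64) -> f64 {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_millis() as f64;
    (expiration_ms as f64 - now_ms) / 1000.0
}

fn main() {
    // Example: an expiration three weeks from now.
    let in_three_weeks = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
        + 21 * 24 * 60 * 60 * 1000;
    println!("{}", expiration_remaining_seconds(in_three_weeks));
}
```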
(force-pushed from 51f0062 to 74635c2)
(force-pushed from fcfac82 to 5b847d4)
Thanks everyone for the reviews!
Support expiration of dataflows depending on wall-clock time and with refresh schedules. This is a partial re-implementation of #29587 to enable more dataflows to participate in expiration. Specifically, it introduces the abstraction of _time dependence_ to describe how a dataflow follows wall-clock time. Using this information, we can then determine how a replica's expiration time relates to a specific dataflow. This allows us to support dataflows that have custom refresh policies. I'm not sold on the names introduced by this PR, but it's the best I came up with. Open to suggestions!

The implementation deviates from the existing implementation in some important ways:

* We do not panic in the dataflow operator that checks for frontier advancements, but rather retain a capability until the dataflow is shut down. This avoids a race condition where dataflow shutdown happens in parallel with dropping the shutdown token, and it avoids needing to reason about which dataflows produce error streams (some have an error output that immediately advances to the empty frontier).
* We do not handle the empty frontier in a special way. Previously, we considered advancing to the empty frontier acceptable. However, this makes it difficult to distinguish a shutdown from a source reaching the expiration time. In the first case, the operator should drop its capability; in the second, it must not, for correctness reasons.
* We check in the worker thread whether the replica has expired and panic if needed.

There are some problems this PR does not address:

* Caching the time dependence information in the physical plans seems like a hack. I think a better place would be the controller. Happy to try this in a follow-up PR.
* We need a separate kill-switch to disable the feature because, as implemented, we capture the expiration time in the controller once per replica. A second kill-switch would enable us to override the expiration to stabilize the system.

Fixes MaterializeInc/database-issues#8688.
Fixes MaterializeInc/database-issues#8683.

### Tips for the reviewer

Don't look at individual commits; it's a work log and does not have any semantic meaning.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

Signed-off-by: Moritz Hoffmann <[email protected]>
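To make the _time dependence_ idea concrete, here is an illustrative sketch; the type and function names (`TimeDependence`, `apply`) are assumptions for exposition, not the actual types introduced by the PR:

```rust
/// How a dataflow's timestamps relate to wall-clock time (illustrative).
enum TimeDependence {
    /// Output timestamps follow wall-clock time directly.
    Wallclock,
    /// Output only changes at scheduled refresh times (ms since epoch).
    RefreshSchedule(Vec<u64>),
    /// Output does not depend on wall-clock time; expiration never applies.
    Indeterminate,
}

impl TimeDependence {
    /// Translate a replica's expiration time into a dataflow-specific one.
    /// A refresh-schedule dataflow only observes wall-clock time at its
    /// refreshes, so it is affected at the first refresh at or after the
    /// replica expiration; `None` means the dataflow never expires.
    fn apply(&self, replica_expiration: u64) -> Option<u64> {
        match self {
            TimeDependence::Wallclock => Some(replica_expiration),
            TimeDependence::RefreshSchedule(refreshes) => refreshes
                .iter()
                .copied()
                .find(|refresh| *refresh >= replica_expiration),
            TimeDependence::Indeterminate => None,
        }
    }
}

fn main() {
    let dep = TimeDependence::RefreshSchedule(vec![1_000, 2_000, 3_000]);
    // A replica expiring at 1_500 affects this dataflow only at its
    // next refresh, 2_000.
    assert_eq!(dep.apply(1_500), Some(2_000));
}
```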
Introduces a new feature to limit data retention in temporal filters by dropping retraction diffs beyond a configured expiration time, as sketched below.

Motivation and logic are explained in more detail in the design doc.
Fixes MaterializeInc/database-issues#7757
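As a rough illustration of the core idea (a standalone sketch, not the PR's code): a temporal filter schedules a retraction diff at each row's expiry time, and retractions that would land at or beyond the replica expiration can be dropped, since the replica is restarted before they ever matter.

```rust
/// Updates are (data, time, diff) triples; a negative diff is a retraction.
type Update = (&'static str, u64, i64);

/// Drop retraction diffs scheduled at or beyond the expiration time.
/// Insertions are kept regardless, so results remain correct for all
/// times strictly before `expire_at`.
fn limit_retention(updates: Vec<Update>, expire_at: u64) -> Vec<Update> {
    updates
        .into_iter()
        .filter(|(_, time, diff)| *diff >= 0 || *time < expire_at)
        .collect()
}

fn main() {
    let updates = vec![
        ("row-a", 100, 1),    // insert at 100
        ("row-a", 500, -1),   // retraction before expiration: kept
        ("row-b", 100, 1),    // insert at 100
        ("row-b", 9_000, -1), // retraction beyond expiration: dropped
    ];
    let retained = limit_retention(updates, 1_000);
    assert_eq!(retained.len(), 3);
}
```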
### Tips for the reviewer

`Catalog` changes live in a separate file instead of piling on in `catalog.rs`.

### Checklist
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.