Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-19059]: Support non-time retract mode for OverAggregate operator #25753

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

bvarghese1
Copy link
Contributor

What is the purpose of the change

  • Non-time attribute over aggregates can support retract mode
  • Time attribute over aggregates support only append mode
  • An example would be:
  • INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (PARTITION BY key ORDER BY <NON-TIME-ATTRIBUTE> RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_val FROM source_t

Brief change log

  • The entire input state has to be maintained to retract old records correctly
  • The state is maintained as ValueState<List> per key
  • Requires documentation (WIP)

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

  • Added restore tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented)

- Non-time attribute over aggregates can support retract mode
- Time attribute over aggregates support only append mode
@flinkbot
Copy link
Collaborator

flinkbot commented Dec 6, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @bvarghese1!

I've only taken a look a the operator implementation and left a few suggestion to improve the efficiency of the operator.

Best, Fabian

protected transient JoinedRowData output;

// state to hold the accumulators of the aggregations
private transient ValueState<RowData> accState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is stored in accState?
Is it the accumulators for the last record in the ordered list?

registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());

// get last accumulator
RowData lastAccumulatorCurr = accState.value();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accState is never updated.
I don't think we need to have accumulator state the way it is used here.

Storing the accumulators for each input row would reduce the computational cost for updates.

new ValueStateDescriptor<List<RowData>>("sortedKeyState", sortedKeyTypeInfo);
sortedKeyState = getRuntimeContext().getState(sortedKeyStateDescriptor);

MapStateDescriptor<Long, RowData> valueStateDescriptor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't persist the accumulators for all input rows, another optimization would be to split the input row data into two maps, one for the fields that are needed to compute the aggregates and one for those that are simply forwarded.
Then we could recompute the accumulators by just deserializing the first map.


while (iterator.hasNext()) {
RowData curKey = iterator.next(); // (ID, sortKey)
RowData inputKey = GenericRowData.of(-1L, sortKeyFieldGetter.getFieldOrNull(input));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this out of the while loop?

break;
}
// Can also add the accKey to the sortedKeyList to avoid reading from the valueMapState
RowData curRow = valueMapState.get(curKey.getLong(keyIdx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that the Map approach won't helps if we can have to access all elements for any update (insert or delete). We can either persist the accumulators for all input rows (so we only need to recompute all accumulators after the inserted/deleted row) or if we split the input row into the fields that are needed for the aggregate computation and those that aren't

Comment on lines +354 to +361
// Generate UPDATE_BEFORE
output.setRowKind(RowKind.UPDATE_BEFORE);
output.replace(curValue, prevFunction.getValue());
out.collect(output);
// Generate UPDATE_AFTER
output.setRowKind(RowKind.UPDATE_AFTER);
output.replace(curValue, currFunction.getValue());
out.collect(output);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we would store the accumulators for all input rows, we could check if the aggregates really changed.
For example a MIN(), MAX(), LAG(), or LEAD() function might not change, even if we inserted or deleted a row. We should only emit updates if something really changed.
Any change we filter out here, will reduce the processing overhead in subsequent operators / sinks.

out.collect(output);

// Emit updated agg value for all records after newly inserted row
while (iterator.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we store the accumulators, we can early out this loop as soon as the accumulators before and after the change are identical. Since the accumulator for row x depends on the accumulator of row x-1 and x itself and x didn't change, we can stop traversing the list once the accumulators are identical before and after the row was inserted.

Comment on lines +413 to +419
output.setRowKind(RowKind.UPDATE_BEFORE);
output.replace(curValue, prevFunction.getValue());
out.collect(output);

output.setRowKind(RowKind.UPDATE_AFTER);
output.replace(curValue, currFunction.getValue());
out.collect(output);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we persist per-row accumulators, we could check if the rows differ and only emit updates in that case.

List<RowData> sortedKeyList = sortedKeyState.value();
ListIterator<RowData> iterator = sortedKeyList.listIterator();

while (iterator.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could early out the loop once the accumulators before and after the change are the same (see detailed comment above)

out.collect(output);
iterator.remove();
valueMapState.remove(curKey.getLong(keyIdx));
currFunction.retract(curValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be better if we wouldn't need to rely on retractable accumulators.
They are often less space and/or compute efficient.

@bvarghese1
Copy link
Contributor Author

@lincoln-lil I noticed that you made some similar changes (more specifically the planner changes) for optimizing the Deduplicate operator. Would you mind giving me some suggestions regarding some planner specific changes that needs to be done if we want to support retract mode for a specific operator? I looked at your PR and I think I would need to also update the NDU files. Any other files/classes that I should look into?

@davidradl
Copy link
Contributor

Reviewed by Chi on 12/12/24. appears that this PR is healthily progressing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants