-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36881][table] Introduce GroupTableAggFunction in GroupTableAggregate with Async State API #25789
base: master
Are you sure you want to change the base?
Conversation
final boolean enableAsyncState = AggregateUtil.enableAsyncState(config, aggInfoList); | ||
|
||
final OneInputStreamOperator<RowData, RowData> operator; | ||
if (!enableAsyncState) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if (enableAsyncState)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have modified it in the pr
config, planner.getFlinkContext().getClassLoader()), | ||
planner.createRelBuilder(), | ||
JavaScalaConversionUtil.toScala(inputRowType.getChildren()), | ||
// TODO: heap state backend do not copy key currently, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure of the impact of these TODOs.
it would be worth tracking these TODOs with Jiras and including the numbers in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will track the progress of Jira and follow up on PR
import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; | ||
|
||
/** Aggregate Function used for the groupby (without window) table aggregate in async state. */ | ||
public class AsyncStateGroupTableAggFunction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we can have the async version extend the sync version and reuse variables / logic, to try to keep the 2 implementations in line and reduce the change of diverging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, sync and asynchronous use different versions of state objects. In consideration of bccace0, the design concept is to design another set of corresponding files and reuse the code logic as much as possible in Helper
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, sync and asynchronous use different versions of state objects. In consideration of bccace0, the design concept is to design another set of corresponding files and reuse the code logic as much as possible in
Helper
I am curious why there are different state objects - used for sync and async - this does not seem right - can we align or how can we be sure that the behaviour remains the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The async version of the state object is still under development, and many operators such as MiniBatchGroupAggFunction cannot support state objects well, so we need to wait for the maturity of the async version of the state framework
import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; | ||
|
||
/** Aggregate Function used for the groupby (without window) table aggregate in async state. */ | ||
public class AsyncStateGroupTableAggFunction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs unit test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function related to agg requires GeneratedTableAggsHandleFunction which seems not to be able to create quickly in the test, and other aggFunctions do not implement similar tests. Therefore, I have placed the specific test in TableAggregateITCase and TableAggregateHarnessTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed by Chi on 12/12/24. Asked submitter questions
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; | ||
|
||
/** A helper to do the logic of group table agg. */ | ||
public abstract class GroupTableAggHelper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need unit tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corresponding to Asynchronous State Group Table AggFunction
this.function = function; | ||
} | ||
|
||
public RowData processElement( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be great to have javadoc to detail the intent of the method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for review. Relevant comments have been added
…regate with Async State API
5536f39
to
6eb3463
Compare
What is the purpose of the change
Introduce GroupTableAggFunction in GroupTableAggregate with async state api.
Brief change log
Verifying this change
Existent tests and new added tests can verify this change.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation