Smile created FLINK-19592:
-----------------------------
Summary: MiniBatchGroupAggFunction should emit messages to prevent too early state eviction of downstream operators
Key: FLINK-19592
URL: https://issues.apache.org/jira/browse/FLINK-19592
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.12.0
Reporter: Smile
Currently, when state cleaning is enabled, [GroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183] emits a retract and a new insert message whenever a new message with the same key arrives, even if the aggregate value is unchanged. According to [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566], this is intentional: it prevents too-early state eviction in downstream operators.
However, [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206] does not: it emits nothing when the new aggregate value equals the previous one, regardless of whether state cleaning is enabled. Until [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566] is resolved, it should emit these messages as well.
*GroupAggFunction.java:*
{code:java}
if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
    // newRow is the same as before and state cleaning is not enabled.
    // We do not emit retraction and acc message.
    // If state cleaning is enabled, we have to emit messages to prevent too early
    // state eviction of downstream operators.
    return;
} else {
    // retract previous result
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
}
{code}
*MiniBatchGroupAggFunction.java:*
{code:java}
if (!equaliser.equals(prevAggValue, newAggValue)) {
    // new row is not same with prev row
    if (generateUpdateBefore) {
        // prepare UPDATE_BEFORE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(resultRow);
    }
    // prepare UPDATE_AFTER message for new row
    resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
    out.collect(resultRow);
}
// new row is same with prev row, no need to output
{code}
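The difference between the two conditions can be illustrated with a small standalone sketch. This is not Flink code: the class {{EmitDecisionSketch}}, its methods, and the string-encoded messages ("-U" for UPDATE_BEFORE, "+U" for UPDATE_AFTER) are hypothetical stand-ins that only model the emit decision quoted above, and {{miniBatchProposed}} is one possible shape of the fix, assuming the mini-batch function has access to a {{stateCleaningEnabled}} flag in the same way GroupAggFunction does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified model of the emit decision; not actual Flink code. */
final class EmitDecisionSketch {

    /** Models the GroupAggFunction condition quoted above. */
    static List<String> groupAgg(Object prev, Object next,
                                 boolean stateCleaningEnabled,
                                 boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && Objects.equals(prev, next)) {
            // Unchanged value and no state cleaning: nothing is emitted.
            return out;
        }
        if (generateUpdateBefore) {
            out.add("-U " + prev); // stands in for UPDATE_BEFORE
        }
        out.add("+U " + next);     // stands in for UPDATE_AFTER
        return out;
    }

    /** Models the current MiniBatchGroupAggFunction condition quoted above. */
    static List<String> miniBatchCurrent(Object prev, Object next,
                                         boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (!Objects.equals(prev, next)) {
            if (generateUpdateBefore) {
                out.add("-U " + prev);
            }
            out.add("+U " + next);
        }
        // Unchanged value: nothing is emitted, even if state cleaning is enabled.
        return out;
    }

    /** Assumed fix: apply the same condition as GroupAggFunction. */
    static List<String> miniBatchProposed(Object prev, Object next,
                                          boolean stateCleaningEnabled,
                                          boolean generateUpdateBefore) {
        return groupAgg(prev, next, stateCleaningEnabled, generateUpdateBefore);
    }

    public static void main(String[] args) {
        // Same aggregate value arrives again while state cleaning is enabled.
        System.out.println("GroupAgg:           " + groupAgg(5, 5, true, true));
        System.out.println("MiniBatch current:  " + miniBatchCurrent(5, 5, true));
        System.out.println("MiniBatch proposed: " + miniBatchProposed(5, 5, true, true));
    }
}
```

With an unchanged aggregate value and state cleaning enabled, the modeled GroupAggFunction path still emits both messages, while the modeled current mini-batch path emits none. That silent case is the one in which a downstream operator's state could expire too early.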