When using SELECT DISTINCT in Flink SQL, I found that when a new message with an already-seen key arrives, Flink emits a retract message followed by a new insert message. According to FLINK-8566 and FLINK-8564, this is intentional: it prevents downstream operators from evicting their state too early when state cleaning is enabled. However, once I enabled MiniBatch Aggregation, those retract and insert messages were no longer emitted.
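According to the source code of GroupAggFunction and MiniBatchGroupAggFunction, these messages are not emitted when MiniBatch Aggregation is enabled. But why not? I don't think MiniBatch Aggregation can prevent state eviction in downstream operators anyway.
GroupAggFunction.java: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183
MiniBatchGroupAggFunction.java: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206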
Flink Version: 1.12-SNAPSHOT (GitHub master)
Planner: Blink Planner
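To make the difference concrete, the two branches can be modelled in plain Java. This is only a sketch under my own assumptions, not the Flink source: the class EmitDecisionSketch and its method names are hypothetical, row kinds are represented as strings, Objects.equals stands in for the generated equaliser, and only the "not the first row" case is covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-alone model of the two emission decisions discussed above.
// It does not use any Flink classes.
class EmitDecisionSketch {

    // Models the GroupAggFunction branch: an unchanged result is suppressed
    // only when state cleaning is disabled.
    static List<String> groupAgg(boolean stateCleaningEnabled, boolean generateUpdateBefore,
                                 Object prevAggValue, Object newAggValue) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && Objects.equals(prevAggValue, newAggValue)) {
            return out; // same value, no state cleaning: nothing to emit
        }
        if (generateUpdateBefore) {
            out.add("UPDATE_BEFORE"); // retract the previous row
        }
        out.add("UPDATE_AFTER"); // emit the new row
        return out;
    }

    // Models the MiniBatchGroupAggFunction branch: an unchanged result is
    // always suppressed, whether or not state cleaning is enabled.
    static List<String> miniBatchGroupAgg(boolean generateUpdateBefore,
                                          Object prevAggValue, Object newAggValue) {
        List<String> out = new ArrayList<>();
        if (!Objects.equals(prevAggValue, newAggValue)) {
            if (generateUpdateBefore) {
                out.add("UPDATE_BEFORE");
            }
            out.add("UPDATE_AFTER");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same aggregate value arrives again while state cleaning is enabled.
        System.out.println("GroupAggFunction path: " + groupAgg(true, true, 1L, 1L));
        // prints "GroupAggFunction path: [UPDATE_BEFORE, UPDATE_AFTER]"
        System.out.println("MiniBatchGroupAggFunction path: " + miniBatchGroupAgg(true, 1L, 1L));
        // prints "MiniBatchGroupAggFunction path: []"
    }
}
```

With state cleaning enabled and an unchanged value, the first path still emits the update pair while the second emits nothing, which is exactly the behaviour I am asking about.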
Best wishes.
Smile
Hi Smile,
Thanks for bringing up this discussion. I think you are right: it's a bug, and MiniBatchGroupAggFunction should also do this. You can open a JIRA issue for it, and offer to fix it if you wish.

What's more, the current MiniBatchGroupAggFunction has a more severe bug: it does not clean up expired state, see FLINK-17096.

Smile <[hidden email]> wrote on Mon, Oct 12, 2020 at 6:15 PM:

> When using SELECT DISTINCT in Flink SQL I found that when a new message with
> the same key arrives, it will emit a retract and a new insert message.
> According to JIRA-FLINK-8566
> <https://issues.apache.org/jira/browse/FLINK-8566> and JIRA-FLINK-8564
> <https://issues.apache.org/jira/browse/FLINK-8564>, it's a feature to
> prevent too early state eviction of downstream operators when state cleaning
> is enabled. However, when I enabled MiniBatch Aggregation
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>,
> it stops emitting those retract and new messages.
>
> According to the source code of GroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183>
> and MiniBatchGroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>,
> when MiniBatch Aggregation enabled, it does not emit those messages.
>
> But why not? I don't think MiniBatch Aggregation can avoid state eviction of
> downstream operators anyway.
>
> *GroupAggFunction.java:*
>
>     if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
>         // newRow is the same as before and state cleaning is not enabled.
>         // We do not emit retraction and acc message.
>         // If state cleaning is enabled, we have to emit messages to prevent too early
>         // state eviction of downstream operators.
>         return;
>     } else {
>         // retract previous result
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     }
>
> *MiniBatchGroupAggFunction.java:*
>
>     if (!equaliser.equals(prevAggValue, newAggValue)) {
>         // new row is not same with prev row
>         if (generateUpdateBefore) {
>             // prepare UPDATE_BEFORE message for previous row
>             resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>             out.collect(resultRow);
>         }
>         // prepare UPDATE_AFTER message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>         out.collect(resultRow);
>     }
>     // new row is same with prev row, no need to output
>
> Flink Version: 1.12-SNAPSHOT (GitHub Master <https://github.com/apache/flink>)
> Planner: Blink Planner
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

--
Best,
Benchao Li
Hi Benchao,
Thank you for your confirmation. I have created an issue for this bug: FLINK-19592 (https://issues.apache.org/jira/browse/FLINK-19592).
Best wishes.
Smile