[DISCUSS]Why not emit messages to prevent too early state eviction in MiniBatchGroupAggFunction?

Smile
When using SELECT DISTINCT in Flink SQL, I found that when a new message with the same key arrives, the operator emits a retraction followed by a new insert message, even though the result for that key is unchanged. According to FLINK-8566 and FLINK-8564, this is intentional: it prevents downstream operators from evicting their state too early when state cleaning is enabled. However, when I enabled MiniBatch Aggregation, it stopped emitting these retraction and insert messages.

The source code of GroupAggFunction and MiniBatchGroupAggFunction confirms this: when MiniBatch Aggregation is enabled, these messages are not emitted.

But why not? I don't think MiniBatch Aggregation prevents downstream operators from evicting state either, so the messages should still be needed.
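To make the eviction argument concrete, here is a toy Java model of a downstream operator with idle-state retention. It is not Flink code; `IdleStateOperator`, `EvictionDemo` and their methods are hypothetical names. Each incoming message for a key refreshes that key's timestamp, and keys idle for longer than the retention time are dropped. If the upstream aggregate stays silent because its result did not change, the downstream key expires even though the upstream key keeps receiving input.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Flink code: a downstream operator that keeps per-key state and
// evicts a key once it has been idle for longer than retentionMs.
// IdleStateOperator, EvictionDemo and their methods are hypothetical names.
class IdleStateOperator {
    private final long retentionMs;
    private final Map<String, Long> lastAccess = new HashMap<>();

    IdleStateOperator(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Every incoming message for a key counts as an access and refreshes its timestamp.
    void onMessage(String key, long nowMs) {
        evictExpired(nowMs);
        lastAccess.put(key, nowMs);
    }

    // Drops every key whose last access is older than the retention time.
    void evictExpired(long nowMs) {
        lastAccess.values().removeIf(t -> nowMs - t > retentionMs);
    }

    boolean hasState(String key) {
        return lastAccess.containsKey(key);
    }
}

class EvictionDemo {
    // The upstream key receives a duplicate every periodMs, so its result never changes.
    // reemitUnchanged = true models GroupAggFunction with state cleaning enabled
    // (a message is forwarded anyway); false models an upstream that stays silent.
    static boolean survives(boolean reemitUnchanged, long retentionMs, long periodMs, long endMs) {
        IdleStateOperator downstream = new IdleStateOperator(retentionMs);
        downstream.onMessage("k", 0); // the first result is always emitted
        for (long t = periodMs; t <= endMs; t += periodMs) {
            if (reemitUnchanged) {
                downstream.onMessage("k", t);
            }
        }
        downstream.evictExpired(endMs);
        return downstream.hasState("k");
    }

    public static void main(String[] args) {
        System.out.println(survives(true, 100, 50, 1000));  // true: key kept alive
        System.out.println(survives(false, 100, 50, 1000)); // false: key evicted
    }
}
```

The model ignores how Flink actually schedules cleanup timers; it only illustrates why a silent upstream lets downstream state expire.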

GroupAggFunction.java:
if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
	// newRow is the same as before and state cleaning is not enabled.
	// We do not emit retraction and acc message.
	// If state cleaning is enabled, we have to emit messages to prevent too early
	// state eviction of downstream operators.
	return;
} else {
	// retract previous result
	if (generateUpdateBefore) {
		// prepare UPDATE_BEFORE message for previous row
		resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
		out.collect(resultRow);
	}
	// prepare UPDATE_AFTER message for new row
	resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
}
// in the full method, the prepared resultRow is emitted afterwards by out.collect(resultRow)


MiniBatchGroupAggFunction.java:
if (!equaliser.equals(prevAggValue, newAggValue)) {
	// new row is not same with prev row
	if (generateUpdateBefore) {
		// prepare UPDATE_BEFORE message for previous row
		resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
		out.collect(resultRow);
	}
	// prepare UPDATE_AFTER message for new row
	resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
	out.collect(resultRow);
}
// new row is same with prev row, no need to output
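The difference between the two excerpts can be reduced to a pure decision function for an existing key whose aggregate goes from `prev` to `next`. The sketch below is a simplified model, not Flink code: `Objects.equals` stands in for the generated `equaliser`, a local `Kind` enum stands in for `RowKind`, and the first-row INSERT path is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified model, not Flink code. Kind is a local stand-in for Flink's RowKind,
// and Objects.equals stands in for the generated equaliser.
enum Kind { UPDATE_BEFORE, UPDATE_AFTER }

class EmitRules {
    // GroupAggFunction excerpt: stay silent only if state cleaning is disabled
    // AND the aggregate value did not change.
    static List<Kind> groupAgg(Object prev, Object next,
                               boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        List<Kind> out = new ArrayList<>();
        if (!stateCleaningEnabled && Objects.equals(prev, next)) {
            return out;
        }
        if (generateUpdateBefore) {
            out.add(Kind.UPDATE_BEFORE);
        }
        out.add(Kind.UPDATE_AFTER);
        return out;
    }

    // MiniBatchGroupAggFunction excerpt: stateCleaningEnabled is never consulted.
    static List<Kind> miniBatch(Object prev, Object next,
                                boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        List<Kind> out = new ArrayList<>();
        if (!Objects.equals(prev, next)) {
            if (generateUpdateBefore) {
                out.add(Kind.UPDATE_BEFORE);
            }
            out.add(Kind.UPDATE_AFTER);
        }
        return out;
    }

    public static void main(String[] args) {
        // Unchanged result with state cleaning enabled and UPDATE_BEFORE requested.
        System.out.println("GroupAgg:  " + groupAgg("a", "a", true, true));  // [UPDATE_BEFORE, UPDATE_AFTER]
        System.out.println("MiniBatch: " + miniBatch("a", "a", true, true)); // []
    }
}
```

For a changed result both rules emit the same rows; they only diverge when the result is unchanged and state cleaning is enabled, which is exactly the SELECT DISTINCT duplicate case.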


Flink Version: 1.12-SNAPSHOT (GitHub master)
Planner: Blink Planner
Best wishes.
Smile

Re: [DISCUSS]Why not emit messages to prevent too early state eviction in MiniBatchGroupAggFunction?

Benchao Li-2
Hi Smile,

Thanks for bringing up this discussion.
I think you are right: it's a bug, and MiniBatchGroupAggFunction should also
do this.
You can open a JIRA issue for it, and offer to fix it if you wish.
Moreover, the current MiniBatchGroupAggFunction has an even more severe bug:
it does not clean up expired state; see FLINK-17096.
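As a rough illustration of what "MiniBatchGroupAggFunction should also do this" could look like, here is a toy mini-batch MAX aggregate whose bundle flush applies the GroupAggFunction rule: emit when the result changed or when state cleaning is enabled. This is a hypothetical sketch (the class, its methods, and the choice of a MAX aggregate are assumptions), not the actual FLINK-19592 change; the output labels borrow the +I/-U/+U notation of RowKind's short strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not Flink code and not the FLINK-19592 patch: a mini-batch
// MAX aggregate that buffers inputs per key and, on finishBundle(), applies the
// GroupAggFunction rule "emit if the result changed OR state cleaning is enabled".
class ToyMiniBatchMax {
    private final boolean stateCleaningEnabled;
    private final boolean generateUpdateBefore;
    private final Map<String, Long> state = new HashMap<>();               // key -> current MAX
    private final Map<String, List<Long>> bundle = new LinkedHashMap<>(); // buffered inputs

    ToyMiniBatchMax(boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        this.stateCleaningEnabled = stateCleaningEnabled;
        this.generateUpdateBefore = generateUpdateBefore;
    }

    // Buffers an input row; nothing is emitted until the bundle is flushed.
    void add(String key, long value) {
        bundle.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Folds each key's buffered inputs into its state and returns the emitted rows.
    List<String> finishBundle() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : bundle.entrySet()) {
            String key = e.getKey();
            Long prev = state.get(key);
            long next = (prev == null) ? Long.MIN_VALUE : prev;
            for (long v : e.getValue()) {
                next = Math.max(next, v);
            }
            state.put(key, next);
            if (prev == null) {
                out.add("+I(" + key + "," + next + ")");
            } else if (stateCleaningEnabled || !Objects.equals(prev, next)) {
                // Proposed change: also emit an unchanged result when state cleaning is on,
                // so downstream operators keep seeing the key as active.
                if (generateUpdateBefore) {
                    out.add("-U(" + key + "," + prev + ")");
                }
                out.add("+U(" + key + "," + next + ")");
            }
        }
        bundle.clear();
        return out;
    }

    public static void main(String[] args) {
        ToyMiniBatchMax op = new ToyMiniBatchMax(true, true);
        op.add("k", 5);
        System.out.println(op.finishBundle()); // [+I(k,5)]
        op.add("k", 3);
        op.add("k", 4);
        System.out.println(op.finishBundle()); // [-U(k,5), +U(k,5)]
    }
}
```

Because a bundle folds all inputs for a key into one comparison, at most one retraction/update pair per key is emitted per bundle, so the extra traffic from this rule stays bounded by the number of active keys per flush.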


On Mon, Oct 12, 2020 at 6:15 PM Smile <[hidden email]> wrote:

> When using SELECT DISTINCT in Flink SQL I found that when a new message with
> the same key arrives, it will emit a retract and a new insert message.
> According to JIRA-FLINK-8566 <https://issues.apache.org/jira/browse/FLINK-8566>
> and JIRA-FLINK-8564 <https://issues.apache.org/jira/browse/FLINK-8564>, it's a
> feature to prevent too early state eviction of downstream operators when state
> cleaning is enabled. However, when I enabled MiniBatch Aggregation
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation>,
> it stops emitting those retract and new messages.
>
> According to the source code of GroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183>
> and MiniBatchGroupAggFunction
> <https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206>,
> when MiniBatch Aggregation enabled, it does not emit those messages.
>
> But why not? I don't think MiniBatch Aggregation can avoid state eviction of
> downstream operators anyway.
>
> [...]
>
> Flink Version: 1.12-SNAPSHOT (GitHub Master <https://github.com/apache/flink>)
> Planner: Blink Planner


--

Best,
Benchao Li

Re: [DISCUSS]Why not emit messages to prevent too early state eviction in MiniBatchGroupAggFunction?

Smile
Hi Benchao,
Thank you for confirming.
I have created an issue for this bug: FLINK-19592
(https://issues.apache.org/jira/browse/FLINK-19592)



-----
Best wishes.
Smile
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/