[jira] [Created] (FLINK-23025) sink-buffer-max-rows and sink-buffer-flush-interval options produce a lot of duplicates

Shang Yuanchun (Jira)
Johannes Moser created FLINK-23025:
--------------------------------------

             Summary: sink-buffer-max-rows and sink-buffer-flush-interval options produce a lot of duplicates
                 Key: FLINK-23025
                 URL: https://issues.apache.org/jira/browse/FLINK-23025
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.13.1
            Reporter: Johannes Moser


Using the [sink-buffer-flush-max-rows|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#sink-buffer-flush-interval] and [sink-buffer-flush-interval|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#sink-buffer-flush-interval] options for a Kafka sink produces a lot of duplicate key/value pairs in the target Kafka topic. It looks like {{BufferedUpsertSinkFunction}} should copy the key/value RowData objects it buffers when object reuse is enabled, but it does not. In [line 134|https://github.com/apache/flink/blob/60c7d9e77a6e9d82e0feb33f0d8bc263dddf2fd9/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction.java#L133-L137] the condition should apparently be negated, or the results of the ternary operator swapped:
{code:java}
this.valueCopier =
 getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
 ? Function.identity()
 : typeSerializer::copy;{code}

(The JDBC sink uses the same logic, but with the results of the ternary operator swapped.)
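
To illustrate why this would lead to duplicates, here is a minimal, Flink-free sketch. {{Row}}, {{chooseCopier}} and {{bufferAndFlush}} are hypothetical names made up for this example and are not part of the connector; {{Row}} merely stands in for a RowData instance that the upstream operator reuses. It assumes that, with object reuse enabled, every record reaches the sink through the same mutable object.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ObjectReuseBufferDemo {

    /** Hypothetical mutable record standing in for a reused RowData instance. */
    public static final class Row {
        String key;
        String value;

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }

        Row copy() {
            return new Row(key, value);
        }
    }

    /** Mirrors the ternary from the report; pass swapped=true for the proposed fix. */
    public static Function<Row, Row> chooseCopier(boolean objectReuseEnabled, boolean swapped) {
        Function<Row, Row> identity = Function.identity();
        Function<Row, Row> copy = Row::copy;
        if (swapped) {
            return objectReuseEnabled ? copy : identity;
        }
        return objectReuseEnabled ? identity : copy;
    }

    /** Buffers three upserts delivered through ONE reused Row, then "flushes" them. */
    public static List<String> bufferAndFlush(Function<Row, Row> copier) {
        Map<String, Row> buffer = new LinkedHashMap<>();
        Row reused = new Row(null, null);
        String[][] input = {{"k1", "a"}, {"k2", "b"}, {"k3", "c"}};
        for (String[] kv : input) {
            // The upstream operator overwrites the same object for every record.
            reused.key = kv[0];
            reused.value = kv[1];
            buffer.put(kv[0], copier.apply(reused));
        }
        List<String> flushed = new ArrayList<>();
        for (Row r : buffer.values()) {
            flushed.add(r.key + "=" + r.value);
        }
        return flushed;
    }

    public static void main(String[] args) {
        // Current ordering with object reuse enabled: prints [k3=c, k3=c, k3=c]
        System.out.println(bufferAndFlush(chooseCopier(true, false)));
        // Swapped ordering with object reuse enabled: prints [k1=a, k2=b, k3=c]
        System.out.println(bufferAndFlush(chooseCopier(true, true)));
    }
}
```

With the current ordering, every buffer entry references the same object, so the flush emits the last record once per buffered key. With the results swapped, each entry is an independent copy.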

 


