Hi community,

We have found a serious issue with the newly introduced KafkaSerializationSchemaWrapper class, which, under certain conditions, causes FlinkKafkaProducer to write only to partition 0 of the given Kafka topic.

First, consider the constructor of the universal FlinkKafkaProducer that uses FlinkFixedPartitioner as its custom partitioner. Tracing down the call path, KafkaSerializationSchemaWrapper is used to wrap that custom partitioner, i.e. FlinkFixedPartitioner.

However, KafkaSerializationSchemaWrapper never calls the open method of the given partitioner. The partitioner needs this call to initialize its subtask-specific fields, such as parallelInstanceId in FlinkFixedPartitioner. Therefore, when FlinkKafkaProducer later calls KafkaSerializationSchemaWrapper#serialize, FlinkFixedPartitioner#partition always returns partition 0, because parallelInstanceId was never initialized. As a result, all of the data goes only to partition 0 of the given Kafka topic, creating severe data skew in the sink.
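To illustrate the failure mode, here is a minimal, self-contained Java sketch. FixedPartitionerSketch is a hypothetical, simplified stand-in, not the actual Flink class; it assumes the fixed partitioner maps subtask i to partitions[i % partitions.length] and that parallelInstanceId keeps its default value of 0 until open is called. The demo compares the partitions chosen by four subtasks when open is skipped versus called.

```java
import java.util.Arrays;

public class FixedPartitionerDemo {

    // Hypothetical, simplified stand-in for a fixed partitioner (not Flink's class).
    static class FixedPartitionerSketch {
        // Defaults to 0 until open() assigns the real subtask index.
        private int parallelInstanceId;

        void open(int parallelInstanceId, int parallelInstances) {
            if (parallelInstanceId < 0 || parallelInstances <= 0) {
                throw new IllegalArgumentException("invalid subtask layout");
            }
            this.parallelInstanceId = parallelInstanceId;
        }

        // Each subtask always writes to one fixed partition.
        int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    // Returns the partition chosen by each of `subtasks` parallel instances.
    static int[] chosenPartitions(int subtasks, int[] partitions, boolean callOpen) {
        int[] chosen = new int[subtasks];
        for (int i = 0; i < subtasks; i++) {
            FixedPartitionerSketch p = new FixedPartitionerSketch();
            if (callOpen) {
                p.open(i, subtasks);
            }
            chosen[i] = p.partition(partitions);
        }
        return chosen;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        // open() skipped: every subtask lands on partition 0.
        System.out.println(Arrays.toString(chosenPartitions(4, partitions, false)));
        // open() called: subtask i lands on partitions[i % 4].
        System.out.println(Arrays.toString(chosenPartitions(4, partitions, true)));
    }
}
```

Running main prints [0, 0, 0, 0] followed by [0, 1, 2, 3], which is the skew described above versus the intended spread.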
Hi community,

By the way, during FlinkKafkaProducer#initProducer, flinkKafkaPartitioner is only opened when it is NOT null. Unfortunately, that is not the case here: the field is set to null when a KafkaSerializationSchemaWrapper is passed to the constructor. Together, these logic flaws lead to this serious bug, and we recommend initializing the FlinkKafkaPartitioner in KafkaSerializationSchemaWrapper#open.

Sincerely,
Weike

On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike <[hidden email]> wrote:
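The initialization gap and the recommended fix can be modelled in a small Java sketch. Every type here (PartitionerSketch, SchemaWrapperSketch, runSubtask) is a hypothetical simplification, not Flink code, and it assumes the wrapper learns the subtask index and parallelism before its own open method runs. producerSidePartitioner models the producer's partitioner field, which is null whenever the wrapper is used, so the producer-side open is skipped; the fix flag toggles opening the wrapped partitioner inside the wrapper's open.

```java
// Hypothetical, simplified model of the bug and the proposed fix; not Flink code.
public class WrapperOpenDemo {

    interface PartitionerSketch {
        void open(int parallelInstanceId, int parallelInstances);
        int partition(int[] partitions);
    }

    static final class FixedPartitionerSketch implements PartitionerSketch {
        private int parallelInstanceId; // stays 0 unless open() runs

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    // Stand-in for the serialization-schema wrapper that owns the partitioner.
    static final class SchemaWrapperSketch {
        private final PartitionerSketch partitioner;
        private final boolean openPartitionerInOpen; // true = proposed fix
        private int parallelInstanceId;
        private int numParallelInstances;

        SchemaWrapperSketch(PartitionerSketch partitioner, boolean openPartitionerInOpen) {
            this.partitioner = partitioner;
            this.openPartitionerInOpen = openPartitionerInOpen;
        }

        void setParallelInstanceId(int id) { this.parallelInstanceId = id; }
        void setNumParallelInstances(int n) { this.numParallelInstances = n; }

        void open() {
            // Proposed fix: initialize the wrapped partitioner here.
            if (openPartitionerInOpen && partitioner != null) {
                partitioner.open(parallelInstanceId, numParallelInstances);
            }
        }

        int targetPartition(int[] partitions) {
            return partitioner.partition(partitions);
        }
    }

    // Models one subtask: the producer's own partitioner field is null because the
    // partitioner lives inside the wrapper, so the producer-side open() is skipped.
    static int runSubtask(int subtask, int subtasks, int[] partitions, boolean fix) {
        PartitionerSketch producerSidePartitioner = null;
        SchemaWrapperSketch wrapper = new SchemaWrapperSketch(new FixedPartitionerSketch(), fix);
        wrapper.setParallelInstanceId(subtask);
        wrapper.setNumParallelInstances(subtasks);

        if (producerSidePartitioner != null) {
            producerSidePartitioner.open(subtask, subtasks); // never reached
        }
        wrapper.open();
        return wrapper.targetPartition(partitions);
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask
                    + ": without fix -> " + runSubtask(subtask, 3, partitions, false)
                    + ", with fix -> " + runSubtask(subtask, 3, partitions, true));
        }
    }
}
```

Without the fix every subtask reports partition 0; with it, subtask i reports partition i. Opening the partitioner inside the wrapper means it no longer matters that the producer's own field is null.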
Thank you for the thorough investigation. I totally agree with you. I created an issue for it [1] and will try to fix it as soon as possible, so that the fix is included in 1.11.2 and 1.12.

You can work around the problem by using KafkaSerializationSchema directly, together with the KafkaContextAware interface.

Best,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19133

On 03/09/2020 14:24, DONG, Weike wrote:
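A rough sketch of this workaround, assuming fixed per-subtask partitioning is what you want: the schema itself tracks its subtask index and the partitions of the target topic, so no separate partitioner has to be opened. To keep the example runnable without Flink or Kafka on the classpath, the interfaces below are simplified stand-ins that only mirror the shape of Flink's KafkaSerializationSchema and KafkaContextAware (setParallelInstanceId, setNumParallelInstances, setPartitions, getTargetTopic). In a real job you would implement the Flink interfaces and return a Kafka ProducerRecord instead. The names OutRecord, FixedPartitionSchema and "my-topic" are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class ContextAwareWorkaround {

    // Simplified stand-in for Kafka's ProducerRecord<byte[], byte[]>.
    static final class OutRecord {
        final String topic;
        final Integer partition;
        final byte[] value;

        OutRecord(String topic, Integer partition, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    // Stand-ins mirroring the shape of Flink's KafkaSerializationSchema / KafkaContextAware.
    interface SerializationSchemaLike<T> {
        OutRecord serialize(T element, Long timestamp);
    }

    interface ContextAwareLike<T> {
        default void setParallelInstanceId(int parallelInstanceId) {}
        default void setNumParallelInstances(int numParallelInstances) {}
        default void setPartitions(int[] partitions) {}
        String getTargetTopic(T element);
    }

    // Hypothetical schema doing fixed partitioning itself, using the context it is handed.
    static final class FixedPartitionSchema
            implements SerializationSchemaLike<String>, ContextAwareLike<String> {
        private final String topic;
        private int parallelInstanceId;
        private int[] partitions;

        FixedPartitionSchema(String topic) {
            this.topic = topic;
        }

        @Override public void setParallelInstanceId(int id) { this.parallelInstanceId = id; }
        @Override public void setPartitions(int[] partitions) { this.partitions = partitions; }
        @Override public String getTargetTopic(String element) { return topic; }

        @Override
        public OutRecord serialize(String element, Long timestamp) {
            if (partitions == null || partitions.length == 0) {
                throw new IllegalStateException("partitions of the target topic are not known yet");
            }
            // Same rule as a fixed partitioner: subtask i -> partitions[i % n]; timestamp ignored here.
            int partition = partitions[parallelInstanceId % partitions.length];
            return new OutRecord(topic, partition, element.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        for (int subtask = 0; subtask < 3; subtask++) {
            FixedPartitionSchema schema = new FixedPartitionSchema("my-topic");
            // Simulates the calls the producer is expected to make on a context-aware schema.
            schema.setParallelInstanceId(subtask);
            schema.setNumParallelInstances(3);
            schema.setPartitions(partitions);
            OutRecord r = schema.serialize("event-" + subtask, null);
            System.out.println("subtask " + subtask + " -> partition " + r.partition);
        }
    }
}
```

In main, the context calls are made by hand; each subtask i should end up on partition i. The design choice is to move the partition decision into the schema, where the subtask index is delivered through the context-aware setters rather than through a partitioner's open method.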