Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

DONG, Weike
Hi community,

We have found a serious issue with the newly introduced KafkaSerializationSchemaWrapper class, which under certain conditions causes FlinkKafkaProducer to write only to partition 0 of the given Kafka topic.

First, let's look at this constructor in the universal version of FlinkKafkaProducer, which uses FlinkFixedPartitioner as the custom partitioner.



When we trace the call path, we see that KafkaSerializationSchemaWrapper is used to wrap the aforementioned custom partitioner, i.e. FlinkFixedPartitioner.



However, we found that the implementation of KafkaSerializationSchemaWrapper never calls the open method of the given partitioner. That call is essential for the partitioner to initialize its internal state, such as parallelInstanceId in FlinkFixedPartitioner.

Therefore, when KafkaSerializationSchemaWrapper#serialize is later called by the FlinkKafkaProducer, FlinkFixedPartitioner#partition always returns 0, because parallelInstanceId is not properly initialized.


As a result, all of the data goes only to partition 0 of the given Kafka topic, creating severe data skew in the sink.
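
The failure mode can be reproduced without a Kafka cluster. The following is a minimal sketch, assuming a simplified stand-in for FlinkFixedPartitioner that only learns its subtask index in open() and picks partitions[parallelInstanceId % partitions.length]. The names UnopenedPartitionerDemo, SketchFixedPartitioner and chosenPartitions are hypothetical and not part of Flink.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical demo, not Flink code: a partitioner that is never opened always picks partition 0. */
final class UnopenedPartitionerDemo {

    /** Simplified stand-in modeled on FlinkFixedPartitioner (an assumption, not the real class). */
    static final class SketchFixedPartitioner {
        // Java initializes int fields to 0, so this stays 0 until open() is called.
        private int parallelInstanceId;

        void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    /** Returns the set of partitions chosen across all simulated sink subtasks. */
    static Set<Integer> chosenPartitions(int parallelism, int[] partitions, boolean callOpen) {
        Set<Integer> chosen = new TreeSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            SketchFixedPartitioner partitioner = new SketchFixedPartitioner();
            if (callOpen) {
                partitioner.open(subtask, parallelism);
            }
            chosen.add(partitioner.partition(partitions));
        }
        return chosen;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        System.out.println("open() skipped: " + chosenPartitions(4, partitions, false));
        System.out.println("open() called:  " + chosenPartitions(4, partitions, true));
    }
}
```

With four simulated subtasks and four partitions, skipping open() yields only partition 0, while calling open() spreads the subtasks over partitions 0 to 3.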



Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

DONG, Weike
Hi community,

By the way, in FlinkKafkaProducer#initProducer, the flinkKafkaPartitioner is only opened when it is NOT null. Unfortunately that is not the case here, because the field is set to null when a KafkaSerializationSchemaWrapper is passed to the constructor.




Together, these logic flaws lead to this serious bug, and we recommend initializing the FlinkKafkaPartitioner in KafkaSerializationSchemaWrapper#open.
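
The recommendation could look roughly like the sketch below. It is only an illustration under stated assumptions: every type here (WrapperOpenSketch, Partitioner, FixedPartitioner, SchemaWrapper) is a hypothetical stand-in rather than Flink code, and it assumes the producer hands the subtask index and parallelism to the wrapper before calling open().

```java
/** Hypothetical sketch, not Flink code: a wrapper whose open() also opens the partitioner it wraps. */
final class WrapperOpenSketch {

    /** Stand-in for a partitioner with an open() lifecycle hook. */
    interface Partitioner {
        void open(int parallelInstanceId, int parallelInstances);

        int partition(int[] partitions);
    }

    /** Stand-in modeled on FlinkFixedPartitioner. */
    static final class FixedPartitioner implements Partitioner {
        private int parallelInstanceId;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    /** Stand-in for the schema wrapper. Assumes the producer sets the subtask info before open(). */
    static final class SchemaWrapper {
        private final Partitioner partitioner; // may be null: no custom partitioner
        private int parallelInstanceId;
        private int numParallelInstances;

        SchemaWrapper(Partitioner partitioner) {
            this.partitioner = partitioner;
        }

        void setParallelInstanceId(int parallelInstanceId) {
            this.parallelInstanceId = parallelInstanceId;
        }

        void setNumParallelInstances(int numParallelInstances) {
            this.numParallelInstances = numParallelInstances;
        }

        /** The proposed change: forward open() to the wrapped partitioner. */
        void open() {
            if (partitioner != null) {
                partitioner.open(parallelInstanceId, numParallelInstances);
            }
        }

        /** Returns the target partition, or null to leave the choice to Kafka. */
        Integer targetPartition(int[] partitions) {
            return partitioner == null ? null : partitioner.partition(partitions);
        }
    }

    /** Simulates what one sink subtask does at startup and for its first record. */
    static Integer partitionForSubtask(int subtask, int parallelism, int[] partitions) {
        SchemaWrapper wrapper = new SchemaWrapper(new FixedPartitioner());
        wrapper.setParallelInstanceId(subtask);
        wrapper.setNumParallelInstances(parallelism);
        wrapper.open();
        return wrapper.targetPartition(partitions);
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + partitionForSubtask(subtask, 4, partitions));
        }
    }
}
```

Opening the partitioner inside the wrapper keeps the lifecycle in one place, so it no longer matters that the producer's own partitioner field is null on this code path.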

Sincerely,
Weike


(In reply to the message from DONG, Weike of Thu, Sep 3, 2020 at 8:15 PM.)

Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

dwysakowicz

Thank you for the thorough investigation. I totally agree with you. I created an issue for it [1]. I will try to fix it as soon as possible and include the fix in 1.11.2 and 1.12.

You could work around this by implementing KafkaSerializationSchema directly, together with the KafkaContextAware interface.
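
A rough sketch of what such a workaround might look like is given below. To stay self-contained it does not depend on Flink or Kafka: ContextAware is a hypothetical stand-in loosely modeled on KafkaContextAware, Record stands in for Kafka's ProducerRecord, and the method names are assumptions that should be checked against the interfaces in your Flink version.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch, not Flink code: a schema that receives the subtask info itself and picks the partition. */
final class ContextAwareSchemaSketch {

    /** Stand-in loosely modeled on KafkaContextAware: the producer calls these before serializing. */
    interface ContextAware<T> {
        void setParallelInstanceId(int parallelInstanceId);

        void setPartitions(int[] partitions);

        String getTargetTopic(T element);
    }

    /** Stand-in for Kafka's ProducerRecord. */
    static final class Record {
        final String topic;
        final int partition;
        final byte[] value;

        Record(String topic, int partition, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    /** Serializes strings and assigns each subtask one fixed partition. */
    static final class FixedPartitionSchema implements ContextAware<String> {
        private final String topic;
        private int parallelInstanceId;
        private int[] partitions = new int[0];

        FixedPartitionSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public void setParallelInstanceId(int parallelInstanceId) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public String getTargetTopic(String element) {
            return topic;
        }

        Record serialize(String element) {
            if (partitions.length == 0) {
                throw new IllegalStateException("setPartitions must be called before serialize");
            }
            int partition = partitions[parallelInstanceId % partitions.length];
            return new Record(getTargetTopic(element), partition,
                    element.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        FixedPartitionSchema schema = new FixedPartitionSchema("events");
        // In a real job the producer would supply these values; here they are set by hand.
        schema.setParallelInstanceId(3);
        schema.setPartitions(new int[] {0, 1, 2, 3});
        Record record = schema.serialize("hello");
        System.out.println(record.topic + " / partition " + record.partition);
    }
}
```

Because the schema receives the subtask index directly, it does not rely on a wrapped partitioner being opened.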

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19133

(In reply to the message from DONG, Weike of 03/09/2020 14:24.)
