Hi everyone,
I recently came across the following exception while dealing with a job failure; the job uses the FlinkKafkaProducer as its sink.

```
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION:#N ms has passed since batch creation
```

After digging into the source code of FlinkKafkaProducer, I found that, if I am correct, FlinkKafkaProducer does not have any kind of backpressure mechanism. Incoming records are simply sent using KafkaProducer#send without synchronization (at FlinkKafkaProducer.java#L915 <https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>). If the parallelism of the producer is not set correctly for its upstream throughput, or writes to the leader of a topic partition perform badly, the accumulator in KafkaProducer fills up with unsent records, which eventually causes record expiration like the one above.

There was a similar ticket before, FLINK-9083 <https://issues.apache.org/jira/browse/FLINK-9083>, for the Cassandra connector. Shall we have the same improvement for the Kafka connector? Maybe we could also add a maxConcurrentRequests attribute to FlinkKafkaProducer and use a semaphore to limit in-flight requests, roughly as sketched below.

Thanks,
Wenhao
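For illustration only, a rough sketch of what such a semaphore-based limit could look like, in the spirit of FLINK-9083. The ThrottledSend class and the maxConcurrentRequests parameter are made up for this sketch; neither exists in FlinkKafkaProducer today.

```
// Hypothetical sketch -- not existing Flink API. It only illustrates how a
// semaphore could bound the number of in-flight KafkaProducer#send calls.
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class ThrottledSend<K, V> {
    private final KafkaProducer<K, V> producer;
    private final Semaphore permits;

    ThrottledSend(KafkaProducer<K, V> producer, int maxConcurrentRequests) {
        this.producer = producer;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void send(ProducerRecord<K, V> record, Callback userCallback) throws InterruptedException {
        // Block the sink thread once maxConcurrentRequests sends are in flight,
        // so backpressure propagates to the upstream operators.
        permits.acquire();
        producer.send(record, (metadata, exception) -> {
            permits.release();
            if (userCallback != null) {
                userCallback.onCompletion(metadata, exception);
            }
        });
    }
}
```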
Hi Wenhao,
As far as I know, this is different from FLINK-9083, as KafkaProducer itself can backpressure writes if its internal buffers are exhausted [1]:

> The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

If you want to limit the number of concurrent requests, you can do it by reducing the `buffer.memory` option passed to KafkaProducer (via FlinkKafkaProducer's "Properties producerConfig"), for example along the lines of the snippet below.

Piotrek

[1] https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
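For illustration, a rough sketch of passing those options through the producerConfig Properties. The broker address, topic name, and the concrete values are placeholders and assumptions, not recommendations.

```
// Sketch: tune the producer buffer via the Properties that FlinkKafkaProducer
// forwards to the underlying KafkaProducer.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder broker

// Shrink the accumulator so send() blocks sooner, back-pressuring the sink
// (example value only; the KafkaProducer default is 32 MiB).
producerConfig.setProperty("buffer.memory", String.valueOf(16 * 1024 * 1024));

// How long a blocked send() waits before failing with a TimeoutException.
producerConfig.setProperty("max.block.ms", "60000");

FlinkKafkaProducer<String> sink =
        new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), producerConfig);
```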
Thanks Piotr for your reply!
It is a nice solution! By restricting the buffer with these properties, I think a maxConcurrentRequests attribute is indeed no longer necessary.

Wenhao
No problem, and I'm glad that it solves your problem :)
Piotrek