[DISCUSS] Add async backpressure support to FlinkKafkaProducer

[DISCUSS] Add async backpressure support to FlinkKafkaProducer

zetaplusae
Hi everyone,

I recently came across the following exception while investigating a job
failure. The job uses FlinkKafkaProducer as its sink.

```
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Expiring #N record(s) for TOPIC-PARTITION:#N ms has
passed since batch creation
```

After digging into the source code of FlinkKafkaProducer, I found that,
unless I am mistaken, it does not have any backpressure mechanism of its
own. Incoming records are simply sent using KafkaProducer#send without
synchronization (at FlinkKafkaProducer.java#L915
<https://github.com/apache/flink/blob/74078914a153a8cc1d6af2c5069e6042432ad13d/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L915>).
If the producer's parallelism is not set correctly for its upstream
throughput, or if writes to the leader of a topic partition perform badly,
the accumulator in KafkaProducer fills up with unsent records, eventually
causing record expirations like the one above.

I have seen a similar ticket, FLINK-9083
<https://issues.apache.org/jira/browse/FLINK-9083>, filed before for the
Cassandra connector. Shall we make the same improvement for the Kafka
connector? Maybe we could add a maxConcurrentRequests attribute to
FlinkKafkaProducer and use a semaphore to limit in-flight requests, as
sketched below?
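
Just to make the idea concrete, here is a rough sketch of what such a
semaphore-based limit could look like. The wrapper class and its
maxConcurrentRequests parameter are hypothetical; nothing like this exists
in FlinkKafkaProducer today.

```java
import java.util.concurrent.Semaphore;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical wrapper, for illustration only.
class ThrottledProducer<K, V> {
    private final KafkaProducer<K, V> producer;
    private final Semaphore inFlight;

    ThrottledProducer(KafkaProducer<K, V> producer, int maxConcurrentRequests) {
        this.producer = producer;
        this.inFlight = new Semaphore(maxConcurrentRequests);
    }

    void send(ProducerRecord<K, V> record, Callback callback) throws InterruptedException {
        // Block the caller once maxConcurrentRequests sends are pending,
        // which backpressures the upstream operators.
        inFlight.acquire();
        producer.send(record, (metadata, exception) -> {
            inFlight.release(); // free a slot when the broker acks or the send fails
            if (callback != null) {
                callback.onCompletion(metadata, exception);
            }
        });
    }
}
```

The permit is released in the completion callback whether the send
succeeded or failed, so the limit cannot leak.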

Thanks,
Wenhao

Re: [DISCUSS] Add async backpressure support to FlinkKafkaProducer

Piotr Nowojski
Hi Wenhao,

As far as I know this is different from FLINK-9083, because KafkaProducer
itself can backpressure writes when its internal buffers are exhausted [1]:

> The buffer.memory controls the total amount of memory available to the
> producer for buffering. If records are sent faster than they can be
> transmitted to the server then this buffer space will be exhausted. When
> the buffer space is exhausted additional send calls will block. The
> threshold for time to block is determined by max.block.ms after which it
> throws a TimeoutException.

If you want to limit the number of concurrent requests, you can do so by
reducing the `buffer.memory` option passed to KafkaProducer (via
FlinkKafkaProducer's "Properties producerConfig" constructor argument).
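
For example, a minimal sketch; the broker address, topic name, and the
4 MiB buffer size are just placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

class BoundedBufferSink {
    static FlinkKafkaProducer<String> create() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // A smaller buffer makes send() block (and thus backpressure) sooner;
        // 4 MiB here is an arbitrary example, the Kafka default is 32 MiB.
        producerConfig.setProperty("buffer.memory", String.valueOf(4 * 1024 * 1024));
        // How long a blocked send() waits before throwing a TimeoutException.
        producerConfig.setProperty("max.block.ms", "120000");

        return new FlinkKafkaProducer<>(
                "my-topic",               // placeholder topic
                new SimpleStringSchema(), // serializes String records
                producerConfig);
    }
}
```

With a smaller buffer.memory, the accumulator fills up sooner, so send()
blocks earlier and the blocking propagates as backpressure through the
Flink task.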

Piotrek

[1]
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html


Re: [DISCUSS] Add async backpressure support to FlinkKafkaProducer

zetaplusae
Thanks, Piotr, for your reply!
That is a nice solution! With the buffer restricted via these properties, I
think a maxConcurrentRequests attribute is indeed no longer necessary.


Re: [DISCUSS] Add async backpressure support to FlinkKafkaProducer

Piotr Nowojski
No problem, and I'm glad it solves your problem :)

Piotrek
