Does FlinkKinesisConsumer not retry on NoHttpResponseException?

Singh Aulakh, Karanpreet KP
Hello!

(Apache Flink 1.8 on AWS EMR release label 5.28.x)

Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use the FlinkKinesisConsumer to read the Kinesis stream. Our application occasionally (once every couple of days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.

Looking further into the codebase, I found that 'ProvisionedThroughputExceededException' is the only exception type that is retried. Code: <https://github.com/apache/flink/blob/2b0a8ceeb131c938d2e41dfee66099bfa5f366ae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L365>
1. Why is a transient HTTP response exception not retried by the Kinesis connector?
2. Is there a way I can pass in a retry configuration that will retry on these errors?
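
To make the second question concrete, here is a minimal sketch of the kind of retry behaviour being asked about: retry a call with exponential backoff when a named transient exception appears anywhere in the cause chain. This is not the connector's implementation. `TransientRetrySketch`, `callWithRetry`, `hasCauseNamed`, and the nested stand-in `NoHttpResponseException` are all hypothetical names, and matching on the simple class name is an assumption made because the real exception class may live in a relocated (shaded) package.

```java
import java.io.IOException;
import java.util.function.Supplier;

public class TransientRetrySketch {

    /** Hypothetical stand-in for the HTTP client's exception, so the sketch needs no extra jars. */
    static class NoHttpResponseException extends IOException {
        NoHttpResponseException(String message) {
            super(message);
        }
    }

    /** Returns true if any throwable in the cause chain has the given simple class name. */
    static boolean hasCauseNamed(Throwable error, String simpleName) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t.getClass().getSimpleName().equals(simpleName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the call, retrying up to maxRetries times when the failure's cause chain
     * contains an exception named retryableName. Other failures are rethrown at once.
     */
    static <T> T callWithRetry(Supplier<T> call, int maxRetries,
                               long baseBackoffMillis, String retryableName) {
        int attempt = 0;
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries || !hasCauseNamed(e, retryableName)) {
                    throw e;
                }
                // Exponential backoff: base, 2x base, 4x base, ... (shift capped to avoid overflow).
                sleep(baseBackoffMillis << Math.min(attempt, 10));
                attempt++;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during backoff", ie);
        }
    }
}
```

Whether and where a wrapper like this could be hooked into the connector (for example around the getRecords call seen in the stack trace) is exactly what the question above is asking; the sketch only illustrates the desired behaviour.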

As a side note, we set the following restart strategy -

env.setRestartStrategy(RestartStrategies.failureRateRestart(
    12,
    org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
    org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));
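
For readers less familiar with this strategy, the sketch below models how I read the three arguments: at most 12 failures within a sliding 60-minute interval, with a 300-second delay before each restart. It is a simplified sliding-window model under that assumption, not Flink's own code; `FailureRateWindow` and `recordFailureAndCheck` are hypothetical names.

```java
import java.util.ArrayDeque;

public class FailureRateWindow {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimes = new ArrayDeque<>();

    FailureRateWindow(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure at nowMillis and returns true if a restart would still be allowed. */
    boolean recordFailureAndCheck(long nowMillis) {
        // Forget failures that have fallen out of the sliding interval.
        while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.pollFirst();
        }
        failureTimes.addLast(nowMillis);
        return failureTimes.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        FailureRateWindow window = new FailureRateWindow(12, 60L * 60 * 1000);
        // Thirteen failures, one minute apart: the last one exceeds the allowed rate.
        for (int i = 0; i < 13; i++) {
            boolean allowed = window.recordFailureAndCheck(i * 60_000L);
            System.out.println("failure " + (i + 1) + " restart allowed: " + allowed);
        }
    }
}
```

In this model, failures that are each separated by slightly more than the 300-second restart delay never accumulate to more than 12 inside any 60-minute window, so the rate limit would only be reached if failures could arrive closer together than the delay. Whether the real strategy behaves the same way at the boundaries is an assumption here.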

Full stack trace of the exception -

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

(Shamelessly copy-pasted from the Stack Overflow question I posted: https://stackoverflow.com/questions/62399248/flinkkinesisconsumer-does-not-retry-on-nohttpresponseexception)

--
KP
Re: Does FlinkKinesisConsumer not retry on NoHttpResponseException?

Robert Metzger
For the others on the dev@ list: I responded on SO.

On Tue, Jun 16, 2020 at 7:56 AM Singh Aulakh, Karanpreet KP
<[hidden email]> wrote:
