[jira] [Created] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer

Shang Yuanchun (Jira)
Vasii Cosmin Radu created FLINK-17170:
-----------------------------------------

             Summary: Cannot stop streaming job with savepoint which uses kinesis consumer
                 Key: FLINK-17170
                 URL: https://issues.apache.org/jira/browse/FLINK-17170
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Connectors / Kinesis
    Affects Versions: 1.10.0
            Reporter: Vasii Cosmin Radu
         Attachments: Screenshot 2020-04-15 at 18.16.26.png

I am encountering a very strange situation where I can't stop with savepoint a streaming job.

The job reads from kinesis and sinks to S3, very simple job, no mapping function, no watermarks, just source->sink. 

Source is using flink-kinesis-consumer, sink is using StreamingFileSink. 

Everything works fine, except stopping the job with savepoints.

The behaviour happens only when multiple task managers are involved, having sub-tasks off the job spread across multiple task manager instances. When a single task manager has all the sub-tasks this issue never occurred.

Using latest Flink 1.10.0 version, deployment done in HA mode (2 job managers), in EC2, savepoints and checkpoints written on S3.

When trying to stop, the savepoint is created correctly and appears on S3, but not all sub-tasks are stopped. Some of them finished, but some just remain hanged. Sometimes, on the same task manager part of the sub-tasks are finished, part aren't.

The logs don't show any errors. For the ones that succeed, the standard messages appear, with "Source: <....> switched from RUNNING to FINISHED".

For the sub-tasks hanged the last message is "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ..." and that's it.

 

I tried using the cli (flink stop <job_id>)

Timeout Message:
{code:java}
// code placeholder
{code}
root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25

root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint. ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25". at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460) ... 9 more

 

Using the monitoring api, I keep getting infinite message when querying based on the savepoint id, that the status id is still "IN_PROGRESS".

 

When performing a cancel instead of stop, it works. But cancel is deprecated, so I am a bit concerned that this might fail also, maybe I was just lucky.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)