[jira] [Created] (FLINK-21056) Streaming checkpointing is failing occasionally

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-21056) Streaming checkpointing is failing occasionally

Shang Yuanchun (Jira)
Nazar Volynets created FLINK-21056:
--------------------------------------

             Summary: Streaming checkpointing is failing occasionally
                 Key: FLINK-21056
                 URL: https://issues.apache.org/jira/browse/FLINK-21056
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.11.3
         Environment: * streaming app
* flink cluster in standalone-job / application mode
* 1.11.3 Flink version
* jobmanager --> 1 instance
* taskmanager --> 1 instance
* parallelism --> 2
            Reporter: Nazar Volynets
         Attachments: jobmanager.log, taskmanager.log

There is a simple streaming app with enabled checkpointing:
 * statebackend --> RockDB
 * mode --> EXACTLY_ONCE

STRs:
 1. Run Flink cluster in standalone-job / application mode (with embedded streaming app)
 2. Wait 10 minutes
 3. Restart Flink cluster (& consequently streaming app)
 4. Repeat steps from #1 to #3 until you will get an checkpointing error
{code:java|title=taskmanager}
2021-01-19 12:09:39,719 INFO  org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Could not complete snapshot 21 for operator Source: Custom Source -> Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21 for operator Source: Custom Source -> Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
...
Caused by: org.apache.flink.util.SerializedThrowable: Timeout expired after 60000milliseconds while awaiting InitProducerId
{code}
Based on stack trace quite tricky to define / determine the root cause.

Please find below:
 * streaming app code base (example)
 * attached logs
 ** jobmanager
 ** taskmanager

*Example*

+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  ...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.setParallelism(2);

  env.enableCheckpointing(10000);

  env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));

  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  env.getCheckpointConfig().setCheckpointTimeout(600000);
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

  FlinkKafkaConsumer<Record> consumer = createConsumer();
  FlinkKafkaProducer<Record> producer = createProducer();

  env
    .addSource(consumer)
    .uid("kafka-consumer")
    .addSink(producer)
    .uid("kafka-producer")
  ;

  env.execute();
}

public static FlinkKafkaConsumer<Record> createConsumer() {
  ...
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
  ... // nothing special
  props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

  FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);

  ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
  ... // SimpleStringSchema --> can be used instead to reproduce issue

  consumer.setStartFromGroupOffsets();
  consumer.setCommitOffsetsOnCheckpoints(true);

  return consumer;
}

public static FlinkKafkaProducer<Record> createProducer() {
  ...
  Properties props = new Properties();
  ...
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
  props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
  props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
  props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
  props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
  props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
  props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
  ...
  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

  ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
  ... // SimpleStringSchema --> can be used instead to reproduce issue

  return producer;
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)