Nazar Volynets created FLINK-21057:
-------------------------------------- Summary: Streaming checkpointing with small interval leads app to hang Key: FLINK-21057 URL: https://issues.apache.org/jira/browse/FLINK-21057 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.3 Environment: * streaming app * flink cluster in standalone-job / application mode * 1.11.3 Flink version * jobmanager --> 1 instance * taskmanager --> 1 instance * parallelism --> 2 Reporter: Nazar Volynets Attachments: jobmanager.log, taskmanager.log There is a simple streaming app with enabled checkpointing: * statebackend --> RocksDB * mode --> EXACTLY_ONCE STRs: 1. Run Flink cluster in standalone-job / application mode (with embedded streaming app) 2. Get error 3. Wait 1 min 4. Stop Flink cluster 5. Repeat steps from 1 to 3 until error: {code:java|title=taskmanager} org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?] 
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?] flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?] flink-kafka-mirror-maker-jobmanager | at java.lang.Thread.run(Unknown Source) ~[?:?] {code} It is obvious. Please find below: * streaming app code base (example) * attached logs ** jobmanager ** taskmanager *Example* +App+ {code:java|title=build.gradle (dependencies)} ... ext { ... javaVersion = '11' flinkVersion = '1.12.0' scalaBinaryVersion = '2.11' ... } dependencies { ... implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}" implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}" implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}" ... } {code} {code:java|title=App} public static void main(String[] args) { ... StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.enableCheckpointing(500); env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true)); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); env.getCheckpointConfig().setCheckpointTimeout(600000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); FlinkKafkaConsumer<Record> consumer = createConsumer(); FlinkKafkaProducer<Record> producer = createProducer(); env .addSource(consumer) .uid("kafka-consumer") .addSink(producer) .uid("kafka-producer") ; env.execute(); } public static FlinkKafkaConsumer<Record> createConsumer() { ... Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091"); ... 
// nothing special props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props); ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too ... // SimpleStringSchema --> can be used instead to reproduce issue consumer.setStartFromGroupOffsets(); consumer.setCommitOffsetsOnCheckpoints(true); return consumer; } public static FlinkKafkaProducer<Record> createProducer() { ... Properties props = new Properties(); ... props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094"); props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000"); props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000"); props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000"); props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691 props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly ... FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too ... // SimpleStringSchema --> can be used instead to reproduce issue return producer; } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |