Job gets stuck with exactly once checkpointing and Kafka when initializing.

Anil Singh

Dear Flink Community,

When using exactly-once checkpointing with Kafka and connecting to Kafka over SASL_SSL, the job gets stuck with the Kafka producer in the Initialising state for around 10 minutes. After that it functions normally across multiple checkpoints. The configuration and my observations follow.

Flink/Job config:
VERSION: Flink 1.13.1 for both env and jars.

1. Single node with checkpointing in exactly-once mode and a checkpointing interval of 2 minutes (tried 10 minutes as well, still the same issue).
2. The job has a Flink Kafka consumer on topic 1 as source and a Flink Kafka producer on topic 2 as sink, with parallelism 4 for both source and sink.
3. The Kafka producer uses the FlinkKafkaProducer implementation in EXACTLY_ONCE mode and connects to the broker using SASL_SSL, where the SSL certificate is self-signed.
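
For readers trying to reproduce the setup in item 3, a minimal sketch of the producer properties such a connection typically needs is shown below. It uses only java.util.Properties. The class name, method name, and all parameter values are hypothetical, and the SCRAM variant (SCRAM-SHA-512) is an assumption, since the post only mentions SCRAM authentication.

```java
import java.util.Properties;

// Sketch only: builds Kafka client properties for a SASL_SSL + SCRAM connection.
// KafkaSecurityProps and its parameters are illustrative names, not from the post.
final class KafkaSecurityProps {

    static Properties build(String bootstrapServers,
                            String truststorePath,
                            String truststorePassword,
                            String username,
                            String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        // Assumption: the post does not say which SCRAM variant the broker uses.
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" "
                        + "password=\"" + password + "\";");
        // A self-signed broker certificate has to be trusted explicitly,
        // here via a truststore that contains it.
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

The resulting Properties object would be passed as the producer config argument of the FlinkKafkaProducer constructor, together with FlinkKafkaProducer.Semantic.EXACTLY_ONCE.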


Kafka config:
VERSION: 2.8 for the broker. For the Flink job I tried Kafka client versions 2.4.1 (shipped with the Flink Kafka connector) and 2.8.0.

1. Single broker with both topics having 4 partitions.
2. Everything works correctly with the Kafka consumer commands in SASL_SSL mode.

Observation for debug logs:
Background: In the dev env with a PLAINTEXT connection everything was working perfectly. When I changed PLAINTEXT to SASL_SSL I started getting timeout exceptions. These were resolved after I increased max.block.ms from 60000 to 90000, but then I started facing the delayed-start issue.

1. About error/exception:
        I did not find any exception in the debug logs, but I noticed the following pattern, which keeps repeating until initialisation succeeds:
        SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH PRODUCER_ID --> Transition from state INITIALIZING to READY --> CLOSE PRODUCER

The producer keeps repeating the above steps until the following pattern occurs:
        SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH PRODUCER_ID --> Transition from state INITIALIZING to READY --> READY to IN_TRANSACTION

Also, if I cancel the job while the Kafka producer/sink is stuck in the Initialising state, the Kafka sink operator gets stuck and I get the following error: "Task did not exit gracefully within 180 + seconds." The task manager then crashes.

2. Turning off SSL and using SASL_PLAINTEXT - When connecting to the Kafka broker in SASL_PLAINTEXT mode, the Kafka producer gets initialised and starts processing data in 50-60 secs.

3. Disabling checkpointing - When checkpointing is disabled and SASL_SSL/SASL_PLAINTEXT mode is used for Kafka, the Kafka producer gets initialised immediately, but the Kafka consumer or sink takes 10-15 secs to initialise, and then the app operators start processing data.

4. I wrote a producer directly using the Kafka producer from the Kafka library and added transactions to it. My observation was that the kafkaProducer.initTransactions() method takes around 30 secs to execute, after which it starts publishing data to the Kafka topic.

5. The same pattern of delayed start seems to appear in case of a crash and automatic restart of the job.

6. No exception in Flink Dashboard UI for job.
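
The timing in observation 4 could be reproduced with a small wrapper like the one below. This is only a sketch using the standard library. StepTimer is a hypothetical helper name, and the commented usage assumes a KafkaProducer that has already been configured with a transactional.id.

```java
import java.util.concurrent.TimeUnit;

// Sketch only: measures how long a single step takes, in milliseconds.
final class StepTimer {

    static long timeMillis(Runnable step) {
        long start = System.nanoTime();
        step.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}

// Hypothetical usage with a transactional producer (requires kafka-clients):
//   long ms = StepTimer.timeMillis(producer::initTransactions);
//   System.out.println("initTransactions() took " + ms + " ms");
```

Measuring the same call under SASL_SSL and SASL_PLAINTEXT would show how much of the delay comes from the security protocol alone, independent of Flink.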

PFA a copy of part of the log.