chenchuangchuang created FLINK-17638:
----------------------------------------

             Summary: FlinkKafkaConsumerBase restore from empty state will be set consume from earliest forced
                 Key: FLINK-17638
                 URL: https://issues.apache.org/jira/browse/FLINK-17638
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.10.0, 1.9.3, 1.9.0
         Environment: Flink 1.9.0
                      kafka 1.1.0
                      jdk 1.8
            Reporter: chenchuangchuang


My goal and data are as follows:
# I need to count the number of posts each user created in the last 30 days in my system.
# The complete, real-time data is in MySQL.
# I can get the incremental MySQL binlog from kafka-1.1.1 (it only stores the last 7 days of binlog); the topic name is "binlog_post_topic".
# So I have to combine the MySQL data with the binlog data.

I do it this way:
# First, I write a snapshot of the MySQL data to a Kafka topic in order of create_time (topic name "init-post-topic"), and consume "init-post-topic" as a Flink data stream with SlidingEventTimeWindows.
# Second, after the job has processed all the data in "init-post-topic", I create a savepoint for the job; call it save-point-a.
# Third, I modify my code:
## the data source is now the Kafka topic "binlog_post_topic",
## the other operators do not change,
## and "binlog_post_topic" is set to start consuming from a specific timestamp (the time the MySQL snapshot was created).
# Fourth, I restart my job from save-point-a.

But I find that the Kafka consumer for "binlog_post_topic" does not consume from the timestamp I set; it consumes from the earliest offset instead. I found this log in the task manager:
{code:java}
// code placeholder
2020-05-11 17:20:47,228 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {}.
...
2020-05-12 20:14:52,641 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}
2020-05-11 17:20:47,414 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}.
{code}

I guess this may be caused by FlinkKafkaConsumerBase. I found code like this in the method FlinkKafkaConsumerBase.initializeState():
{code:java}
// code placeholder
if (context.isRestored() && !restoredFromOldState) {
    restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    ....{code}

This code means that if a job is restarted from a savepoint, restoredState will not be null; it will be at least an empty TreeMap.

And in FlinkKafkaConsumerBase.open():
{code:java}
// code placeholder
if (restoredState != null) {
    for (KafkaTopicPartition partition : allPartitions) {
        if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }
    }
{code}

This is where the consumer is initialized. If a job is restarted from a savepoint, restoredState is at least an empty TreeMap, so the null check passes and every partition missing from the restored state is set to consume from KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET. The start timestamp I configured is never applied.

I changed the code like this:
{code:java}
// code placeholder
if (restoredState != null && !restoredState.isEmpty()) {
    ....
{code}

and this works well for me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
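The behaviour described in the report can be reproduced without Flink on the classpath. The following is only a minimal sketch of the two checks, not the connector's code: the class `RestoredStateSketch`, the method `resolveStartOffsets`, and the flag `requireNonEmpty` are hypothetical names, plain `String` keys stand in for `KafkaTopicPartition`, and a `null` return is an assumed convention meaning "fall back to the configured startup mode" (for example the start timestamp). The sentinel value is copied from the log lines in the report.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RestoredStateSketch {

    // Stand-in for KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET;
    // the value is the one printed in the task manager log of the report.
    static final long EARLIEST_OFFSET = -915623761775L;

    /**
     * Hypothetical helper modelling the check in open().
     * Returns the offsets to start from, or null (assumed convention) when
     * the restored state is ignored and the configured startup mode applies.
     */
    static Map<String, Long> resolveStartOffsets(
            Map<String, Long> restoredState,
            List<String> allPartitions,
            boolean requireNonEmpty) {

        // requireNonEmpty == false: the check quoted in the report
        // requireNonEmpty == true:  the check proposed by the reporter
        boolean useRestored = restoredState != null
                && (!requireNonEmpty || !restoredState.isEmpty());
        if (!useRestored) {
            return null;
        }

        // Partitions missing from the restored state default to "earliest".
        Map<String, Long> offsets = new TreeMap<>(restoredState);
        for (String partition : allPartitions) {
            if (!offsets.containsKey(partition)) {
                offsets.put(partition, EARLIEST_OFFSET);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // A savepoint taken from a different topic restores as an empty map.
        Map<String, Long> emptyRestored = new TreeMap<>();
        List<String> partitions =
                Collections.singletonList("binlog-kk_social-post-0");

        System.out.println("quoted check:   "
                + resolveStartOffsets(emptyRestored, partitions, false));
        System.out.println("proposed check: "
                + resolveStartOffsets(emptyRestored, partitions, true));
    }
}
```

With an empty restored map, the quoted check maps the partition to the earliest-offset sentinel, which matches the log in the report, while the proposed check returns `null` so that the configured start position would be used.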