andy hoang created FLINK-11335:
----------------------------------
Summary: Kafka consumer can not commit offset at checkpoint
Key: FLINK-11335
URL: https://issues.apache.org/jira/browse/FLINK-11335
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.6.2
Environment: AWS EMR 5.20: hadoop, flink plugin
flink: 1.6.2
run under yarn-cluster
Kafka cluster: 1.0
Reporter: andy hoang
When trying to commit offsets to Kafka at a checkpoint, I always get this warning:
{noformat}
2019-01-15 11:18:55,405 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
{noformat}
The code below has been simplified by removing the business logic:
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
// Checkpoint every 6000 ms with at-least-once semantics
env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

val properties = new Properties()
properties.setProperty("group.id", "my_groupid")
//properties.setProperty("enable.auto.commit", "false")

// Start from the committed group offsets and commit back to Kafka on each checkpoint
val consumer = new FlinkKafkaConsumer011[ObjectNode](
    "my_topic",
    new JSONKeyValueDeserializationSchema(true),
    properties)
  .setStartFromGroupOffsets()
  .setCommitOffsetsOnCheckpoints(true)

val stream = env.addSource(consumer)
stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
  override def map(node: ObjectNode): Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
    logger.info("################## %s".format(node.get("metadata").toString))
    // Stand-in for the removed business logic: 3 seconds of work per record
    Thread.sleep(3000)
    Right((200, writeJs(node.toString)))
  }
}).print()

env.execute("pp_convoy_flink")
{code}
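To illustrate how I read the warning text: it suggests that offsets from a completed checkpoint wait to be picked up for committing, and that a newer checkpoint replaces them if they have not been picked up yet. Below is a minimal sketch of that "latest wins" behaviour. It assumes a single-slot hand-off and is not the connector's actual code; {{CommitSlotSketch}}, {{requestCommit}} and {{takePending}} are hypothetical names, not Flink APIs.

{code:scala}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of a single-slot commit hand-off (an assumption, not Flink's code).
object CommitSlotSketch {
  type Offsets = Map[Int, Long] // partition -> offset

  // Holds the offsets of the latest completed checkpoint until they are picked up.
  private val pending = new AtomicReference[Option[Offsets]](None)

  // Called when a checkpoint completes.
  // Returns true if older, still-uncommitted offsets were overwritten (the warning case).
  def requestCommit(offsets: Offsets): Boolean = {
    val previous = pending.getAndSet(Some(offsets))
    val skipped = previous.isDefined
    if (skipped) println("WARN: skipping commit of previous offsets")
    skipped
  }

  // Called by the committing side; takes whatever is pending and empties the slot.
  def takePending(): Option[Offsets] = pending.getAndSet(None)

  def main(args: Array[String]): Unit = {
    requestCommit(Map(0 -> 10L)) // first checkpoint: the slot was empty
    requestCommit(Map(0 -> 20L)) // second checkpoint arrives before the first was picked up
    println(takePending())       // only the newer offsets remain to be committed
  }
}
{code}

If this model is close to what the connector does, the {{Thread.sleep(3000)}} per record in the map function above might be what delays the commit beyond the 6000 ms checkpoint interval, but that is only a guess on my side.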