Jürgen Kreileder created FLINK-11654:
----------------------------------------

             Summary: ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs
                 Key: FLINK-11654
                 URL: https://issues.apache.org/jira/browse/FLINK-11654
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.7.1
            Reporter: Jürgen Kreileder


We run multiple jobs on a cluster that write heavily to the same Kafka topic from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we get many ProducerFencedExceptions and all jobs go into a restart cycle.

Example exception from the Kafka log:

{code:java}
[2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
{code}

The cause is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator: the ID prefix is built only from the task name and the operator's unique ID, so the IDs are only guaranteed to be unique within a single job. They can clash between different jobs (and clusters).
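The failure mode can be illustrated with a toy model. The sketch below is a minimal, hypothetical simulation, not Kafka or Flink code: `ToyCoordinator` stands in for Kafka's transaction coordinator and keeps one epoch per transactional ID, bumping it whenever a producer registers that ID (roughly what `KafkaProducer#initTransactions()` triggers). The ID format `taskName-operatorUniqueId-<n>` mirrors the prefix in the FlinkKafkaProducer snippet, with the numeric suffix simplified; the sink name and operator ID values are made up. When two jobs derive the same ID, the first one is left holding a stale epoch, much like the `483 (request epoch), 484 (server epoch)` mismatch in the log above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not Kafka or Flink code) of why identical transactional IDs
 * in two jobs lead to ProducerFencedException-style errors.
 */
class FencingDemo {

    /** Hypothetical stand-in for Kafka's transaction coordinator: one epoch per transactional ID. */
    static final class ToyCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        /** Registering an ID bumps its epoch and returns the new value. */
        int register(String transactionalId) {
            return epochs.merge(transactionalId, 1, Integer::sum);
        }

        /** A write is accepted only if it carries the latest epoch for that ID. */
        boolean accepts(String transactionalId, int requestEpoch) {
            return epochs.getOrDefault(transactionalId, 0) == requestEpoch;
        }
    }

    /** ID built from job-local information only, mirroring the prefix in the quoted code (simplified). */
    static String transactionalId(String taskName, String operatorUniqueId, long n) {
        return taskName + "-" + operatorUniqueId + "-" + n;
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();

        // Assumption: both jobs have an identically named sink with the same operator ID.
        String idJobA = transactionalId("Sink: kafka-sink", "cbc357ccb763df2852fee8c4fc7d55f2", 0);
        String idJobB = transactionalId("Sink: kafka-sink", "cbc357ccb763df2852fee8c4fc7d55f2", 0);

        int epochA = coordinator.register(idJobA); // job A starts first
        int epochB = coordinator.register(idJobB); // job B reuses the same ID and bumps the epoch

        System.out.println("same ID: " + idJobA.equals(idJobB));
        System.out.println("job A fenced: " + !coordinator.accepts(idJobA, epochA));
        System.out.println("job B accepted: " + coordinator.accepts(idJobB, epochB));
    }
}
```

Running `main` prints `same ID: true`, `job A fenced: true` and `job B accepted: true`. In the real setup the fenced job presumably restarts, re-registers the same IDs and fences the other job in turn, which would match the restart cycle described above.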
{code:java}
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
+			// the prefix probably should include job id and maybe cluster id
 			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks(),
{code}

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
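The comment in the diff suggests scoping the prefix to the job (and maybe the cluster). A minimal sketch of that idea follows, assuming a hypothetical `prefix(jobId, taskName, operatorUniqueId)` helper and a simplified `idPool` numbering (neither is Flink API). It shows that once the prefix contains a job-scoped component, two jobs with identical sinks get disjoint ID pools. Random UUIDs stand in for the job ID here; that choice is an assumption for illustration only.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical sketch: make the transactional-ID prefix job-scoped
 * so that ID pools of different jobs cannot overlap.
 */
class JobScopedPrefixDemo {

    /** Hypothetical prefix: job ID + task name + operator ID. */
    static String prefix(String jobId, String taskName, String operatorUniqueId) {
        return jobId + "-" + taskName + "-" + operatorUniqueId;
    }

    /** Simplified pool of IDs for one subtask, numbered by subtask index and pool size. */
    static Set<String> idPool(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + ((long) subtaskIndex * poolSize + i));
        }
        return ids;
    }

    public static void main(String[] args) {
        String taskName = "Sink: kafka-sink";
        String operatorUid = "cbc357ccb763df2852fee8c4fc7d55f2";

        // Random UUIDs stand in for whatever job-scoped identifier would actually be used.
        Set<String> jobA = idPool(prefix(UUID.randomUUID().toString(), taskName, operatorUid), 0, 5);
        Set<String> jobB = idPool(prefix(UUID.randomUUID().toString(), taskName, operatorUid), 0, 5);

        Set<String> overlap = new HashSet<>(jobA);
        overlap.retainAll(jobB);
        System.out.println("overlapping IDs: " + overlap.size()); // 0 unless the UUIDs collide
    }
}
```

One design point to weigh: the producer likely relies on recognizing transactional IDs from earlier runs of the same job (for example, to abort lingering transactions), so whatever job-scoped component goes into the prefix probably needs to stay stable across restarts of that job. A random per-run value, as used in this sketch, would not satisfy that.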
Thank you for the quick response.
On Mon, Feb 18, 2019 at 8:58 PM Jürgen Kreileder (JIRA) <[hidden email]> wrote:
> Jürgen Kreileder created FLINK-11654: [...]

-- 
Thanks,
Rohan