[jira] [Created] (FLINK-11654) ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs


Shang Yuanchun (Jira)
Jürgen Kreileder created FLINK-11654:
----------------------------------------

             Summary: ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs
                 Key: FLINK-11654
                 URL: https://issues.apache.org/jira/browse/FLINK-11654
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.7.1
            Reporter: Jürgen Kreileder


We run multiple jobs on a cluster that write heavily to the same Kafka topic from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we run into many ProducerFencedExceptions and all jobs go into a restart cycle.

Example exception from the Kafka log:

{code:java}
[2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
{code}
The reason for this is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator: the generated IDs are only guaranteed to be unique within a single job, so they can clash between different jobs (and clusters).

{code:java}
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
+			// the prefix probably should include job id and maybe cluster id
 			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks(),
{code}
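The clash can be illustrated with a minimal, self-contained sketch. The prefix scheme and the operator hash value below are illustrative, not Flink's exact implementation: the point is that two jobs running the same pipeline derive identical task names and operator IDs from the job graph, so the transactional IDs they generate are identical and Kafka fences one producer against the other.

```java
// Illustrative sketch: transactional IDs derived only from task name,
// operator ID, subtask index, and a counter collide across identical jobs.
public class TransactionalIdClashDemo {

    // Mimics the prefix scheme: prefix is taskName + "-" + operatorId,
    // then subtask index and a running counter are appended.
    static String transactionalId(String taskName, String operatorId,
                                  int subtask, long counter) {
        return taskName + "-" + operatorId + "-" + subtask + "-" + counter;
    }

    public static void main(String[] args) {
        // Two separate jobs submit the same pipeline; the operator ID is a
        // hash of the job graph structure, so it is the same in both jobs
        // (the hash value here is made up for the example).
        String jobA = transactionalId("Sink: kafka-sink",
                "cbc357ccb763df2852fee8c4fc7d55f2", 0, 0);
        String jobB = transactionalId("Sink: kafka-sink",
                "cbc357ccb763df2852fee8c4fc7d55f2", 0, 0);

        // Identical IDs: whichever producer starts its transaction later
        // bumps the epoch, and the other one gets ProducerFencedException.
        System.out.println(jobA.equals(jobB));
    }
}
```

Including the job ID (and possibly a cluster ID) in the prefix, as the comment in the patch suggests, would make the IDs distinct per job and avoid the fencing.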
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Re: [jira] [Created] (FLINK-11654) ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs

Rohan Thimmappa
Thank you for the quick response.

On Mon, Feb 18, 2019 at 8:58 PM Jürgen Kreileder (JIRA) <[hidden email]>
wrote:



--
Thanks
Rohan