Re: recover from svaepoint

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Till Rohrmann
The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski <[hidden email]> @Becket Qin <[hidden email]> how
does the transaction id generation ensures that we don't have a clash of
transaction ids if we resume the same job multiple times from the same
savepoint? From the code, I do see that we have a TransactionalIdsGenerator
which is initialized with the taskName and the operator UID.

fyi: @Arvid Heise <[hidden email]>

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:

> HI:
>       When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from svaepoint
>
>        public static final String KAFKA_TABLE_FORMAT =
>             "CREATE TABLE "+TABLE_NAME+" (\n" +
>                     "  "+COLUMN_NAME+" STRING\n" +
>                     ") WITH (\n" +
>                     "   'connector' = 'kafka',\n" +
>                     "   'topic' = '%s',\n" +
>                     "   'properties.bootstrap.servers' = '%s',\n" +
>                     "   'sink.semantic' = 'exactly-once',\n" +
>                     "   'properties.transaction.timeout.ms' =
> '900000',\n" +
>                     "   'sink.partitioner' =
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>                     "   'format' = 'dbz-json'\n" +
>                     ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
>     at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
>     at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
>     at java.lang.Thread.run(Thread.java:748)
>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Piotr Nowojski-5
Hi,

I think there is no generic way. If this error has happened indeed after
starting a second job from the same savepoint, or something like that, user
can change Sink's operator UID.

If this is an issue of intentional recovery from an earlier
checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
helpful.

Best, Piotrek

wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]> napisał(a):

> The error message says that we are trying to reuse a transaction id that is
> currently being used or has expired.
>
> I am not 100% sure how this can happen. My suspicion is that you have
> resumed a job multiple times from the same savepoint. Have you checked that
> there is no other job which has been resumed from the same savepoint and
> which is currently running or has run and completed checkpoints?
>
> @pnowojski <[hidden email]> @Becket Qin <[hidden email]> how
> does the transaction id generation ensures that we don't have a clash of
> transaction ids if we resume the same job multiple times from the same
> savepoint? From the code, I do see that we have a TransactionalIdsGenerator
> which is initialized with the taskName and the operator UID.
>
> fyi: @Arvid Heise <[hidden email]>
>
> Cheers,
> Till
>
>
> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>
> > HI:
> >       When "sink.semantic = exactly-once", the following exception is
> > thrown when recovering from svaepoint
> >
> >        public static final String KAFKA_TABLE_FORMAT =
> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
> >                     "  "+COLUMN_NAME+" STRING\n" +
> >                     ") WITH (\n" +
> >                     "   'connector' = 'kafka',\n" +
> >                     "   'topic' = '%s',\n" +
> >                     "   'properties.bootstrap.servers' = '%s',\n" +
> >                     "   'sink.semantic' = 'exactly-once',\n" +
> >                     "   'properties.transaction.timeout.ms' =
> > '900000',\n" +
> >                     "   'sink.partitioner' =
> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> >                     "   'format' = 'dbz-json'\n" +
> >                     ")\n";
> >   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> > debezium_source]], fields=[data]) -> Sink: Sink
> > (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
> > Unexpected error in InitProducerIdResponse; Producer attempted an
> > operation with an old epoch. Either there is a newer producer with the
> > same transactionalId, or the producer's transaction has been expired by
> > the broker.
> >     at org.apache.kafka.clients.producer.internals.
> >
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> > .java:1352)
> >     at org.apache.kafka.clients.producer.internals.
> > TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> > 1260)
> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> > .java:109)
> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
> > NetworkClient.java:572)
> >     at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> >     at org.apache.kafka.clients.producer.internals.Sender
> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> > .java:312)
> >     at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> > 239)
> >     at java.lang.Thread.run(Thread.java:748)
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Till Rohrmann
Forwarding 周瑞's message to a duplicate thread:

After our analysis, we found a bug in the
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
method
The analysis process is as follows:


org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
public void initializeState(FunctionInitializationContext context) throws
Exception {
    state = context.getOperatorStateStore().getListState(stateDescriptor);
    boolean recoveredUserContext = false;
    if (context.isRestored()) {
        LOG.info("{} - restoring state", name());
        for (State<TXN, CONTEXT> operatorState : state.get()) {
            userContext = operatorState.getContext();
            List<TransactionHolder<TXN>> recoveredTransactions =
                    operatorState.getPendingCommitTransactions();
            List<TXN> handledTransactions = new
ArrayList<>(recoveredTransactions.size() + 1);
            for (TransactionHolder<TXN> recoveredTransaction :
recoveredTransactions) {
                // If this fails to succeed eventually, there is actually
data loss
                recoverAndCommitInternal(recoveredTransaction);
                handledTransactions.add(recoveredTransaction.handle);
                LOG.info("{} committed recovered transaction {}", name(),
recoveredTransaction);
            }

            {
                TXN transaction =
operatorState.getPendingTransaction().handle;
                recoverAndAbort(transaction);
                handledTransactions.add(transaction);
                LOG.info(
                        "{} aborted recovered transaction {}",
                        name(),
                        operatorState.getPendingTransaction());
            }

            if (userContext.isPresent()) {
                finishRecoveringContext(handledTransactions);
                recoveredUserContext = true;
            }
        }
    }

(1)recoverAndCommitInternal(recoveredTransaction);
The previous transactionalid, producerId and epoch in the state are used to
commit the transaction,However, we find that the producerIdAndEpoch of
transactionManager is a static constant (ProducerIdAndEpoch.NONE), which
pollutes the static constant ProducerIdAndEpoch.NONE

@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
transaction) {
    if (transaction.isTransactional()) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        try {
            producer =
initTransactionalProducer(transaction.transactionalId, false);
            producer.resumeTransaction(transaction.producerId,
transaction.epoch);
            producer.commitTransaction();
        } catch (InvalidTxnStateException | ProducerFencedException ex) {
            // That means we have committed this transaction before.
            LOG.warn(
                    "Encountered error {} while recovering transaction {}. "
                            + "Presumably this transaction has been already
committed before",
                    ex,
                    transaction);
        } finally {
            if (producer != null) {
                producer.close(0, TimeUnit.SECONDS);
            }
        }
    }
}

public void resumeTransaction(long producerId, short epoch) {
    synchronized (producerClosingLock) {
        ensureNotClosed();
        Preconditions.checkState(
                producerId >= 0 && epoch >= 0,
                "Incorrect values for producerId %s and epoch %s",
                producerId,
                epoch);
        LOG.info(
                "Attempting to resume transaction {} with producerId {} and
epoch {}",
                transactionalId,
                producerId,
                epoch);

        Object transactionManager = getField(kafkaProducer,
"transactionManager");
        synchronized (transactionManager) {
            Object topicPartitionBookkeeper =
                    getField(transactionManager,
"topicPartitionBookkeeper");

            invoke(
                    transactionManager,
                    "transitionTo",
                    getEnum(

"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
            invoke(topicPartitionBookkeeper, "reset");

            Object producerIdAndEpoch = getField(transactionManager,
"producerIdAndEpoch");
            setField(producerIdAndEpoch, "producerId", producerId);
            setField(producerIdAndEpoch, "epoch", epoch);

            invoke(
                    transactionManager,
                    "transitionTo",
                    getEnum(

"org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));

            invoke(
                    transactionManager,
                    "transitionTo",
                    getEnum(

"org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
            setField(transactionManager, "transactionStarted", true);
        }
    }
}


public TransactionManager(LogContext logContext,
                          String transactionalId,
                          int transactionTimeoutMs,
                          long retryBackoffMs,
                          ApiVersions apiVersions) {
    this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
    this.transactionalId = transactionalId;
    this.log = logContext.logger(TransactionManager.class);
    this.transactionTimeoutMs = transactionTimeoutMs;
    this.transactionCoordinator = null;
    this.consumerGroupCoordinator = null;
    this.newPartitionsInTransaction = new HashSet<>();
    this.pendingPartitionsInTransaction = new HashSet<>();
    this.partitionsInTransaction = new HashSet<>();
    this.pendingRequests = new PriorityQueue<>(10,
Comparator.comparingInt(o -> o.priority().priority));
    this.pendingTxnOffsetCommits = new HashMap<>();
    this.partitionsWithUnresolvedSequences = new HashMap<>();
    this.partitionsToRewriteSequences = new HashSet<>();
    this.retryBackoffMs = retryBackoffMs;
    this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
    this.apiVersions = apiVersions;
}



public class ProducerIdAndEpoch {
    public static final ProducerIdAndEpoch NONE = new
ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH);

    public final long producerId;
    public final short epoch;

    public ProducerIdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    public boolean isValid() {
        return RecordBatch.NO_PRODUCER_ID < producerId;
    }

    @Override
    public String toString() {
        return "(producerId=" + producerId + ", epoch=" + epoch + ")";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;

        if (producerId != that.producerId) return false;
        return epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        int result = (int) (producerId ^ (producerId >>> 32));
        result = 31 * result + (int) epoch;
        return result;
    }

}

(2)In the second step,
recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when
initializing the transaction, producerId and epoch in the first step
pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request
is sent to Kafka, the values of the producerId and epoch  variables in the
request parameter ProducerIdAndEpoch.NONE are equal to the values of the
producerId and epoch  variables in the first transaction commit, not equal
to - 1, - 1. So Kafka throws an exception:
Unexpected error in InitProducerIdResponse; Producer attempted an operation
with an old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
    at
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)

protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
transaction) {
    if (transaction.isTransactional()) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        try {
            producer =
initTransactionalProducer(transaction.transactionalId, false);
            producer.initTransactions();
        } finally {
            if (producer != null) {
                producer.close(0, TimeUnit.SECONDS);
            }
        }
    }
}

public synchronized TransactionalRequestResult initializeTransactions() {
    return initializeTransactions(ProducerIdAndEpoch.NONE);
}

synchronized TransactionalRequestResult
initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
    boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
    return handleCachedTransactionRequestResult(() -> {
        // If this is an epoch bump, we will transition the state as part
of handling the EndTxnRequest
        if (!isEpochBump) {
            transitionTo(State.INITIALIZING);
            log.info("Invoking InitProducerId for the first time in order
to acquire a producer ID");
        } else {
            log.info("Invoking InitProducerId with current producer ID and
epoch {} in order to bump the epoch", producerIdAndEpoch);
        }
        InitProducerIdRequestData requestData = new
InitProducerIdRequestData()
                .setTransactionalId(transactionalId)
                .setTransactionTimeoutMs(transactionTimeoutMs)
                .setProducerId(producerIdAndEpoch.producerId)
                .setProducerEpoch(producerIdAndEpoch.epoch);
        InitProducerIdHandler handler = new InitProducerIdHandler(new
InitProducerIdRequest.Builder(requestData),
                isEpochBump);
        enqueueRequest(handler);
        return handler.result;
    }, State.INITIALIZING);
}

On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <[hidden email]> wrote:

> Hi,
>
> I think there is no generic way. If this error has happened indeed after
> starting a second job from the same savepoint, or something like that, user
> can change Sink's operator UID.
>
> If this is an issue of intentional recovery from an earlier
> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
> helpful.
>
> Best, Piotrek
>
> wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]> napisał(a):
>
>> The error message says that we are trying to reuse a transaction id that
>> is
>> currently being used or has expired.
>>
>> I am not 100% sure how this can happen. My suspicion is that you have
>> resumed a job multiple times from the same savepoint. Have you checked
>> that
>> there is no other job which has been resumed from the same savepoint and
>> which is currently running or has run and completed checkpoints?
>>
>> @pnowojski <[hidden email]> @Becket Qin <[hidden email]> how
>> does the transaction id generation ensures that we don't have a clash of
>> transaction ids if we resume the same job multiple times from the same
>> savepoint? From the code, I do see that we have a
>> TransactionalIdsGenerator
>> which is initialized with the taskName and the operator UID.
>>
>> fyi: @Arvid Heise <[hidden email]>
>>
>> Cheers,
>> Till
>>
>>
>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>>
>> > HI:
>> >       When "sink.semantic = exactly-once", the following exception is
>> > thrown when recovering from svaepoint
>> >
>> >        public static final String KAFKA_TABLE_FORMAT =
>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>> >                     "  "+COLUMN_NAME+" STRING\n" +
>> >                     ") WITH (\n" +
>> >                     "   'connector' = 'kafka',\n" +
>> >                     "   'topic' = '%s',\n" +
>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>> >                     "   'properties.transaction.timeout.ms' =
>> > '900000',\n" +
>> >                     "   'sink.partitioner' =
>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>> >                     "   'format' = 'dbz-json'\n" +
>> >                     ")\n";
>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>> default_database,
>> > debezium_source]], fields=[data]) -> Sink: Sink
>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>> (1/1
>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>> > operation with an old epoch. Either there is a newer producer with the
>> > same transactionalId, or the producer's transaction has been expired by
>> > the broker.
>> >     at org.apache.kafka.clients.producer.internals.
>> >
>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>> > .java:1352)
>> >     at org.apache.kafka.clients.producer.internals.
>> > TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>> > 1260)
>> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>> > .java:109)
>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
>> > NetworkClient.java:572)
>> >     at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>> >     at org.apache.kafka.clients.producer.internals.Sender
>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>> > .java:312)
>> >     at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>> > 239)
>> >     at java.lang.Thread.run(Thread.java:748)
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Till Rohrmann
Thanks for the update. Skimming over the code it looks indeed that we are
overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
not 100% how this will cause the observed problem, though. I am also not a
Flink Kafka connector and Kafka expert so I would appreciate it if someone
more familiar could double check this part of the code.

Concerning the required changing of the UID of an operator Piotr, is this a
known issue and documented somewhere? I find this rather surprising from a
user's point of view.

Cheers,
Till

On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <[hidden email]> wrote:

> Forwarding 周瑞's message to a duplicate thread:
>
> After our analysis, we found a bug in the
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
> method
> The analysis process is as follows:
>
>
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
> public void initializeState(FunctionInitializationContext context) throws
> Exception {
>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>     boolean recoveredUserContext = false;
>     if (context.isRestored()) {
>         LOG.info("{} - restoring state", name());
>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>             userContext = operatorState.getContext();
>             List<TransactionHolder<TXN>> recoveredTransactions =
>                     operatorState.getPendingCommitTransactions();
>             List<TXN> handledTransactions = new
> ArrayList<>(recoveredTransactions.size() + 1);
>             for (TransactionHolder<TXN> recoveredTransaction :
> recoveredTransactions) {
>                 // If this fails to succeed eventually, there is actually
> data loss
>                 recoverAndCommitInternal(recoveredTransaction);
>                 handledTransactions.add(recoveredTransaction.handle);
>                 LOG.info("{} committed recovered transaction {}", name(),
> recoveredTransaction);
>             }
>
>             {
>                 TXN transaction =
> operatorState.getPendingTransaction().handle;
>                 recoverAndAbort(transaction);
>                 handledTransactions.add(transaction);
>                 LOG.info(
>                         "{} aborted recovered transaction {}",
>                         name(),
>                         operatorState.getPendingTransaction());
>             }
>
>             if (userContext.isPresent()) {
>                 finishRecoveringContext(handledTransactions);
>                 recoveredUserContext = true;
>             }
>         }
>     }
>
> (1)recoverAndCommitInternal(recoveredTransaction);
> The previous transactionalid, producerId and epoch in the state are used
> to commit the transaction,However, we find that the producerIdAndEpoch of
> transactionManager is a static constant (ProducerIdAndEpoch.NONE), which
> pollutes the static constant ProducerIdAndEpoch.NONE
>
> @Override
> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
> transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer =
> initTransactionalProducer(transaction.transactionalId, false);
>             producer.resumeTransaction(transaction.producerId,
> transaction.epoch);
>             producer.commitTransaction();
>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>             // That means we have committed this transaction before.
>             LOG.warn(
>                     "Encountered error {} while recovering transaction {}.
> "
>                             + "Presumably this transaction has been
> already committed before",
>                     ex,
>                     transaction);
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public void resumeTransaction(long producerId, short epoch) {
>     synchronized (producerClosingLock) {
>         ensureNotClosed();
>         Preconditions.checkState(
>                 producerId >= 0 && epoch >= 0,
>                 "Incorrect values for producerId %s and epoch %s",
>                 producerId,
>                 epoch);
>         LOG.info(
>                 "Attempting to resume transaction {} with producerId {}
> and epoch {}",
>                 transactionalId,
>                 producerId,
>                 epoch);
>
>         Object transactionManager = getField(kafkaProducer,
> "transactionManager");
>         synchronized (transactionManager) {
>             Object topicPartitionBookkeeper =
>                     getField(transactionManager,
> "topicPartitionBookkeeper");
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum(
>
> "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>             invoke(topicPartitionBookkeeper, "reset");
>
>             Object producerIdAndEpoch = getField(transactionManager,
> "producerIdAndEpoch");
>             setField(producerIdAndEpoch, "producerId", producerId);
>             setField(producerIdAndEpoch, "epoch", epoch);
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum(
>
> "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum(
>
> "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>             setField(transactionManager, "transactionStarted", true);
>         }
>     }
> }
>
>
> public TransactionManager(LogContext logContext,
>                           String transactionalId,
>                           int transactionTimeoutMs,
>                           long retryBackoffMs,
>                           ApiVersions apiVersions) {
>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>     this.transactionalId = transactionalId;
>     this.log = logContext.logger(TransactionManager.class);
>     this.transactionTimeoutMs = transactionTimeoutMs;
>     this.transactionCoordinator = null;
>     this.consumerGroupCoordinator = null;
>     this.newPartitionsInTransaction = new HashSet<>();
>     this.pendingPartitionsInTransaction = new HashSet<>();
>     this.partitionsInTransaction = new HashSet<>();
>     this.pendingRequests = new PriorityQueue<>(10,
> Comparator.comparingInt(o -> o.priority().priority));
>     this.pendingTxnOffsetCommits = new HashMap<>();
>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>     this.partitionsToRewriteSequences = new HashSet<>();
>     this.retryBackoffMs = retryBackoffMs;
>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>     this.apiVersions = apiVersions;
> }
>
>
>
> public class ProducerIdAndEpoch {
>     public static final ProducerIdAndEpoch NONE = new
> ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID,
> RecordBatch.NO_PRODUCER_EPOCH);
>
>     public final long producerId;
>     public final short epoch;
>
>     public ProducerIdAndEpoch(long producerId, short epoch) {
>         this.producerId = producerId;
>         this.epoch = epoch;
>     }
>
>     public boolean isValid() {
>         return RecordBatch.NO_PRODUCER_ID < producerId;
>     }
>
>     @Override
>     public String toString() {
>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>
>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>
>         if (producerId != that.producerId) return false;
>         return epoch == that.epoch;
>     }
>
>     @Override
>     public int hashCode() {
>         int result = (int) (producerId ^ (producerId >>> 32));
>         result = 31 * result + (int) epoch;
>         return result;
>     }
>
> }
>
> (2)In the second step,
> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when
> initializing the transaction, producerId and epoch in the first step
> pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request
> is sent to Kafka, the values of the producerId and epoch  variables in the
> request parameter ProducerIdAndEpoch.NONE are equal to the values of the
> producerId and epoch  variables in the first transaction commit, not equal
> to - 1, - 1. So Kafka throws an exception:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the same
> transactionalId, or the producer's transaction has been expired by the
> broker.
>     at
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>
> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
> transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer =
> initTransactionalProducer(transaction.transactionalId, false);
>             producer.initTransactions();
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public synchronized TransactionalRequestResult initializeTransactions() {
>     return initializeTransactions(ProducerIdAndEpoch.NONE);
> }
>
> synchronized TransactionalRequestResult
> initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>     return handleCachedTransactionRequestResult(() -> {
>         // If this is an epoch bump, we will transition the state as part
> of handling the EndTxnRequest
>         if (!isEpochBump) {
>             transitionTo(State.INITIALIZING);
>             log.info("Invoking InitProducerId for the first time in order
> to acquire a producer ID");
>         } else {
>             log.info("Invoking InitProducerId with current producer ID
> and epoch {} in order to bump the epoch", producerIdAndEpoch);
>         }
>         InitProducerIdRequestData requestData = new
> InitProducerIdRequestData()
>                 .setTransactionalId(transactionalId)
>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>                 .setProducerId(producerIdAndEpoch.producerId)
>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>         InitProducerIdHandler handler = new InitProducerIdHandler(new
> InitProducerIdRequest.Builder(requestData),
>                 isEpochBump);
>         enqueueRequest(handler);
>         return handler.result;
>     }, State.INITIALIZING);
> }
>
> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <[hidden email]>
> wrote:
>
>> Hi,
>>
>> I think there is no generic way. If this error has happened indeed after
>> starting a second job from the same savepoint, or something like that, user
>> can change Sink's operator UID.
>>
>> If this is an issue of intentional recovery from an earlier
>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>> helpful.
>>
>> Best, Piotrek
>>
>> wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]> napisał(a):
>>
>>> The error message says that we are trying to reuse a transaction id that
>>> is
>>> currently being used or has expired.
>>>
>>> I am not 100% sure how this can happen. My suspicion is that you have
>>> resumed a job multiple times from the same savepoint. Have you checked
>>> that
>>> there is no other job which has been resumed from the same savepoint and
>>> which is currently running or has run and completed checkpoints?
>>>
>>> @pnowojski <[hidden email]> @Becket Qin <[hidden email]> how
>>> does the transaction id generation ensures that we don't have a clash of
>>> transaction ids if we resume the same job multiple times from the same
>>> savepoint? From the code, I do see that we have a
>>> TransactionalIdsGenerator
>>> which is initialized with the taskName and the operator UID.
>>>
>>> fyi: @Arvid Heise <[hidden email]>
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>>>
>>> > HI:
>>> >       When "sink.semantic = exactly-once", the following exception is
>>> > thrown when recovering from svaepoint
>>> >
>>> >        public static final String KAFKA_TABLE_FORMAT =
>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>> >                     ") WITH (\n" +
>>> >                     "   'connector' = 'kafka',\n" +
>>> >                     "   'topic' = '%s',\n" +
>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>> >                     "   'properties.transaction.timeout.ms' =
>>> > '900000',\n" +
>>> >                     "   'sink.partitioner' =
>>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>> >                     "   'format' = 'dbz-json'\n" +
>>> >                     ")\n";
>>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>>> default_database,
>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>> (1/1
>>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
>>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>> > operation with an old epoch. Either there is a newer producer with the
>>> > same transactionalId, or the producer's transaction has been expired by
>>> > the broker.
>>> >     at org.apache.kafka.clients.producer.internals.
>>> >
>>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>>> > .java:1352)
>>> >     at org.apache.kafka.clients.producer.internals.
>>> >
>>> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>>> > 1260)
>>> >     at
>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>>> > .java:109)
>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
>>> > NetworkClient.java:572)
>>> >     at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>> >     at org.apache.kafka.clients.producer.internals.Sender
>>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>>> >     at
>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>>> > .java:312)
>>> >     at
>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>>> > 239)
>>> >     at java.lang.Thread.run(Thread.java:748)
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Tianxin Zhao
I encountered the exact same issue before when experimenting in a testing
environment. I was not able to spot the bug as mentioned in this thread,
the solution I did was to downgrade my own kafka-client version from 2.5 to
2.4.1, matching the version of flink-connector-kafka.
In 2.4.1 Kafka, TransactionManager is initializing producerIdAndEpoch using

this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID,
> NO_PRODUCER_EPOCH);


instead of

> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;


On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann <[hidden email]> wrote:

> Thanks for the update. Skimming over the code it looks indeed that we are
> overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
> not 100% how this will cause the observed problem, though. I am also not a
> Flink Kafka connector and Kafka expert so I would appreciate it if someone
> more familiar could double check this part of the code.
>
> Concerning the required changing of the UID of an operator Piotr, is this
> a known issue and documented somewhere? I find this rather surprising from
> a user's point of view.
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <[hidden email]> wrote:
>
>> Forwarding 周瑞's message to a duplicate thread:
>>
>> After our analysis, we found a bug in the
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>> method
>> The analysis process is as follows:
>>
>>
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>> public void initializeState(FunctionInitializationContext context) throws
>> Exception {
>>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>>     boolean recoveredUserContext = false;
>>     if (context.isRestored()) {
>>         LOG.info("{} - restoring state", name());
>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>             userContext = operatorState.getContext();
>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>                     operatorState.getPendingCommitTransactions();
>>             List<TXN> handledTransactions = new
>> ArrayList<>(recoveredTransactions.size() + 1);
>>             for (TransactionHolder<TXN> recoveredTransaction :
>> recoveredTransactions) {
>>                 // If this fails to succeed eventually, there is actually
>> data loss
>>                 recoverAndCommitInternal(recoveredTransaction);
>>                 handledTransactions.add(recoveredTransaction.handle);
>>                 LOG.info("{} committed recovered transaction {}", name(),
>> recoveredTransaction);
>>             }
>>
>>             {
>>                 TXN transaction =
>> operatorState.getPendingTransaction().handle;
>>                 recoverAndAbort(transaction);
>>                 handledTransactions.add(transaction);
>>                 LOG.info(
>>                         "{} aborted recovered transaction {}",
>>                         name(),
>>                         operatorState.getPendingTransaction());
>>             }
>>
>>             if (userContext.isPresent()) {
>>                 finishRecoveringContext(handledTransactions);
>>                 recoveredUserContext = true;
>>             }
>>         }
>>     }
>>
>> (1)recoverAndCommitInternal(recoveredTransaction);
>> The previous transactionalid, producerId and epoch in the state are used
>> to commit the transaction,However, we find that the producerIdAndEpoch of
>> transactionManager is a static constant (ProducerIdAndEpoch.NONE), which
>> pollutes the static constant ProducerIdAndEpoch.NONE
>>
>> @Override
>> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
>> transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer =
>> initTransactionalProducer(transaction.transactionalId, false);
>>             producer.resumeTransaction(transaction.producerId,
>> transaction.epoch);
>>             producer.commitTransaction();
>>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>>             // That means we have committed this transaction before.
>>             LOG.warn(
>>                     "Encountered error {} while recovering transaction
>> {}. "
>>                             + "Presumably this transaction has been
>> already committed before",
>>                     ex,
>>                     transaction);
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
>>
>> public void resumeTransaction(long producerId, short epoch) {
>>     synchronized (producerClosingLock) {
>>         ensureNotClosed();
>>         Preconditions.checkState(
>>                 producerId >= 0 && epoch >= 0,
>>                 "Incorrect values for producerId %s and epoch %s",
>>                 producerId,
>>                 epoch);
>>         LOG.info(
>>                 "Attempting to resume transaction {} with producerId {}
>> and epoch {}",
>>                 transactionalId,
>>                 producerId,
>>                 epoch);
>>
>>         Object transactionManager = getField(kafkaProducer,
>> "transactionManager");
>>         synchronized (transactionManager) {
>>             Object topicPartitionBookkeeper =
>>                     getField(transactionManager,
>> "topicPartitionBookkeeper");
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>
>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>>             invoke(topicPartitionBookkeeper, "reset");
>>
>>             Object producerIdAndEpoch = getField(transactionManager,
>> "producerIdAndEpoch");
>>             setField(producerIdAndEpoch, "producerId", producerId);
>>             setField(producerIdAndEpoch, "epoch", epoch);
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>
>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>>
>>             invoke(
>>                     transactionManager,
>>                     "transitionTo",
>>                     getEnum(
>>
>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>>             setField(transactionManager, "transactionStarted", true);
>>         }
>>     }
>> }
>>
>>
>> public TransactionManager(LogContext logContext,
>>                           String transactionalId,
>>                           int transactionTimeoutMs,
>>                           long retryBackoffMs,
>>                           ApiVersions apiVersions) {
>>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>     this.transactionalId = transactionalId;
>>     this.log = logContext.logger(TransactionManager.class);
>>     this.transactionTimeoutMs = transactionTimeoutMs;
>>     this.transactionCoordinator = null;
>>     this.consumerGroupCoordinator = null;
>>     this.newPartitionsInTransaction = new HashSet<>();
>>     this.pendingPartitionsInTransaction = new HashSet<>();
>>     this.partitionsInTransaction = new HashSet<>();
>>     this.pendingRequests = new PriorityQueue<>(10,
>> Comparator.comparingInt(o -> o.priority().priority));
>>     this.pendingTxnOffsetCommits = new HashMap<>();
>>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>>     this.partitionsToRewriteSequences = new HashSet<>();
>>     this.retryBackoffMs = retryBackoffMs;
>>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>>     this.apiVersions = apiVersions;
>> }
>>
>>
>>
>> public class ProducerIdAndEpoch {
>>     public static final ProducerIdAndEpoch NONE = new
>> ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID,
>> RecordBatch.NO_PRODUCER_EPOCH);
>>
>>     public final long producerId;
>>     public final short epoch;
>>
>>     public ProducerIdAndEpoch(long producerId, short epoch) {
>>         this.producerId = producerId;
>>         this.epoch = epoch;
>>     }
>>
>>     public boolean isValid() {
>>         return RecordBatch.NO_PRODUCER_ID < producerId;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>
>>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>>
>>         if (producerId != that.producerId) return false;
>>         return epoch == that.epoch;
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         int result = (int) (producerId ^ (producerId >>> 32));
>>         result = 31 * result + (int) epoch;
>>         return result;
>>     }
>>
>> }
>>
>> (2)In the second step,
>> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when
>> initializing the transaction, producerId and epoch in the first step
>> pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request
>> is sent to Kafka, the values of the producerId and epoch  variables in the
>> request parameter ProducerIdAndEpoch.NONE are equal to the values of the
>> producerId and epoch  variables in the first transaction commit, not equal
>> to - 1, - 1. So Kafka throws an exception:
>> Unexpected error in InitProducerIdResponse; Producer attempted an
>> operation with an old epoch. Either there is a newer producer with the same
>> transactionalId, or the producer's transaction has been expired by the
>> broker.
>>     at
>> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>     at
>> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>     at
>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>     at
>> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>     at
>> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>     at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>     at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
>> transaction) {
>>     if (transaction.isTransactional()) {
>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>         try {
>>             producer =
>> initTransactionalProducer(transaction.transactionalId, false);
>>             producer.initTransactions();
>>         } finally {
>>             if (producer != null) {
>>                 producer.close(0, TimeUnit.SECONDS);
>>             }
>>         }
>>     }
>> }
>>
>> public synchronized TransactionalRequestResult initializeTransactions() {
>>     return initializeTransactions(ProducerIdAndEpoch.NONE);
>> }
>>
>> synchronized TransactionalRequestResult
>> initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>>     return handleCachedTransactionRequestResult(() -> {
>>         // If this is an epoch bump, we will transition the state as part
>> of handling the EndTxnRequest
>>         if (!isEpochBump) {
>>             transitionTo(State.INITIALIZING);
>>             log.info("Invoking InitProducerId for the first time in
>> order to acquire a producer ID");
>>         } else {
>>             log.info("Invoking InitProducerId with current producer ID
>> and epoch {} in order to bump the epoch", producerIdAndEpoch);
>>         }
>>         InitProducerIdRequestData requestData = new
>> InitProducerIdRequestData()
>>                 .setTransactionalId(transactionalId)
>>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>>                 .setProducerId(producerIdAndEpoch.producerId)
>>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>>         InitProducerIdHandler handler = new InitProducerIdHandler(new
>> InitProducerIdRequest.Builder(requestData),
>>                 isEpochBump);
>>         enqueueRequest(handler);
>>         return handler.result;
>>     }, State.INITIALIZING);
>> }
>>
>> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <[hidden email]>
>> wrote:
>>
>>> Hi,
>>>
>>> I think there is no generic way. If this error has happened indeed after
>>> starting a second job from the same savepoint, or something like that, user
>>> can change Sink's operator UID.
>>>
>>> If this is an issue of intentional recovery from an earlier
>>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>>> helpful.
>>>
>>> Best, Piotrek
>>>
>>> wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]> napisał(a):
>>>
>>>> The error message says that we are trying to reuse a transaction id
>>>> that is
>>>> currently being used or has expired.
>>>>
>>>> I am not 100% sure how this can happen. My suspicion is that you have
>>>> resumed a job multiple times from the same savepoint. Have you checked
>>>> that
>>>> there is no other job which has been resumed from the same savepoint and
>>>> which is currently running or has run and completed checkpoints?
>>>>
>>>> @pnowojski <[hidden email]> @Becket Qin <[hidden email]>
>>>> how
>>>> does the transaction id generation ensures that we don't have a clash of
>>>> transaction ids if we resume the same job multiple times from the same
>>>> savepoint? From the code, I do see that we have a
>>>> TransactionalIdsGenerator
>>>> which is initialized with the taskName and the operator UID.
>>>>
>>>> fyi: @Arvid Heise <[hidden email]>
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>>>>
>>>> > HI:
>>>> >       When "sink.semantic = exactly-once", the following exception is
>>>> > thrown when recovering from svaepoint
>>>> >
>>>> >        public static final String KAFKA_TABLE_FORMAT =
>>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>>> >                     ") WITH (\n" +
>>>> >                     "   'connector' = 'kafka',\n" +
>>>> >                     "   'topic' = '%s',\n" +
>>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>>> >                     "   'properties.transaction.timeout.ms' =
>>>> > '900000',\n" +
>>>> >                     "   'sink.partitioner' =
>>>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>>> >                     "   'format' = 'dbz-json'\n" +
>>>> >                     ")\n";
>>>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>>>> default_database,
>>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>>> (1/1
>>>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
>>>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>>> > operation with an old epoch. Either there is a newer producer with the
>>>> > same transactionalId, or the producer's transaction has been expired
>>>> by
>>>> > the broker.
>>>> >     at org.apache.kafka.clients.producer.internals.
>>>> >
>>>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>>>> > .java:1352)
>>>> >     at org.apache.kafka.clients.producer.internals.
>>>> >
>>>> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>>>> > 1260)
>>>> >     at
>>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>>>> > .java:109)
>>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
>>>> > NetworkClient.java:572)
>>>> >     at
>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>> >     at org.apache.kafka.clients.producer.internals.Sender
>>>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>> >     at
>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>>>> > .java:312)
>>>> >     at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>>>> > 239)
>>>> >     at java.lang.Thread.run(Thread.java:748)
>>>> >
>>>>
>>>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Till Rohrmann
Thanks for this insight. So the problem might be Flink using an internal
Kafka API (the connector uses reflection to get hold of the
TransactionManager) which changed between version 2.4.1 and 2.5. I think
this is a serious problem because it breaks our end-to-end exactly once
story when using new Kafka versions.

Cheers,
Till

On Thu, Jun 3, 2021 at 10:17 AM Tianxin Zhao <[hidden email]> wrote:

> I encountered the exact same issue before when experimenting in a testing
> environment. I was not able to spot the bug as mentioned in this thread,
> the solution I did was to downgrade my own kafka-client version from 2.5 to
> 2.4.1, matching the version of flink-connector-kafka.
> In 2.4.1 Kafka, TransactionManager is initializing producerIdAndEpoch using
>
> this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID,
>> NO_PRODUCER_EPOCH);
>
>
> instead of
>
>> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>
>
> On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann <[hidden email]>
> wrote:
>
>> Thanks for the update. Skimming over the code it looks indeed that we are
>> overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
>> not 100% how this will cause the observed problem, though. I am also not a
>> Flink Kafka connector and Kafka expert so I would appreciate it if someone
>> more familiar could double check this part of the code.
>>
>> Concerning the required changing of the UID of an operator Piotr, is this
>> a known issue and documented somewhere? I find this rather surprising from
>> a user's point of view.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <[hidden email]>
>> wrote:
>>
>>> Forwarding 周瑞's message to a duplicate thread:
>>>
>>> After our analysis, we found a bug in the
>>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>>> method
>>> The analysis process is as follows:
>>>
>>>
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> public void initializeState(FunctionInitializationContext context)
>>> throws Exception {
>>>     state =
>>> context.getOperatorStateStore().getListState(stateDescriptor);
>>>     boolean recoveredUserContext = false;
>>>     if (context.isRestored()) {
>>>         LOG.info("{} - restoring state", name());
>>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>>             userContext = operatorState.getContext();
>>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>>                     operatorState.getPendingCommitTransactions();
>>>             List<TXN> handledTransactions = new
>>> ArrayList<>(recoveredTransactions.size() + 1);
>>>             for (TransactionHolder<TXN> recoveredTransaction :
>>> recoveredTransactions) {
>>>                 // If this fails to succeed eventually, there is
>>> actually data loss
>>>                 recoverAndCommitInternal(recoveredTransaction);
>>>                 handledTransactions.add(recoveredTransaction.handle);
>>>                 LOG.info("{} committed recovered transaction {}",
>>> name(), recoveredTransaction);
>>>             }
>>>
>>>             {
>>>                 TXN transaction =
>>> operatorState.getPendingTransaction().handle;
>>>                 recoverAndAbort(transaction);
>>>                 handledTransactions.add(transaction);
>>>                 LOG.info(
>>>                         "{} aborted recovered transaction {}",
>>>                         name(),
>>>                         operatorState.getPendingTransaction());
>>>             }
>>>
>>>             if (userContext.isPresent()) {
>>>                 finishRecoveringContext(handledTransactions);
>>>                 recoveredUserContext = true;
>>>             }
>>>         }
>>>     }
>>>
>>> (1)recoverAndCommitInternal(recoveredTransaction);
>>> The previous transactionalid, producerId and epoch in the state are used
>>> to commit the transaction,However, we find that the producerIdAndEpoch of
>>> transactionManager is a static constant (ProducerIdAndEpoch.NONE), which
>>> pollutes the static constant ProducerIdAndEpoch.NONE
>>>
>>> @Override
>>> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
>>> transaction) {
>>>     if (transaction.isTransactional()) {
>>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>>         try {
>>>             producer =
>>> initTransactionalProducer(transaction.transactionalId, false);
>>>             producer.resumeTransaction(transaction.producerId,
>>> transaction.epoch);
>>>             producer.commitTransaction();
>>>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>>>             // That means we have committed this transaction before.
>>>             LOG.warn(
>>>                     "Encountered error {} while recovering transaction
>>> {}. "
>>>                             + "Presumably this transaction has been
>>> already committed before",
>>>                     ex,
>>>                     transaction);
>>>         } finally {
>>>             if (producer != null) {
>>>                 producer.close(0, TimeUnit.SECONDS);
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> public void resumeTransaction(long producerId, short epoch) {
>>>     synchronized (producerClosingLock) {
>>>         ensureNotClosed();
>>>         Preconditions.checkState(
>>>                 producerId >= 0 && epoch >= 0,
>>>                 "Incorrect values for producerId %s and epoch %s",
>>>                 producerId,
>>>                 epoch);
>>>         LOG.info(
>>>                 "Attempting to resume transaction {} with producerId {}
>>> and epoch {}",
>>>                 transactionalId,
>>>                 producerId,
>>>                 epoch);
>>>
>>>         Object transactionManager = getField(kafkaProducer,
>>> "transactionManager");
>>>         synchronized (transactionManager) {
>>>             Object topicPartitionBookkeeper =
>>>                     getField(transactionManager,
>>> "topicPartitionBookkeeper");
>>>
>>>             invoke(
>>>                     transactionManager,
>>>                     "transitionTo",
>>>                     getEnum(
>>>
>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>>>             invoke(topicPartitionBookkeeper, "reset");
>>>
>>>             Object producerIdAndEpoch = getField(transactionManager,
>>> "producerIdAndEpoch");
>>>             setField(producerIdAndEpoch, "producerId", producerId);
>>>             setField(producerIdAndEpoch, "epoch", epoch);
>>>
>>>             invoke(
>>>                     transactionManager,
>>>                     "transitionTo",
>>>                     getEnum(
>>>
>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>>>
>>>             invoke(
>>>                     transactionManager,
>>>                     "transitionTo",
>>>                     getEnum(
>>>
>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>>>             setField(transactionManager, "transactionStarted", true);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> public TransactionManager(LogContext logContext,
>>>                           String transactionalId,
>>>                           int transactionTimeoutMs,
>>>                           long retryBackoffMs,
>>>                           ApiVersions apiVersions) {
>>>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>>     this.transactionalId = transactionalId;
>>>     this.log = logContext.logger(TransactionManager.class);
>>>     this.transactionTimeoutMs = transactionTimeoutMs;
>>>     this.transactionCoordinator = null;
>>>     this.consumerGroupCoordinator = null;
>>>     this.newPartitionsInTransaction = new HashSet<>();
>>>     this.pendingPartitionsInTransaction = new HashSet<>();
>>>     this.partitionsInTransaction = new HashSet<>();
>>>     this.pendingRequests = new PriorityQueue<>(10,
>>> Comparator.comparingInt(o -> o.priority().priority));
>>>     this.pendingTxnOffsetCommits = new HashMap<>();
>>>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>>>     this.partitionsToRewriteSequences = new HashSet<>();
>>>     this.retryBackoffMs = retryBackoffMs;
>>>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>>>     this.apiVersions = apiVersions;
>>> }
>>>
>>>
>>>
>>> public class ProducerIdAndEpoch {
>>>     public static final ProducerIdAndEpoch NONE = new
>>> ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID,
>>> RecordBatch.NO_PRODUCER_EPOCH);
>>>
>>>     public final long producerId;
>>>     public final short epoch;
>>>
>>>     public ProducerIdAndEpoch(long producerId, short epoch) {
>>>         this.producerId = producerId;
>>>         this.epoch = epoch;
>>>     }
>>>
>>>     public boolean isValid() {
>>>         return RecordBatch.NO_PRODUCER_ID < producerId;
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>
>>>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>>>
>>>         if (producerId != that.producerId) return false;
>>>         return epoch == that.epoch;
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         int result = (int) (producerId ^ (producerId >>> 32));
>>>         result = 31 * result + (int) epoch;
>>>         return result;
>>>     }
>>>
>>> }
>>>
>>> (2)In the second step,
>>> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when
>>> initializing the transaction, producerId and epoch in the first step
>>> pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request
>>> is sent to Kafka, the values of the producerId and epoch  variables in the
>>> request parameter ProducerIdAndEpoch.NONE are equal to the values of the
>>> producerId and epoch  variables in the first transaction commit, not equal
>>> to - 1, - 1. So Kafka throws an exception:
>>> Unexpected error in InitProducerIdResponse; Producer attempted an
>>> operation with an old epoch. Either there is a newer producer with the same
>>> transactionalId, or the producer's transaction has been expired by the
>>> broker.
>>>     at
>>> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>>     at
>>> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>>     at
>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>>     at
>>> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>>     at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>     at
>>> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>     at
>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>>     at
>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
>>> transaction) {
>>>     if (transaction.isTransactional()) {
>>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>>         try {
>>>             producer =
>>> initTransactionalProducer(transaction.transactionalId, false);
>>>             producer.initTransactions();
>>>         } finally {
>>>             if (producer != null) {
>>>                 producer.close(0, TimeUnit.SECONDS);
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> public synchronized TransactionalRequestResult initializeTransactions() {
>>>     return initializeTransactions(ProducerIdAndEpoch.NONE);
>>> }
>>>
>>> synchronized TransactionalRequestResult
>>> initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>>>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>>>     return handleCachedTransactionRequestResult(() -> {
>>>         // If this is an epoch bump, we will transition the state as
>>> part of handling the EndTxnRequest
>>>         if (!isEpochBump) {
>>>             transitionTo(State.INITIALIZING);
>>>             log.info("Invoking InitProducerId for the first time in
>>> order to acquire a producer ID");
>>>         } else {
>>>             log.info("Invoking InitProducerId with current producer ID
>>> and epoch {} in order to bump the epoch", producerIdAndEpoch);
>>>         }
>>>         InitProducerIdRequestData requestData = new
>>> InitProducerIdRequestData()
>>>                 .setTransactionalId(transactionalId)
>>>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>>>                 .setProducerId(producerIdAndEpoch.producerId)
>>>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>>>         InitProducerIdHandler handler = new InitProducerIdHandler(new
>>> InitProducerIdRequest.Builder(requestData),
>>>                 isEpochBump);
>>>         enqueueRequest(handler);
>>>         return handler.result;
>>>     }, State.INITIALIZING);
>>> }
>>>
>>> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <[hidden email]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think there is no generic way. If this error has happened indeed
>>>> after starting a second job from the same savepoint, or something like
>>>> that, user can change Sink's operator UID.
>>>>
>>>> If this is an issue of intentional recovery from an earlier
>>>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>>>> helpful.
>>>>
>>>> Best, Piotrek
>>>>
>>>> wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]>
>>>> napisał(a):
>>>>
>>>>> The error message says that we are trying to reuse a transaction id
>>>>> that is
>>>>> currently being used or has expired.
>>>>>
>>>>> I am not 100% sure how this can happen. My suspicion is that you have
>>>>> resumed a job multiple times from the same savepoint. Have you checked
>>>>> that
>>>>> there is no other job which has been resumed from the same savepoint
>>>>> and
>>>>> which is currently running or has run and completed checkpoints?
>>>>>
>>>>> @pnowojski <[hidden email]> @Becket Qin <[hidden email]>
>>>>> how
>>>>> does the transaction id generation ensures that we don't have a clash
>>>>> of
>>>>> transaction ids if we resume the same job multiple times from the same
>>>>> savepoint? From the code, I do see that we have a
>>>>> TransactionalIdsGenerator
>>>>> which is initialized with the taskName and the operator UID.
>>>>>
>>>>> fyi: @Arvid Heise <[hidden email]>
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>>
>>>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>>>>>
>>>>> > HI:
>>>>> >       When "sink.semantic = exactly-once", the following exception is
>>>>> > thrown when recovering from svaepoint
>>>>> >
>>>>> >        public static final String KAFKA_TABLE_FORMAT =
>>>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>>>> >                     ") WITH (\n" +
>>>>> >                     "   'connector' = 'kafka',\n" +
>>>>> >                     "   'topic' = '%s',\n" +
>>>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>>>> >                     "   'properties.transaction.timeout.ms' =
>>>>> > '900000',\n" +
>>>>> >                     "   'sink.partitioner' =
>>>>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>>>> >                     "   'format' = 'dbz-json'\n" +
>>>>> >                     ")\n";
>>>>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>>>>> default_database,
>>>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>>>> (1/1
>>>>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
>>>>> to
>>>>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>>>> > operation with an old epoch. Either there is a newer producer with
>>>>> the
>>>>> > same transactionalId, or the producer's transaction has been expired
>>>>> by
>>>>> > the broker.
>>>>> >     at org.apache.kafka.clients.producer.internals.
>>>>> >
>>>>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>>>>> > .java:1352)
>>>>> >     at org.apache.kafka.clients.producer.internals.
>>>>> >
>>>>> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>>>>> > 1260)
>>>>> >     at
>>>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>>>>> > .java:109)
>>>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
>>>>> > NetworkClient.java:572)
>>>>> >     at
>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>>> >     at org.apache.kafka.clients.producer.internals.Sender
>>>>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>>> >     at
>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>>>>> > .java:312)
>>>>> >     at
>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>>>>> > 239)
>>>>> >     at java.lang.Thread.run(Thread.java:748)
>>>>> >
>>>>>
>>>>
Reply | Threaded
Open this post in threaded view
|

Re: recover from svaepoint

Piotr Nowojski-5
Hi,

Thanks Tianxin and 周瑞' for reporting and tracking down the problem. Indeed
that could be the reason behind it. Have either of you already created a
JIRA ticket for this bug?

> Concerning the required changing of the UID of an operator Piotr, is this
a known issue and documented somewhere? I find this rather surprising from
a user's point of view.

You don't need to change UID, you just need to make sure that the
transactional.ids are unique, either via UID or the task name.

It's not documented, and spawning a new job from an old one, while keeping
both of them running is not officially supported. In order to officially
support this scenario, Flink would need to have a better support of
stop-with-savepoint (commit pending transactions, without starting new
transactions for new records - that would require us to extend
PublicAPI). `setLogFailuresOnly` has a purpose of recovering from some
critical/fatal otherwise failures, for example if transactions have timed
out. It's a lucky conicindence that it can be leveraged in this scenario..

Best,
Piotrek

czw., 3 cze 2021 o 11:43 Till Rohrmann <[hidden email]> napisał(a):

> Thanks for this insight. So the problem might be Flink using an internal
> Kafka API (the connector uses reflection to get hold of the
> TransactionManager) which changed between version 2.4.1 and 2.5. I think
> this is a serious problem because it breaks our end-to-end exactly once
> story when using new Kafka versions.
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 10:17 AM Tianxin Zhao <[hidden email]> wrote:
>
>> I encountered the exact same issue before when experimenting in a testing
>> environment. I was not able to spot the bug as mentioned in this thread,
>> the solution I did was to downgrade my own kafka-client version from 2.5 to
>> 2.4.1, matching the version of flink-connector-kafka.
>> In 2.4.1 Kafka, TransactionManager is initializing producerIdAndEpoch
>> using
>>
>> this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID,
>>> NO_PRODUCER_EPOCH);
>>
>>
>> instead of
>>
>>> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>
>>
>> On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann <[hidden email]>
>> wrote:
>>
>>> Thanks for the update. Skimming over the code it looks indeed that we
>>> are overwriting the values of the static value ProducerIdAndEpoch.NONE. I
>>> am not 100% how this will cause the observed problem, though. I am also not
>>> a Flink Kafka connector and Kafka expert so I would appreciate it if
>>> someone more familiar could double check this part of the code.
>>>
>>> Concerning the required changing of the UID of an operator Piotr, is
>>> this a known issue and documented somewhere? I find this rather surprising
>>> from a user's point of view.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <[hidden email]>
>>> wrote:
>>>
>>>> Forwarding 周瑞's message to a duplicate thread:
>>>>
>>>> After our analysis, we found a bug in the
>>>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>>>> method
>>>> The analysis process is as follows:
>>>>
>>>>
>>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>>> public void initializeState(FunctionInitializationContext context)
>>>> throws Exception {
>>>>     state =
>>>> context.getOperatorStateStore().getListState(stateDescriptor);
>>>>     boolean recoveredUserContext = false;
>>>>     if (context.isRestored()) {
>>>>         LOG.info("{} - restoring state", name());
>>>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>>>             userContext = operatorState.getContext();
>>>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>>>                     operatorState.getPendingCommitTransactions();
>>>>             List<TXN> handledTransactions = new
>>>> ArrayList<>(recoveredTransactions.size() + 1);
>>>>             for (TransactionHolder<TXN> recoveredTransaction :
>>>> recoveredTransactions) {
>>>>                 // If this fails to succeed eventually, there is
>>>> actually data loss
>>>>                 recoverAndCommitInternal(recoveredTransaction);
>>>>                 handledTransactions.add(recoveredTransaction.handle);
>>>>                 LOG.info("{} committed recovered transaction {}",
>>>> name(), recoveredTransaction);
>>>>             }
>>>>
>>>>             {
>>>>                 TXN transaction =
>>>> operatorState.getPendingTransaction().handle;
>>>>                 recoverAndAbort(transaction);
>>>>                 handledTransactions.add(transaction);
>>>>                 LOG.info(
>>>>                         "{} aborted recovered transaction {}",
>>>>                         name(),
>>>>                         operatorState.getPendingTransaction());
>>>>             }
>>>>
>>>>             if (userContext.isPresent()) {
>>>>                 finishRecoveringContext(handledTransactions);
>>>>                 recoveredUserContext = true;
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>> (1)recoverAndCommitInternal(recoveredTransaction);
>>>> The previous transactionalid, producerId and epoch in the state are
>>>> used to commit the transaction,However, we find that the producerIdAndEpoch
>>>> of transactionManager is a static constant (ProducerIdAndEpoch.NONE), which
>>>> pollutes the static constant ProducerIdAndEpoch.NONE
>>>>
>>>> @Override
>>>> protected void
>>>> recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
>>>>     if (transaction.isTransactional()) {
>>>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>>>         try {
>>>>             producer =
>>>> initTransactionalProducer(transaction.transactionalId, false);
>>>>             producer.resumeTransaction(transaction.producerId,
>>>> transaction.epoch);
>>>>             producer.commitTransaction();
>>>>         } catch (InvalidTxnStateException | ProducerFencedException ex)
>>>> {
>>>>             // That means we have committed this transaction before.
>>>>             LOG.warn(
>>>>                     "Encountered error {} while recovering transaction
>>>> {}. "
>>>>                             + "Presumably this transaction has been
>>>> already committed before",
>>>>                     ex,
>>>>                     transaction);
>>>>         } finally {
>>>>             if (producer != null) {
>>>>                 producer.close(0, TimeUnit.SECONDS);
>>>>             }
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> public void resumeTransaction(long producerId, short epoch) {
>>>>     synchronized (producerClosingLock) {
>>>>         ensureNotClosed();
>>>>         Preconditions.checkState(
>>>>                 producerId >= 0 && epoch >= 0,
>>>>                 "Incorrect values for producerId %s and epoch %s",
>>>>                 producerId,
>>>>                 epoch);
>>>>         LOG.info(
>>>>                 "Attempting to resume transaction {} with producerId {}
>>>> and epoch {}",
>>>>                 transactionalId,
>>>>                 producerId,
>>>>                 epoch);
>>>>
>>>>         Object transactionManager = getField(kafkaProducer,
>>>> "transactionManager");
>>>>         synchronized (transactionManager) {
>>>>             Object topicPartitionBookkeeper =
>>>>                     getField(transactionManager,
>>>> "topicPartitionBookkeeper");
>>>>
>>>>             invoke(
>>>>                     transactionManager,
>>>>                     "transitionTo",
>>>>                     getEnum(
>>>>
>>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>>>>             invoke(topicPartitionBookkeeper, "reset");
>>>>
>>>>             Object producerIdAndEpoch = getField(transactionManager,
>>>> "producerIdAndEpoch");
>>>>             setField(producerIdAndEpoch, "producerId", producerId);
>>>>             setField(producerIdAndEpoch, "epoch", epoch);
>>>>
>>>>             invoke(
>>>>                     transactionManager,
>>>>                     "transitionTo",
>>>>                     getEnum(
>>>>
>>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>>>>
>>>>             invoke(
>>>>                     transactionManager,
>>>>                     "transitionTo",
>>>>                     getEnum(
>>>>
>>>> "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>>>>             setField(transactionManager, "transactionStarted", true);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> public TransactionManager(LogContext logContext,
>>>>                           String transactionalId,
>>>>                           int transactionTimeoutMs,
>>>>                           long retryBackoffMs,
>>>>                           ApiVersions apiVersions) {
>>>>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>>>     this.transactionalId = transactionalId;
>>>>     this.log = logContext.logger(TransactionManager.class);
>>>>     this.transactionTimeoutMs = transactionTimeoutMs;
>>>>     this.transactionCoordinator = null;
>>>>     this.consumerGroupCoordinator = null;
>>>>     this.newPartitionsInTransaction = new HashSet<>();
>>>>     this.pendingPartitionsInTransaction = new HashSet<>();
>>>>     this.partitionsInTransaction = new HashSet<>();
>>>>     this.pendingRequests = new PriorityQueue<>(10,
>>>> Comparator.comparingInt(o -> o.priority().priority));
>>>>     this.pendingTxnOffsetCommits = new HashMap<>();
>>>>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>>>>     this.partitionsToRewriteSequences = new HashSet<>();
>>>>     this.retryBackoffMs = retryBackoffMs;
>>>>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>>>>     this.apiVersions = apiVersions;
>>>> }
>>>>
>>>>
>>>>
>>>> public class ProducerIdAndEpoch {
>>>>     public static final ProducerIdAndEpoch NONE = new
>>>> ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID,
>>>> RecordBatch.NO_PRODUCER_EPOCH);
>>>>
>>>>     public final long producerId;
>>>>     public final short epoch;
>>>>
>>>>     public ProducerIdAndEpoch(long producerId, short epoch) {
>>>>         this.producerId = producerId;
>>>>         this.epoch = epoch;
>>>>     }
>>>>
>>>>     public boolean isValid() {
>>>>         return RecordBatch.NO_PRODUCER_ID < producerId;
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean equals(Object o) {
>>>>         if (this == o) return true;
>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>
>>>>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>>>>
>>>>         if (producerId != that.producerId) return false;
>>>>         return epoch == that.epoch;
>>>>     }
>>>>
>>>>     @Override
>>>>     public int hashCode() {
>>>>         int result = (int) (producerId ^ (producerId >>> 32));
>>>>         result = 31 * result + (int) epoch;
>>>>         return result;
>>>>     }
>>>>
>>>> }
>>>>
>>>> (2)In the second step,
>>>> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when
>>>> initializing the transaction, producerId and epoch in the first step
>>>> pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request
>>>> is sent to Kafka, the values of the producerId and epoch  variables in the
>>>> request parameter ProducerIdAndEpoch.NONE are equal to the values of the
>>>> producerId and epoch  variables in the first transaction commit, not equal
>>>> to - 1, - 1. So Kafka throws an exception:
>>>> Unexpected error in InitProducerIdResponse; Producer attempted an
>>>> operation with an old epoch. Either there is a newer producer with the same
>>>> transactionalId, or the producer's transaction has been expired by the
>>>> broker.
>>>>     at
>>>> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>>>     at
>>>> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>>>     at
>>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>>>     at
>>>> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>>>     at
>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>>     at
>>>> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>>     at
>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>>>     at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState
>>>> transaction) {
>>>>     if (transaction.isTransactional()) {
>>>>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>>>>         try {
>>>>             producer =
>>>> initTransactionalProducer(transaction.transactionalId, false);
>>>>             producer.initTransactions();
>>>>         } finally {
>>>>             if (producer != null) {
>>>>                 producer.close(0, TimeUnit.SECONDS);
>>>>             }
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> public synchronized TransactionalRequestResult initializeTransactions()
>>>> {
>>>>     return initializeTransactions(ProducerIdAndEpoch.NONE);
>>>> }
>>>>
>>>> synchronized TransactionalRequestResult
>>>> initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>>>>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>>>>     return handleCachedTransactionRequestResult(() -> {
>>>>         // If this is an epoch bump, we will transition the state as
>>>> part of handling the EndTxnRequest
>>>>         if (!isEpochBump) {
>>>>             transitionTo(State.INITIALIZING);
>>>>             log.info("Invoking InitProducerId for the first time in
>>>> order to acquire a producer ID");
>>>>         } else {
>>>>             log.info("Invoking InitProducerId with current producer ID
>>>> and epoch {} in order to bump the epoch", producerIdAndEpoch);
>>>>         }
>>>>         InitProducerIdRequestData requestData = new
>>>> InitProducerIdRequestData()
>>>>                 .setTransactionalId(transactionalId)
>>>>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>>>>                 .setProducerId(producerIdAndEpoch.producerId)
>>>>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>>>>         InitProducerIdHandler handler = new InitProducerIdHandler(new
>>>> InitProducerIdRequest.Builder(requestData),
>>>>                 isEpochBump);
>>>>         enqueueRequest(handler);
>>>>         return handler.result;
>>>>     }, State.INITIALIZING);
>>>> }
>>>>
>>>> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think there is no generic way. If this error has happened indeed
>>>>> after starting a second job from the same savepoint, or something like
>>>>> that, user can change Sink's operator UID.
>>>>>
>>>>> If this is an issue of intentional recovery from an earlier
>>>>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>>>>> helpful.
>>>>>
>>>>> Best, Piotrek
>>>>>
>>>>> wt., 1 cze 2021 o 15:16 Till Rohrmann <[hidden email]>
>>>>> napisał(a):
>>>>>
>>>>>> The error message says that we are trying to reuse a transaction id
>>>>>> that is
>>>>>> currently being used or has expired.
>>>>>>
>>>>>> I am not 100% sure how this can happen. My suspicion is that you have
>>>>>> resumed a job multiple times from the same savepoint. Have you
>>>>>> checked that
>>>>>> there is no other job which has been resumed from the same savepoint
>>>>>> and
>>>>>> which is currently running or has run and completed checkpoints?
>>>>>>
>>>>>> @pnowojski <[hidden email]> @Becket Qin <[hidden email]>
>>>>>> how
>>>>>> does the transaction id generation ensures that we don't have a clash
>>>>>> of
>>>>>> transaction ids if we resume the same job multiple times from the same
>>>>>> savepoint? From the code, I do see that we have a
>>>>>> TransactionalIdsGenerator
>>>>>> which is initialized with the taskName and the operator UID.
>>>>>>
>>>>>> fyi: @Arvid Heise <[hidden email]>
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>>
>>>>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <[hidden email]> wrote:
>>>>>>
>>>>>> > HI:
>>>>>> >       When "sink.semantic = exactly-once", the following exception
>>>>>> is
>>>>>> > thrown when recovering from svaepoint
>>>>>> >
>>>>>> >        public static final String KAFKA_TABLE_FORMAT =
>>>>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>>>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>>>>> >                     ") WITH (\n" +
>>>>>> >                     "   'connector' = 'kafka',\n" +
>>>>>> >                     "   'topic' = '%s',\n" +
>>>>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>>>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>>>>> >                     "   'properties.transaction.timeout.ms' =
>>>>>> > '900000',\n" +
>>>>>> >                     "   'sink.partitioner' =
>>>>>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>>>>> >                     "   'format' = 'dbz-json'\n" +
>>>>>> >                     ")\n";
>>>>>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>>>>>> default_database,
>>>>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>>>>> > (table=[default_catalog.default_database.KafkaTable],
>>>>>> fields=[data]) (1/1
>>>>>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
>>>>>> to
>>>>>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>>>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>>>>> > operation with an old epoch. Either there is a newer producer with
>>>>>> the
>>>>>> > same transactionalId, or the producer's transaction has been
>>>>>> expired by
>>>>>> > the broker.
>>>>>> >     at org.apache.kafka.clients.producer.internals.
>>>>>> >
>>>>>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>>>>>> > .java:1352)
>>>>>> >     at org.apache.kafka.clients.producer.internals.
>>>>>> >
>>>>>> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>>>>>> > 1260)
>>>>>> >     at
>>>>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>>>>>> > .java:109)
>>>>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(
>>>>>> > NetworkClient.java:572)
>>>>>> >     at
>>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>>>>> >     at org.apache.kafka.clients.producer.internals.Sender
>>>>>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>>>>>> >     at
>>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>>>>>> > .java:312)
>>>>>> >     at
>>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>>>>>> > 239)
>>>>>> >     at java.lang.Thread.run(Thread.java:748)
>>>>>> >
>>>>>>
>>>>>