[jira] [Created] (FLINK-21743) JdbcXaSinkFunction throws XAER_RMFAIL when calling snapshotState and beginTx

Shang Yuanchun (Jira)
Wei Hao created FLINK-21743:
-------------------------------

             Summary: JdbcXaSinkFunction throws XAER_RMFAIL when calling snapshotState and beginTx
                 Key: FLINK-21743
                 URL: https://issues.apache.org/jira/browse/FLINK-21743
             Project: Flink
          Issue Type: Test
          Components: Connectors / JDBC
    Affects Versions: 1.13.0
         Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
            Reporter: Wei Hao


{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
    this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
    this.prepareCurrentTx(context.getCheckpointId());
    this.beginTx(context.getCheckpointId() + 1L);
    this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids, this.hangingXids));
}
{code}
When checkpointing starts, Flink calls snapshotState(), which ends and prepares the current transaction. The problem is in beginTx(): it generates a new Xid, and xaFacade runs a command like 'xa start new_xid'. That command throws the exception shown below, which causes the checkpoint to fail.
{code:java}
Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException: com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed when global transaction is in the  PREPARED state
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 23 more
{code}
I think this failure is predictable, because it is how XA transactions work.
The MySQL shell example below behaves much like JdbcXaSinkFunction does.
{code:sql}
xa start "1111";
# insert some rows
# end the current transaction
xa end "1111";
xa prepare "1111";
# start a new transaction on the same connection while the previous one is PREPARED
xa start "2222";
{code}
This also produces the error 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state'.
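
The behavior above can be sketched as a small state machine. This is only a toy model of what the MySQL session appears to do, assuming one connection can hold a single XA transaction until that transaction is committed; the class XaConnectionModel and its methods are hypothetical names for illustration, not Flink or MySQL APIs.

```java
/**
 * Toy model of the XA state held by a single connection.
 * Illustrative only: hypothetical names, neither Flink nor MySQL code.
 */
class XaConnectionModel {
    enum State { ACTIVE, IDLE, PREPARED }

    private String currentXid; // xid bound to this connection, or null if none
    private State state;       // state of currentXid, or null if none

    /** Models 'xa start': allowed only when no transaction is bound. */
    public void start(String xid) {
        if (currentXid != null) {
            throw new IllegalStateException(
                "XAER_RMFAIL: The command cannot be executed when global "
                    + "transaction is in the " + state + " state");
        }
        currentXid = xid;
        state = State.ACTIVE;
    }

    /** Models 'xa end': ACTIVE -> IDLE. */
    public void end(String xid) {
        transition(xid, State.ACTIVE, State.IDLE);
    }

    /** Models 'xa prepare': IDLE -> PREPARED. */
    public void prepare(String xid) {
        transition(xid, State.IDLE, State.PREPARED);
    }

    /** Models 'xa commit': PREPARED -> no transaction bound. */
    public void commit(String xid) {
        transition(xid, State.PREPARED, null);
        currentXid = null;
    }

    private void transition(String xid, State expected, State next) {
        if (!xid.equals(currentXid) || state != expected) {
            throw new IllegalStateException(
                "unexpected command for xid " + xid + " in state " + state);
        }
        state = next;
    }

    public static void main(String[] args) {
        XaConnectionModel conn = new XaConnectionModel();
        conn.start("1111");
        conn.end("1111");
        conn.prepare("1111");
        try {
            // Same step as beginTx() in snapshotState(): new xid, same connection.
            conn.start("2222");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Once the prepared transaction is committed, a new one can start.
        conn.commit("1111");
        conn.start("2222");
    }
}
```

In this model, starting "2222" is rejected only while "1111" is still bound to the connection, which matches the order of calls in snapshotState(): prepareCurrentTx() followed directly by beginTx().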

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)