Wei Hao created FLINK-21743:
-------------------------------

             Summary: JdbcXaSinkFunction throws XAER_RMFAIL when calling snapshotState and beginTx
                 Key: FLINK-21743
                 URL: https://issues.apache.org/jira/browse/FLINK-21743
             Project: Flink
          Issue Type: Test
          Components: Connectors / JDBC
    Affects Versions: 1.13.0
         Environment: org.apache.flink:flink-streaming-java_2.11:1.12.1
                      org.apache.flink:flink-connector-jdbc_2.11:1.13-SNAPSHOT
            Reporter: Wei Hao


{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
    this.rollbackPreparedFromCheckpoint(context.getCheckpointId());
    // Ends and prepares the current transaction.
    this.prepareCurrentTx(context.getCheckpointId());
    // Starts a new transaction with a new Xid; this is the call that fails.
    this.beginTx(context.getCheckpointId() + 1L);
    this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids, this.hangingXids));
}
{code}

When a checkpoint starts, snapshotState() is called, which ends and prepares the current transaction. The problem is in beginTx(): it generates a new Xid and xaFacade runs a command like 'xa start new_xid', which throws the exception shown below and causes the checkpoint to fail.

{code:java}
Caused by: org.apache.flink.connector.jdbc.xa.XaFacade$TransientXaException: com.mysql.cj.jdbc.MysqlXAException: XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:353)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.access$800(XaFacadeImpl.java:66)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$0(XaFacadeImpl.java:288)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.start(XaFacadeImpl.java:160)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.beginTx(JdbcXaSinkFunction.java:302)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.snapshotState(JdbcXaSinkFunction.java:241)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 23 more
{code}

I think this failure is predictable, because it follows from how XA transactions work. The MySQL shell example below behaves much like JdbcXaSinkFunction does.

{code:sql}
xa start "1111";
# Inserting some rows
# end the current transaction
xa end "1111";
xa prepare "1111";
# start a new transaction with the same connection while the previous one is PREPARED
xa start "2222";
{code}

The last statement also fails with 'SQL Error [1399] [XAE07]: XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state'.
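
The state rule behind this error can be illustrated with a small model of a single connection. This is only a simplified sketch, not Flink or MySQL code: the class XaConnectionStateSketch, its methods, and its error messages are hypothetical names made up for illustration, and it assumes a connection tracks one XA transaction at a time and accepts only a two-phase commit of the prepared Xid.

```java
import java.util.Objects;

/** Hypothetical model of the XA state kept by one connection (illustration only). */
final class XaConnectionStateSketch {
    enum State { NONE, ACTIVE, IDLE, PREPARED }

    private State state = State.NONE;
    private String currentXid;

    State state() {
        return state;
    }

    /** Models 'xa start': only allowed when no transaction is attached. */
    void start(String xid) {
        requireState(State.NONE, "XA START");
        currentXid = Objects.requireNonNull(xid, "xid");
        state = State.ACTIVE;
    }

    /** Models 'xa end': ACTIVE -> IDLE. */
    void end(String xid) {
        requireState(State.ACTIVE, "XA END");
        requireCurrent(xid);
        state = State.IDLE;
    }

    /** Models 'xa prepare': IDLE -> PREPARED. */
    void prepare(String xid) {
        requireState(State.IDLE, "XA PREPARE");
        requireCurrent(xid);
        state = State.PREPARED;
    }

    /** Models a two-phase 'xa commit': PREPARED -> NONE, freeing the connection. */
    void commit(String xid) {
        requireState(State.PREPARED, "XA COMMIT");
        requireCurrent(xid);
        currentXid = null;
        state = State.NONE;
    }

    private void requireState(State expected, String command) {
        if (state != expected) {
            // Message loosely imitates the MySQL error text quoted in this report.
            throw new IllegalStateException(
                    command + " rejected: XAER_RMFAIL, transaction is in the " + state + " state");
        }
    }

    private void requireCurrent(String xid) {
        if (!Objects.equals(currentXid, xid)) {
            throw new IllegalStateException("XAER_NOTA: unknown XID " + xid);
        }
    }

    public static void main(String[] args) {
        XaConnectionStateSketch conn = new XaConnectionStateSketch();
        conn.start("1111");
        conn.end("1111");
        conn.prepare("1111");
        try {
            conn.start("2222"); // same connection, previous Xid still PREPARED
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, the second start is accepted only after the prepared transaction has been committed, or when it is issued on a separate XaConnectionStateSketch instance, which stands in for a second connection.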