[jira] [Created] (FLINK-16708) When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss



Shang Yuanchun (Jira)
tangshangwen created FLINK-16708:
------------------------------------

             Summary: When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss
                 Key: FLINK-16708
                 URL: https://issues.apache.org/jira/browse/FLINK-16708
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.10.0
            Reporter: tangshangwen


In our test environment, I used the tcpkill command to simulate a scenario where the PostgreSQL connection was closed. I found that the retry strategy of the flush method did not take effect: on the second attempt, it could not recognize that the connection had been closed. The reason is that PgStatement clears its batchStatements before the first check of whether the connection is closed, so the second execution finds batchStatements empty and returns normally. The buffered rows are never written, yet the flush is treated as successful.
{code:java}
2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
{code}
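
The behaviour described above can be reproduced without a database. The following is only a minimal sketch: FakeStatement is a hypothetical stand-in that mimics the reported driver behaviour (batch cleared before the connection check), and naiveFlush, bufferedFlush and reconnect are illustrative names, not Flink or pgjdbc APIs. It contrasts a retry that simply calls executeBatch again with one that keeps the rows in a caller-side buffer and re-adds them on each attempt.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDataLossSketch {

    /** Hypothetical stand-in for a driver statement; not the real PgStatement. */
    static class FakeStatement {
        private final List<String> batch = new ArrayList<>();
        final List<String> written = new ArrayList<>();
        boolean connectionClosed = false;

        void addBatch(String row) {
            batch.add(row);
        }

        int executeBatch() {
            List<String> pending = new ArrayList<>(batch);
            batch.clear();                 // batch is cleared before any connection check
            if (pending.isEmpty()) {
                return 0;                  // empty batch: returns normally
            }
            if (connectionClosed) {
                throw new IllegalStateException("This connection has been closed.");
            }
            written.addAll(pending);
            return pending.size();
        }

        /** Assumption: stands in for re-establishing the connection. */
        void reconnect() {
            connectionClosed = false;
        }
    }

    /** Retry that only calls executeBatch again; reports success even if rows were dropped. */
    static boolean naiveFlush(FakeStatement stmt, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                stmt.executeBatch();
                return true;
            } catch (IllegalStateException e) {
                // swallow and retry; the statement's batch is already empty here
            }
        }
        return false;
    }

    /** Retry that re-adds rows from a caller-side buffer and reconnects after a failure. */
    static boolean bufferedFlush(FakeStatement stmt, List<String> buffer, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                for (String row : buffer) {
                    stmt.addBatch(row);
                }
                stmt.executeBatch();
                buffer.clear();            // drop rows only after a successful write
                return true;
            } catch (IllegalStateException e) {
                stmt.reconnect();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        FakeStatement naive = new FakeStatement();
        naive.addBatch("a");
        naive.addBatch("b");
        naive.connectionClosed = true;
        System.out.println("naive success=" + naiveFlush(naive, 3)
                + " written=" + naive.written);

        FakeStatement buffered = new FakeStatement();
        buffered.connectionClosed = true;
        List<String> buffer = new ArrayList<>(List.of("a", "b"));
        System.out.println("buffered success=" + bufferedFlush(buffered, buffer, 3)
                + " written=" + buffered.written);
    }
}
```

In this sketch the naive retry reports success while nothing is written, which matches the data loss described in the summary. Keeping the rows outside the statement until a write succeeds is one possible direction for a fix, not necessarily the one the project would adopt.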


