[jira] [Created] (FLINK-12494) JDBCOutputFormat support reconnect when link failure and flush by timeInterval

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-12494) JDBCOutputFormat support reconnect when link failure and flush by timeInterval

Shang Yuanchun (Jira)
zhaoshijie created FLINK-12494:
----------------------------------

             Summary: JDBCOutputFormat support reconnect when link failure and flush by timeInterval
                 Key: FLINK-12494
                 URL: https://issues.apache.org/jira/browse/FLINK-12494
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.8.0
            Reporter: zhaoshijie


when i JDBCSink(flink-1.4.2) wite recode to mysql,find exception as flow :
{code:java}
java.util.concurrent.ExecutionException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
... 2 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
at org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
... 16 more
{code}

i think it is too long not write record by connection(idleConnection),server close connection initiative. sparse data is relatively common in fact, so i think we should reconnect when then connection is invalid。
besides,i find JDBCOutputFormat.flush only call by snapshotState method and "batchCount >= batchInterval",also if ours sink records is sparse, we will find actual write happended by very large time delay,so should we add a flush condition:currentTime- lastFlushTime > timeInterval?

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)