zhaoshijie created FLINK-12494:
----------------------------------

Summary: JDBCOutputFormat support reconnect when link failure and flush by timeInterval
Key: FLINK-12494
URL: https://issues.apache.org/jira/browse/FLINK-12494
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.8.0
Reporter: zhaoshijie

When I used JDBCSink (flink-1.4.2) to write records to MySQL, I got the following exception:
{code:java}
java.util.concurrent.ExecutionException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
    at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
    ... 2 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
    at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
    at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
    at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
    at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
    at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
    at org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
    at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
    ... 16 more
{code}
I think the connection sat idle for too long without writing any records (an idle connection), so the server closed it on its own initiative. Sparse data is fairly common in practice, so I think we should reconnect when the connection is no longer valid.

Besides, I found that JDBCOutputFormat.flush is only called from the snapshotState method and when "batchCount >= batchInterval". If the records reaching our sink are sparse, the actual write can happen after a very long delay. Should we add a flush condition: currentTime - lastFlushTime > timeInterval?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
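The two proposals above can be illustrated with a minimal sketch. This is not Flink's JDBCOutputFormat code: the class `JdbcFlushSketch`, the `ConnectionFactory` hook, and the `validationTimeoutSec` parameter are hypothetical names introduced only for illustration. The only JDBC calls used are `Connection.isClosed()`, `Connection.isValid(int)` and `Connection.close()`.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper for illustration; not part of Flink's JDBCOutputFormat. */
class JdbcFlushSketch {

    /** Hypothetical hook, e.g. () -> DriverManager.getConnection(url, user, password). */
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    private final ConnectionFactory factory;
    private final int batchInterval;        // existing size threshold from the issue
    private final long timeIntervalMs;      // proposed time threshold (assumed to be in ms)
    private final int validationTimeoutSec; // assumed timeout passed to Connection.isValid

    private Connection connection;
    private int batchCount;
    private long lastFlushTime;

    JdbcFlushSketch(ConnectionFactory factory, int batchInterval, long timeIntervalMs,
                    int validationTimeoutSec, long startTime) {
        this.factory = factory;
        this.batchInterval = batchInterval;
        this.timeIntervalMs = timeIntervalMs;
        this.validationTimeoutSec = validationTimeoutSec;
        this.lastFlushTime = startTime;
    }

    /** Returns the current connection, reopening it if the server has dropped it. */
    Connection ensureConnection() throws SQLException {
        if (connection != null && !connection.isClosed()
                && connection.isValid(validationTimeoutSec)) {
            return connection;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // The handle is already stale; there is nothing left to recover.
            }
        }
        connection = factory.open();
        return connection;
    }

    /** Call once for every record added to the pending batch. */
    void recordAdded() {
        batchCount++;
    }

    /** Size condition from the issue plus the proposed time condition. */
    boolean shouldFlush(long currentTime) {
        boolean batchFull = batchCount >= batchInterval;
        boolean overdue = batchCount > 0 && currentTime - lastFlushTime > timeIntervalMs;
        return batchFull || overdue;
    }

    /** Call after the batch has been executed successfully. */
    void markFlushed(long currentTime) {
        batchCount = 0;
        lastFlushTime = currentTime;
    }
}
```

Note that a time condition checked only when a record arrives would still not fire on a completely idle stream, so in practice something like a periodic timer would probably have to call `shouldFlush` as well.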