Echo Lee created FLINK-20641:
-------------------------------- Summary: flink-connector-elasticsearch6 will deadlock Key: FLINK-20641 URL: https://issues.apache.org/jira/browse/FLINK-20641 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Affects Versions: 1.11.1 Reporter: Echo Lee Fix For: 1.13.0 flink vision: 1.11.1 elasticsearch connector version: 6.3.1 My job graph is [kafkaSource--> map–>elasticsearchSink], when I set a larger degree of parallelism, stream processing will stop, I know es has an issue [47599|[https://github.com/elastic/elasticsearch/issues/47599],] this is unexpectedly the risk of deadlock when using flink-connector-elasticsearch6. TaskManager stack is: {code:java} "elasticsearch[scheduler][T#1]" Id=15008 BLOCKED on org.elasticsearch.action.bulk.BulkProcessor@61e178a6 owned by "Sink: ProtoTraceLog (39/60)" Id=8781"elasticsearch[scheduler][T#1]" Id=15008 BLOCKED on org.elasticsearch.action.bulk.BulkProcessor@61e178a6 owned by "Sink: ProtoTraceLog (39/60)" Id=8781 at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:366) - blocked on org.elasticsearch.action.bulk.BulkProcessor@61e178a6 at org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:182) at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... Number of locked synchronizers = 1 - java.util.concurrent.ThreadPoolExecutor$Worker@15659a74" Sink: ProtoTraceLog (39/60)" Id=8781 WAITING on java.util.concurrent.CountDownLatch$Sync@58bbbd7c at sun.misc.Unsafe.park(Native Method) - waiting on java.util.concurrent.CountDownLatch$Sync@58bbbd7c at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86) at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339) ... {code} TaskManager log is: {code:java} 2020-12-16 14:36:35,291 ERROR com.hundsun.flink.handler.HsActionRequestFailureHandler [] - Sink to es exception ,exceptionData: index {[full_link_apm_span-2020- 12-16][apm][null], source[n/a, actual length: [5.8kb], max length: 2kb]} ,exceptionStackTrace: java.lang.InterruptedException 68224 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) 68225 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) 68226 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 68227 at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86) 68228 at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339) 68229 at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330) 68230 at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288) 68231 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271) 68232 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267) 68233 at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253) 68234 at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72) 68235 at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:59) 68236 at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:47) 68237 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310) 68238 at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) 68239 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) 68240 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) 68241 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) 68242 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) 68243 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) 68244 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) 68245 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) 68246 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) 68247 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 68248 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 68249 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |