[jira] [Created] (FLINK-14938) Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-14938) Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException

Shang Yuanchun (Jira)
Shengnan YU created FLINK-14938:
-----------------------------------

             Summary: Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException
                 Key: FLINK-14938
                 URL: https://issues.apache.org/jira/browse/FLINK-14938
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
    Affects Versions: 1.8.1
            Reporter: Shengnan YU


 

When use Elasticsearch connector failure handler (from official example) to re-add documents, Flink encountered ConcurrentModificationException.
{code:java}
input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throw Throwable {

            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // full queue; re-add document for indexing
                indexer.add(action);
            }
        }
}));
{code}
I found that in method BufferingNoOpRequestIndexer$processBufferedRequests, it will iterator a list of ActionRequest. However the failure handler will keep re-adding request to that list after bulk, which causes ConcurrentModificationException.

I think it should be a multi-thread bug and need to find a thread-safe List to maintain the failure request?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)