[jira] [Created] (FLINK-12551) elasticsearch6 connector print log error


Shang Yuanchun (Jira)
xiongkun created FLINK-12551:
--------------------------------

             Summary: elasticsearch6 connector print log error
                 Key: FLINK-12551
                 URL: https://issues.apache.org/jira/browse/FLINK-12551
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
    Affects Versions: 1.6.3
            Reporter: xiongkun


When I use the Elasticsearch connector, I find that some records are not inserted into Elasticsearch while my job is running. I wanted to use the log to diagnose this, but the log does not contain the important message. So I read the source code (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and found an error in the way the ERROR log is written.

 
{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 if (response.hasFailures()) {
  BulkItemResponse itemResponse;
  Throwable failure;
  RestStatus restStatus;

  try {
   for (int i = 0; i < response.getItems().length; i++) {
    itemResponse = response.getItems()[i];
    failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
    if (failure != null) {
     LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

     restStatus = itemResponse.getFailure().getStatus();
     if (restStatus == null) {
      failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
     } else {
      failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
     }
    }
   }
  } catch (Throwable t) {
   // fail the sink and skip the rest of the items
   // if the failure handler decides to throw an exception
   failureThrowable.compareAndSet(null, t);
  }
 }

 if (flushOnCheckpoint) {
  numPendingRequests.getAndAdd(-request.numberOfActions());
 }
}
{code}
{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
 LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());

 try {
  for (ActionRequest action : request.requests()) {
   failureHandler.onFailure(action, failure, -1, requestIndexer);
  }
 } catch (Throwable t) {
  // fail the sink and skip the rest of the items
  // if the failure handler decides to throw an exception
  failureThrowable.compareAndSet(null, t);
 }

 if (flushOnCheckpoint) {
  numPendingRequests.getAndAdd(-request.numberOfActions());
 }
}
{code}
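
For illustration, here is a minimal sketch of one plausible reading of the problem. The second callback passes failure.getCause() as the exception argument, and getCause() returns null when the failure wraps no other exception, so the logger has no stack trace to print. This reading is an assumption, since the report does not name the faulty line. The class LoggedThrowableDemo and its render helper are hypothetical stand-ins for a logger, written against the JDK only; they are not part of Flink or SLF4J.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical demo class, not part of Flink or SLF4J.
class LoggedThrowableDemo {

    // Stand-in for a logger call: prints the message, then the stack trace
    // of the exception argument if one was supplied.
    static String render(String message, Throwable t) {
        StringWriter out = new StringWriter();
        PrintWriter pw = new PrintWriter(out);
        pw.println("ERROR " + message);
        if (t != null) {
            t.printStackTrace(pw);
        }
        pw.flush();
        return out.toString();
    }

    public static void main(String[] args) {
        // A failure created without a wrapped cause: getCause() returns null.
        Throwable failure = new IOException("connection refused");
        String message = "Failed Elasticsearch bulk request: " + failure.getMessage();

        // Mirrors the quoted call shape: the cause is passed, so no stack trace appears.
        System.out.print(render(message, failure.getCause()));

        // Passing the failure itself keeps the exception type and stack trace.
        System.out.print(render(message, failure));
    }
}
```

Under this assumption, passing the failure itself rather than its cause would keep the exception type and stack trace in the log.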
 


