xiongkun created FLINK-12551:
--------------------------------
Summary: elasticsearch6 connector print log error
Key: FLINK-12551
URL:
https://issues.apache.org/jira/browse/FLINK-12551 Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: 1.6.3
Reporter: xiongkun
When I use the Elasticsearch connector and my project is running, I find that some data is not inserted into Elasticsearch. I wanted to read the log to help me diagnose this, but the log does not contain the important message, so I read the source code (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and found an error in how the ERROR log is written.
{code:java}
/**
 * Callback for a bulk request that completed and returned a response.
 *
 * <p>When the response reports failures, every item for which a failure cause can be
 * extracted is logged and passed to {@code failureHandler} together with its REST status
 * code ({@code -1} when the item carries no status). Afterwards, if
 * {@code flushOnCheckpoint} is set, the pending-request counter is reduced by the number
 * of actions in the request.
 *
 * @param executionId identifier of the bulk execution (not used here)
 * @param request the bulk request that was executed
 * @param response the response returned for the bulk request
 */
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    if (response.hasFailures()) {
        try {
            final BulkItemResponse[] items = response.getItems();
            for (int idx = 0; idx < items.length; idx++) {
                final BulkItemResponse item = items[idx];
                final Throwable cause = callBridge.extractFailureCauseFromBulkItemResponse(item);
                if (cause == null) {
                    // nothing to report for this item
                    continue;
                }

                LOG.error("Failed Elasticsearch item request: {}", item.getFailureMessage(), cause);

                // -1 signals "no REST status available" to the failure handler
                final RestStatus status = item.getFailure().getStatus();
                final int statusCode = (status == null) ? -1 : status.getStatus();
                failureHandler.onFailure(request.requests().get(idx), cause, statusCode, requestIndexer);
            }
        } catch (Throwable thrown) {
            // The failure handler chose to throw: remember the first such throwable
            // so the sink fails, and skip the remaining items.
            failureThrowable.compareAndSet(null, thrown);
        }
    }

    if (flushOnCheckpoint) {
        numPendingRequests.getAndAdd(-request.numberOfActions());
    }
}
{code}
{code:java}
/**
 * Callback for a bulk request that failed as a whole with a {@link Throwable}.
 *
 * <p>The failure is logged, then every action of the request is passed to
 * {@code failureHandler} with a REST status code of {@code -1}. Afterwards, if
 * {@code flushOnCheckpoint} is set, the pending-request counter is reduced by the number
 * of actions in the request.
 *
 * @param executionId identifier of the bulk execution (not used here)
 * @param request the bulk request that failed
 * @param failure the throwable that caused the bulk request to fail
 */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    // Pass the failure itself (not failure.getCause()) as the trailing argument so the
    // logger prints its full stack trace; getCause() may be null and would otherwise
    // drop the top-level exception from the log entirely.
    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);

    try {
        for (ActionRequest action : request.requests()) {
            failureHandler.onFailure(action, failure, -1, requestIndexer);
        }
    } catch (Throwable t) {
        // fail the sink and skip the rest of the items
        // if the failure handler decides to throw an exception
        failureThrowable.compareAndSet(null, t);
    }

    if (flushOnCheckpoint) {
        numPendingRequests.getAndAdd(-request.numberOfActions());
    }
}
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)