Skip to content

Commit

Permalink
[improve][es-sink] Add error log for failed bulk records in ES sink (a…
Browse files Browse the repository at this point in the history
…pache#16177)

(cherry picked from commit c327b63)
  • Loading branch information
fantapsody authored and nicoloboschi committed Jun 9, 2023
1 parent 42f3668 commit c8d7cc6
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
final Record record = bulkOperationList.get(index++).getPulsarRecord();
if (result.isError()) {
record.fail();
checkForIrrecoverableError(result);
checkForIrrecoverableError(record, result);
} else {
record.ack();
}
Expand Down Expand Up @@ -106,13 +106,15 @@ boolean isFailed() {
return irrecoverableError.get() != null;
}

void checkForIrrecoverableError(BulkProcessor.BulkOperationResult result) {
void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationResult result) {
if (!result.isError()) {
return;
}
final String errorCause = result.getError();
boolean isMalformed = false;
for (String error : MALFORMED_ERRORS) {
if (errorCause.contains(error)) {
isMalformed = true;
switch (config.getMalformedDocAction()) {
case IGNORE:
break;
Expand All @@ -132,6 +134,13 @@ void checkForIrrecoverableError(BulkProcessor.BulkOperationResult result) {
}
}
}
if (!isMalformed) {
log.warn("Bulk request failed, message id=[{}] index={} error={}",
record.getMessage()
.map(m -> m.getMessageId().toString())
.orElse(""),
result.getIndex(), result.getError());
}
}

public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {
Expand Down

0 comments on commit c8d7cc6

Please sign in to comment.