Skip to content

Commit

Permalink
[improve][sink]add metrics to elastic search sink
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzming authored and nicoloboschi committed Jun 9, 2023
1 parent c8d7cc6 commit c370cfe
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
Expand All @@ -47,7 +48,9 @@ public class ElasticSearchClient implements AutoCloseable {
};

private ElasticSearchConfig config;
private ElasticSearchMetrics metrics;
private RestClient client;
private SinkContext sinkContext;
private final RandomExponentialRetry backoffRetry;

final Set<String> indexCache = new HashSet<>();
Expand All @@ -56,8 +59,9 @@ public class ElasticSearchClient implements AutoCloseable {
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
private final IndexNameFormatter indexNameFormatter;

public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, ElasticSearchMetrics metrics) {
this.config = elasticSearchConfig;
this.metrics = metrics;
if (this.config.getIndexName() != null) {
this.indexNameFormatter = new IndexNameFormatter(this.config.getIndexName());
} else {
Expand All @@ -79,6 +83,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
checkForIrrecoverableError(record, result);
} else {
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1);
}
}
}
Expand All @@ -89,6 +94,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) {
final Record record = operation.getPulsarRecord();
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
}
}
};
Expand All @@ -115,10 +121,13 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
for (String error : MALFORMED_ERRORS) {
if (errorCause.contains(error)) {
isMalformed = true;
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
switch (config.getMalformedDocAction()) {
case IGNORE:
metrics.incrementCounter(ElasticSearchMetrics.MALFORMED_IGNORE, 1);
break;
case WARN:
metrics.incrementCounter(ElasticSearchMetrics.WARN, 1);
log.warn("Ignoring malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
Expand All @@ -137,7 +146,10 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
if (!isMalformed) {
log.warn("Bulk request failed, message id=[{}] index={} error={}",
record.getMessage()
.map(m -> m.getMessageId().toString())
.map(m -> {
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
return m.getMessageId().toString();
})
.orElse(""),
result.getIndex(), result.getError());
}
Expand All @@ -160,6 +172,7 @@ public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Excep
client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
} catch (Exception e) {
log.debug("index failed id=" + idAndDoc.getLeft(), e);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
record.fail();
throw e;
}
Expand All @@ -184,13 +197,16 @@ public boolean indexDocument(Record<GenericObject> record, Pair<String, String>
final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
if (createdOrUpdated) {
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1);
} else {
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
}
return createdOrUpdated;
} catch (final Exception ex) {
log.error("index failed id=" + idAndDoc.getLeft(), ex);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw ex;
}
}
Expand All @@ -211,6 +227,7 @@ public void bulkDelete(Record<GenericObject> record, String id) throws Exception
} catch (Exception e) {
log.debug("delete failed id: {}", id, e);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw e;
}
}
Expand All @@ -230,13 +247,16 @@ public boolean deleteDocument(Record<GenericObject> record, String id) throws Ex
final boolean deleted = client.deleteDocument(indexName, id);
if (deleted) {
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1);
} else {
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
}
return deleted;
} catch (final Exception ex) {
log.debug("index failed id: {}", id, ex);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch;

import org.apache.pulsar.io.core.SinkContext;

/**
 * Metrics helper for the Elasticsearch sink.
 *
 * <p>Declares the metric names used by the sink and forwards every recorded value to the
 * {@link SinkContext} supplied at construction time.
 */
public class ElasticSearchMetrics {

    // sink metrics
    // NOTE(review): INCOMING and SUCCESS carry a leading underscore while the remaining names do
    // not. The values are intentionally left untouched here because anything keyed on the current
    // names would be affected by a rename - confirm whether the prefix is deliberate.
    public static final String INCOMING = "_elasticsearch_incoming";

    // Intended accounting: INCOMING = SUCCESS + FAILURE + SKIP + NULLVALUE_IGNORE
    public static final String SUCCESS = "_elasticsearch_success";

    // DELETE_ATTEMPT is an attempt to delete a document by id
    // TODO: add delete success metrics, currently it's difficult to separate delete and index from the bulk operations
    public static final String DELETE_ATTEMPT = "elasticsearch_delete_attempt";

    public static final String FAILURE = "elasticsearch_failure";
    public static final String SKIP = "elasticsearch_skip";
    public static final String WARN = "elasticsearch_warn";
    public static final String MALFORMED_IGNORE = "elasticsearch_malformed_ignore";
    public static final String NULLVALUE_IGNORE = "elasticsearch_nullvalue_ignore";

    // Assigned exactly once in the constructor and never rebound, hence final.
    private final SinkContext sinkContext;

    /**
     * Creates a metrics helper bound to the given sink context.
     *
     * @param sinkContext context that receives every value passed to
     *                    {@link #incrementCounter(String, double)}; it is stored as-is and is
     *                    dereferenced on each call, so a {@code null} context fails there
     */
    public ElasticSearchMetrics(SinkContext sinkContext) {
        this.sinkContext = sinkContext;
    }

    /**
     * Records {@code value} under the metric named {@code counter} by delegating to
     * {@link SinkContext#recordMetric(String, double)}.
     *
     * @param counter metric name, normally one of the constants declared on this class
     * @param value   amount to record for the metric
     */
    public void incrementCounter(String counter, double value) {
        this.sinkContext.recordMetric(counter, value);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {

private ElasticSearchConfig elasticSearchConfig;
private ElasticSearchClient elasticsearchClient;
private ElasticSearchMetrics metrics;
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
private List<String> primaryFields = null;
Expand All @@ -76,7 +77,8 @@ public class ElasticSearchSink implements Sink<GenericObject> {
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
elasticSearchConfig = ElasticSearchConfig.load(config);
elasticSearchConfig.validate();
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
metrics = new ElasticSearchMetrics(sinkContext);
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig, metrics);
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
primaryFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
}
Expand Down Expand Up @@ -110,6 +112,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {

@Override
public void write(Record<GenericObject> record) throws Exception {
metrics.incrementCounter(ElasticSearchMetrics.INCOMING, 1);
if (!elasticsearchClient.isFailed()) {
Pair<String, String> idAndDoc = extractIdAndDocument(record);
try {
Expand All @@ -120,16 +123,21 @@ public void write(Record<GenericObject> record) throws Exception {
switch (elasticSearchConfig.getNullValueAction()) {
case DELETE:
if (idAndDoc.getLeft() != null) {
metrics.incrementCounter(ElasticSearchMetrics.DELETE_ATTEMPT, 1);
if (elasticSearchConfig.isBulkEnabled()) {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
}
} else {
metrics.incrementCounter(ElasticSearchMetrics.SKIP, 1);
}
break;
case IGNORE:
metrics.incrementCounter(ElasticSearchMetrics.NULLVALUE_IGNORE, 1);
break;
case FAIL:
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
elasticsearchClient.failed(
new PulsarClientException.InvalidMessageException("Unexpected null message value"));
throw elasticsearchClient.irrecoverableError.get();
Expand All @@ -142,10 +150,14 @@ public void write(Record<GenericObject> record) throws Exception {
}
}
} catch (JsonProcessingException jsonProcessingException) {
// this comes from the non-bulk action
// a generic failure counter should have been incremented
switch (elasticSearchConfig.getMalformedDocAction()) {
case IGNORE:
metrics.incrementCounter(ElasticSearchMetrics.MALFORMED_IGNORE, 1);
break;
case WARN:
metrics.incrementCounter(ElasticSearchMetrics.WARN, 1);
log.warn("Ignoring malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
Expand All @@ -163,6 +175,7 @@ public void write(Record<GenericObject> record) throws Exception {
throw e;
}
} else {
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw new IllegalStateException("Elasticsearch client is in FAILED status");
}
}
Expand Down Expand Up @@ -278,7 +291,7 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
}
doc = sanitizeValue(doc);
return Pair.of(id, doc);
} else {
} else {
final byte[] data = record
.getMessage()
.orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.mockito.Mockito;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -78,16 +80,18 @@ public void testBasicAuth() throws Exception {
config.setIndexName(indexName);
config.setMaxRetries(1);
config.setBulkEnabled(true);
SinkContext mockContext = Mockito.mock(SinkContext.class);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setPassword(ELASTICPWD);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand All @@ -103,25 +107,26 @@ public void testTokenAuth() throws Exception {
config.setMaxRetries(1);
config.setBulkEnabled(true);


SinkContext mockContext = Mockito.mock(SinkContext.class);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
config.setPassword(ELASTICPWD);
String token;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
token = createAuthToken(client, "elastic", ELASTICPWD);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setToken(token);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand All @@ -137,24 +142,26 @@ public void testApiKey() throws Exception {
config.setMaxRetries(1);
config.setBulkEnabled(true);

SinkContext mockContext = Mockito.mock(SinkContext.class);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
config.setPassword(ELASTICPWD);
String apiKey;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
apiKey = createApiKey(client);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setApiKey(apiKey);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.elasticsearch;

import org.apache.pulsar.io.core.SinkContext;
import org.mockito.Mockito;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.MountableFile;
Expand Down Expand Up @@ -150,7 +152,9 @@ public void testSslWithClientAuth() throws IOException {
}

private void testClientWithConfig(ElasticSearchConfig config) throws IOException {
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
SinkContext mockContext = Mockito.mock(SinkContext.class);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
testIndexExists(client);
}
}
Expand Down
Loading

0 comments on commit c370cfe

Please sign in to comment.