
Thread lock when unable to insert in Elasticsearch #925

Open
@nicolasm35

Description


Java API client version

8.15.1

Java version

openjdk version "21.0.5" 2024-10-15 LTS

Elasticsearch Version

8.15.1

Problem description

Inserting data into Elasticsearch with a BulkIngester.

Important parameters:
throughput = 50 records/s
bulk concurrent requests = 2
bulk max actions = 10
bulk max size = 90 MB (never triggers insertions)

BulkIngester

        bulkIngester = BulkIngester.of(b -> b
                .client(client)
                .maxOperations(bulkMaxActions)
                .maxConcurrentRequests(bulkConcurrentRequests)
                .maxSize(bulkMaxSize * 1024L * 1024)
                .listener(esBulkListener));
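With these settings the count threshold always fires first: at 50 records/s, 10 operations accumulate in 0.2 s, so a bulk never comes close to 90 MB, which is why maxSize never triggers a flush. A self-contained sketch of that count-or-size trigger rule (illustrative only, not the BulkIngester internals):

```java
// Illustrative sketch of a buffered-bulk flush decision: send the bulk when
// either the operation count or the buffered byte size reaches its threshold.
// With maxOperations = 10 and small documents, the count threshold always
// fires long before 90 MB is buffered.
public class FlushTrigger {
    private final int maxOperations;
    private final long maxSizeBytes;

    public FlushTrigger(int maxOperations, long maxSizeBytes) {
        this.maxOperations = maxOperations;
        this.maxSizeBytes = maxSizeBytes;
    }

    public boolean shouldFlush(int bufferedOps, long bufferedBytes) {
        return bufferedOps >= maxOperations || bufferedBytes >= maxSizeBytes;
    }
}
```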

BulkListener

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

public class EsBulkListener implements BulkListener<byte[]> {

    private static final Logger LOG = LogManager.getLogger(EsBulkListener.class);

    private final AckCounter ackCounter;

    private final boolean acknowledgeRecords;

    public EsBulkListener(boolean acknowledgeRecords) {
        this.ackCounter = new AckCounter();
        this.acknowledgeRecords = acknowledgeRecords;
    }

    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<byte[]> contexts) {
        //LOG.info("BulkIngester Execution (" + executionId + ") - About to execute new bulk insert composed of " + request.operations().size() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<byte[]> contexts, BulkResponse response) {
        if (response.errors()) {
            LOG.error("BulkIngester Bulk Response has failures: " + response.errors());
            if (acknowledgeRecords) {
                response.items().forEach(i -> {
                    if (i.error() != null) {
                        ackCounter.updateFailure(1L);
                    } else {
                        ackCounter.updateSuccess(1L);
                    }
                });
            }
        } else {
            //LOG.info("BulkIngester Execution (" + executionId + ") - Bulk insert composed of " + request.operations().size() +" actions, took " + response.took() + " ms");
            if (acknowledgeRecords) {
                ackCounter.updateSuccess(request.operations().size());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<byte[]> contexts, Throwable failure) {
        LOG.error("BulkIngester (" + executionId + ") - Execution of bulk request failed: " + failure.getMessage());
        if (acknowledgeRecords) {
            ackCounter.updateFailure(request.operations().size());
        }
    }

    AckCounter getAckCounter() {
        return ackCounter;
    }
}
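The `AckCounter` class referenced by the listener is not included in the report. A minimal thread-safe reconstruction of what such a counter might look like (names and behavior are assumed, since the original source is not shown; atomics matter here because `afterBulk` can run concurrently when maxConcurrentRequests > 1):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reconstruction: the issue does not include AckCounter's source.
// AtomicLong keeps the counts consistent when listener callbacks fire from
// multiple concurrent bulk requests.
public class AckCounter {
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong failure = new AtomicLong();

    public void updateSuccess(long n) { success.addAndGet(n); }
    public void updateFailure(long n) { failure.addAndGet(n); }

    public long successCount() { return success.get(); }
    public long failureCount() { return failure.get(); }
}
```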

Insertion:

    public void send(Object record) {
        @SuppressWarnings("unchecked")
        Map<String, Object> esValues = (Map<String, Object>) record;
        String docId = (String) esValues.remove(DOC_ID);
        bulkIngester.add(op -> op
                .index(ind -> ind
                        .id(docId)
                        .index(indexName)
                        .document(esValues)));
    }

Take the Elasticsearch master down so that insertions fail.
You get errors: bulk request failed: Connection refused
Then bring the Elasticsearch master back up so that insertions can resume.
Insertions remain blocked.

With bulk concurrent requests = 1 there is no issue.
Note that the same test does not fail with the deprecated bulk processor.
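The symptom (hangs only when maxConcurrentRequests > 1, and only after the error callback path has run) is consistent with an in-flight request slot that is never returned on failure: once failed requests have drained the pool of slots, every later flush waits forever. A self-contained sketch of that failure mode, assuming a semaphore-style concurrency limiter (this is illustrative only, not the actual BulkIngester implementation):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative only: a concurrency limiter whose error path forgets to
// release its permit. After `maxConcurrentRequests` failed sends, the pool
// is empty and every subsequent send blocks (here, times out instead).
public class LeakyLimiter {
    private final Semaphore slots;

    public LeakyLimiter(int maxConcurrentRequests) {
        this.slots = new Semaphore(maxConcurrentRequests);
    }

    /** Returns true if the "request" ran; false if no slot freed up in time. */
    public boolean trySend(boolean requestFails, long timeoutMs) {
        try {
            if (!slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // a plain acquire() here would block forever
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (requestFails) {
            // BUG (simulated): the failure path skips the release below,
            // permanently leaking one concurrency slot.
            return true;
        }
        slots.release();
        return true;
    }
}
```

With two slots, two connection-refused failures are enough to exhaust the pool, after which recovery of the cluster no longer helps because no permit is ever released.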
