Skip to content

Commit

Permalink
fix: bulk ingester might skip lister requests
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziofortino committed Aug 20, 2024
1 parent bf86580 commit d0af219
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,6 @@ public void flush() {
if (exec != null) {
// A request was actually sent
exec.futureResponse.handle((resp, thr) -> {

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});

if (resp != null) {
// Success
if (listener != null) {
Expand All @@ -330,6 +324,11 @@ public void flush() {
exec.contexts, thr));
}
}

sendRequestCondition.signalIfReadyAfter(() -> {
requestsInFlightCount--;
closeCondition.signalAllIfReady();
});
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,37 @@ private void printStats(TestTransport transport) {
@Test
public void basicTestFlush() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101);
multiThreadTest(7, 3, 5, 101, true);
}

@Test
public void basicTestFlushWithInternalScheduler() throws Exception {
// Prime numbers, so that we have leftovers to flush before shutting down
multiThreadTest(7, 3, 5, 101, false);
}

@Test
public void basicTestNoFlush() throws Exception {
// Will have nothing to flush on close.
multiThreadTest(10, 3, 5, 100);
multiThreadTest(10, 3, 5, 100, true);
}

private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations) throws Exception {
private void multiThreadTest(int maxOperations, int maxRequests, int numThreads, int numOperations, boolean externalScheduler) throws Exception {

CountingListener listener = new CountingListener();
TestTransport transport = new TestTransport();
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
ScheduledExecutorService scheduler;
if (externalScheduler) {
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("my-bulk-ingester-executor#" );
t.setName("my-bulk-ingester-executor#");
t.setDaemon(true);
return t;
});
});
} else {
scheduler = null;
}

BulkIngester<Void> ingester = BulkIngester.of(b -> b
.client(client)
Expand Down Expand Up @@ -138,7 +149,7 @@ private void multiThreadTest(int maxOperations, int maxRequests, int numThreads,

ingester.close();
transport.close();
scheduler.shutdownNow();
if (scheduler != null) scheduler.shutdownNow();

printStats(ingester);
printStats(listener);
Expand Down

0 comments on commit d0af219

Please sign in to comment.