Replies: 2 comments
-
Here is my understanding: 2 threads would pass the size check, but only one (say T1) would actually send requests to ES; the other one (say T2) would return directly and continue to fill the queue. If the traffic load is high enough, T2 would fill up the queue to the size of
Right, `PersistenceTimer`'s thread pool would be blocked if the `BulkProcessor` cannot process the requests quickly enough. To me, this is a producer-consumer paradigm with a fixed queue size: if the queue is full and the consumer doesn't consume from it, the producer blocks and waits.
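In case it helps others following along, here is a minimal, self-contained sketch of that producer-consumer behavior with a bounded queue. The class name, queue size, and delay are made up for illustration; this is not SkyWalking's actual code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Fixed-size queue: once it is full, producers block in put().
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put("request-" + i); // blocks while the queue is full
                    System.out.println("produced request-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread slowConsumer = new Thread(() -> {
            try {
                while (true) {
                    String r = queue.take();
                    TimeUnit.MILLISECONDS.sleep(200); // simulate a slow bulk flush
                    System.out.println("flushed " + r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        slowConsumer.setDaemon(true);
        producer.start();
        slowConsumer.start();
        producer.join();
    }
}
```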
-
Thanks. This makes it clear. For others reading this thread, I want to add more context for
-
`BulkProcessor` has a parameter (`concurrentRequests=2`, see `elasticsearch/concurrentRequests`) to control the concurrency (from the code) through a `Semaphore`. Meanwhile, `PersistenceTimer` uses an `ExecutorService` (pool size = 2, see `core/prepareThreads`) to run the prepare stage and essentially do `batchDAO.flush(innerPrepareRequests)`. In the flush, it uses a singleton `BulkProcessor` instance, which causes `BulkProcessor#internalAdd` to be called by two threads concurrently.

When I reviewed `BulkProcessor#internalAdd`, I noticed `flushIfNeeded` only checks the size of the queue, without any lock mechanism. This means that when the queue size reaches the threshold, both threads (from `PersistenceTimer`) pass this check. Then, until one of them does `requests.drainTo(batch);`, the other always gets an empty queue and returns directly. **Note: at this point, both threads are held, so there is no extra thread to put data into the queue.**

If I read the code correctly, there is no chance we could run concurrent requests to the ElasticSearch server. We have to let the other thread (B) return to the loop and wait for the next chance.

I know these two threads are not going to run synchronously, and the reality could be much better than the above description.
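To make the scenario above concrete, here is a rough, simplified sketch of the pattern as I read it. The class and field names are mine and the threshold is illustrative; this is not the actual `BulkProcessor` code, and the real client sends the bulk request asynchronously and releases the semaphore on completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

class BulkProcessorSketch {
    private final BlockingQueue<Object> requests = new LinkedBlockingQueue<>();
    private final Semaphore semaphore = new Semaphore(2); // mirrors concurrentRequests=2
    private final int flushThreshold = 1000;              // illustrative bulk size

    void internalAdd(Object request) throws InterruptedException {
        requests.put(request);
        flushIfNeeded();
    }

    void flushIfNeeded() throws InterruptedException {
        // Size check without a lock: two callers can both see size >= flushThreshold.
        if (requests.size() >= flushThreshold) {
            flush();
        }
    }

    void flush() throws InterruptedException {
        List<Object> batch = new ArrayList<>();
        // Whichever thread drains first takes the whole batch ...
        requests.drainTo(batch);
        // ... so the second thread gets an empty batch and returns directly.
        if (batch.isEmpty()) {
            return;
        }
        semaphore.acquire();     // caps in-flight bulk requests to ES
        try {
            sendBulk(batch);     // placeholder; the real client is asynchronous
        } finally {
            semaphore.release(); // the real code releases when the async call completes
        }
    }

    private void sendBulk(List<Object> batch) {
        // stand-in for the actual HTTP bulk request
    }
}
```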
@kezhenxu94 My question is mainly: is this our intended design of the concurrency mechanism of the `BulkProcessor`? If YES, does this also mean `PersistenceTimer`'s thread pool could be blocked by the `BulkProcessor` if the size of `PersistenceTimer`'s pool is larger than the `BulkProcessor`'s `concurrentRequests` setting?

FYI @wankai123 @hanahmily