Parallelization flag for management commands #16
Comments
Upon looking at the Elasticsearch python client, there appears to be a `parallel_bulk` helper.
I've often thought of attempting to index in parallel - but time hasn't allowed for it as of yet. I'd certainly consider contributions via a PR for inclusion. I honestly had no idea `parallel_bulk` existed.
@jaddison I have been trying it out recently but ran into other issues, mainly with the GIL and with pickling the queryset becoming a CPU-bound bottleneck.
@jaddison Here is a rough implementation which extends your `ElasticsearchTypeMixin`. Key things to note are the introduction of `parallel_bulk` and a `get_thread_count()` hook to control the number of worker threads:

```python
from elasticsearch.helpers import parallel_bulk

from simple_elasticsearch.mixins import ElasticsearchTypeMixin
from simple_elasticsearch.utils import queryset_iterator


class ParallelElasticsearchTypeMixin(ElasticsearchTypeMixin):

    @classmethod
    def get_thread_count(cls):
        # number of worker threads handed to parallel_bulk
        return 4

    @classmethod
    def parallel_bulk_index(cls, es=None, queryset=None):
        es = es or cls.get_es()
        if queryset is None:
            queryset = cls.get_queryset()

        bulk_limit = cls.get_bulk_index_limit()
        thread_limit = cls.get_thread_count()
        query_limit = cls.get_query_limit()

        # this requires that `get_queryset` is implemented
        actions_iterator = queryset_iterator(queryset, query_limit)
        for success, info in parallel_bulk(client=es, actions=actions_iterator,
                                           chunk_size=bulk_limit, thread_count=thread_limit,
                                           expand_action_callback=cls.bulk_expansion_callback):
            if not success:
                print('Doc failed', info)

    @classmethod
    def bulk_expansion_callback(cls, obj):
        tmp = ()
        should_delete = not cls.should_index(obj)
        doc = None
        if not should_delete:
            # allow for the case where a document cannot be indexed;
            # the implementation of `get_document()` should return a
            # falsy value.
            doc = cls.get_document(obj)
            if not doc:
                doc = {}

        data = {
            '_index': cls.get_index_name(),
            '_type': cls.get_type_name(),
            '_id': cls.get_document_id(obj)
        }
        data.update(cls.get_request_params(obj))

        # bulk operation instructions/details
        action = {'delete' if should_delete else 'index': data}
        tmp += action,

        # only append bulk operation data if it's not a delete operation
        if not should_delete:
            tmp += doc,

        # expansion callback must return a tuple
        return tmp
```
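For context, a minimal usage sketch of the mixin above, assuming a hypothetical `BlogPost` Django model that mixes in `ParallelElasticsearchTypeMixin` and implements the usual hooks (`get_queryset()`, `get_document()`, `get_index_name()`, and so on):

```python
# Hypothetical model for illustration only; it is assumed to implement
# the ElasticsearchTypeMixin hooks (get_queryset, get_document, ...).
from myapp.models import BlogPost

# Re-index the full queryset via parallel_bulk, using the thread count
# returned by get_thread_count().
BlogPost.parallel_bulk_index()

# Or restrict the work to a subset of rows.
BlogPost.parallel_bulk_index(queryset=BlogPost.objects.filter(is_published=True))
```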
Going from gut instinct and without having tested anything, I can't really see the point of `parallel_bulk` here. Maybe a better approach for this library would be to use actual process-based multiprocessing to split the entire workload - data querying and document generation as well as sending to ES.
Thoughts?
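To make that idea concrete, here is a rough, untested sketch of one possible process-based split: partition the primary keys, close the shared Django DB connections before forking, and let each process query its own slice, build the documents and push them to ES. The `BlogPost` model is hypothetical, the sketch assumes integer primary keys, and the serial `bulk_index(queryset=...)` call inside each worker is an assumed entry point rather than a documented part of this library.

```python
# Illustrative sketch only - not part of simple_elasticsearch.
from multiprocessing import Pool

from django.db import connections

from myapp.models import BlogPost  # hypothetical model


def index_pk_range(bounds):
    low, high = bounds
    # each process queries its own slice, generates documents and sends
    # them to ES; bulk_index(queryset=...) is an assumed serial entry point
    qs = BlogPost.objects.filter(pk__gte=low, pk__lt=high)
    BlogPost.bulk_index(queryset=qs)


def parallel_reindex(workers=4):
    pks = list(BlogPost.objects.order_by('pk').values_list('pk', flat=True))
    if not pks:
        return
    step = max(len(pks) // workers, 1)
    # build contiguous, non-overlapping pk ranges (assumes integer pks)
    boundaries = pks[::step] + [pks[-1] + 1]
    ranges = list(zip(boundaries[:-1], boundaries[1:]))
    # Django DB connections must not be shared across forked processes;
    # close them so every worker opens its own.
    connections.close_all()
    with Pool(processes=workers) as pool:
        pool.map(index_pk_range, ranges)
```

Each process pays for its own DB query and ES connection, but sidesteps the GIL entirely, which is the point of going process-based rather than thread-based.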
@jaddison That absolutely works for me and the steps you outlined make sense. For personal use, I ended up using Celery and its chunks workflow to divide the queryset into sub-tasks; each task asynchronously retrieves its slice of the queryset, generates the documents and pushes them to ES. However, chunks is part of Celery, so this approach couples the library to a queuing technology. Not sure whether that is an acceptable complexity to introduce or whether there is a simpler way to achieve the same result.
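A rough sketch of that Celery chunks approach, for illustration only: `BlogPost` is again a hypothetical model, and the per-object `index_add_or_delete(obj)` call stands in for whatever indexing entry point the mixin actually exposes.

```python
# Illustrative sketch of the Celery `chunks` approach; `BlogPost` and
# `index_add_or_delete` are assumptions, not part of this library.
from celery import shared_task

from myapp.models import BlogPost


@shared_task
def index_post(pk):
    # each call fetches one row, builds its document and pushes it to ES;
    # calls inside a chunk run serially on whichever worker picks it up
    obj = BlogPost.objects.get(pk=pk)
    BlogPost.index_add_or_delete(obj)  # assumed per-object entry point


def queue_reindex(calls_per_chunk=1000):
    pk_args = ((pk,) for pk in BlogPost.objects.values_list('pk', flat=True).iterator())
    # chunks() groups the stream of task calls into sub-tasks of
    # `calls_per_chunk` calls each; workers pick the sub-tasks up in parallel
    index_post.chunks(pk_args, calls_per_chunk).apply_async()
```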
Not sure whether this feature keeps the library as "simple" as its name suggests, but for large indexes it could help speed up the management commands.