import copy
import datetime
import functools
import itertools
import json
# setup ES logging
import logging
import re
import time
from importlib import import_module
from typing import List, Mapping, Optional
from elasticsearch import Elasticsearch, ElasticsearchException, NotFoundError, RequestError, TransportError, helpers
from elasticsearch.serializer import JSONSerializer
from biothings.utils.common import inf, iter_n, merge, nan, splitstr, traverse
# the following imports used by utils.es.Database
from biothings.utils.dataload import dict_walk, update_dict_recur
from biothings.utils.dotfield import parse_dot_fields
from biothings.utils.hub_db import IDatabase
from biothings.utils.serializer import to_json
formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
es_logger = logging.getLogger("elasticsearch")
es_logger.setLevel(logging.WARNING)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
es_logger.addHandler(ch)
es_tracer = logging.getLogger("elasticsearch.trace")
es_tracer.setLevel(logging.WARNING)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
es_tracer.addHandler(ch)
def verify_ids(
doc_iter,
es_host,
index,
doc_type=None,
step=100000,
):
"""verify how many docs from input interator/list overlapping with existing docs."""
es = get_es(es_host)
q = {"query": {"ids": {"values": []}}}
total_cnt = 0
found_cnt = 0
out = []
for doc_batch in iter_n(doc_iter, n=step):
id_li = [doc["_id"] for doc in doc_batch]
# id_li = [doc['_id'].replace('chr', '') for doc in doc_batch]
q["query"]["ids"]["values"] = id_li
xres = es.search(index=index, doc_type=doc_type, body=q, _source=False)
found_cnt += xres["hits"]["total"]
total_cnt += len(id_li)
out.extend([x["_id"] for x in xres["hits"]["hits"]])
return out
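# Hedged usage sketch for verify_ids (host/index names below are hypothetical,
# kept as a comment so nothing runs at import time):
#
#   docs = [{"_id": "rs100"}, {"_id": "rs200"}]
#   found = verify_ids(iter(docs), es_host="localhost:9200", index="myindex")
#   print("%s of %s ids already exist in the index" % (len(found), len(docs)))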
def get_es(es_host, timeout=120, max_retries=3, retry_on_timeout=False):
es = Elasticsearch(es_host, timeout=timeout, max_retries=max_retries, retry_on_timeout=retry_on_timeout)
return es
# WARNING: this wrapper changes the _index and _doc_type
# usually this is unwanted but we will leave it this way here
def wrapper(func):
"""this wrapper allows passing index and doc_type from wrapped method."""
def outter_fn(*args, **kwargs):
self = args[0]
index = kwargs.pop("index", self._index) # pylint: disable=protected-access
doc_type = kwargs.pop("doc_type", self._doc_type) # pylint: disable=protected-access
self._index = index # pylint: disable=protected-access
self._doc_type = doc_type # pylint: disable=protected-access
return func(*args, **kwargs)
outter_fn.__doc__ = func.__doc__
return outter_fn
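# Illustrative sketch of how @wrapper is used (index names are hypothetical): any
# ESIndexer method decorated with it accepts optional index=/doc_type= keyword
# overrides, which are popped from kwargs and stored on the instance before the
# wrapped method runs, e.g.:
#
#   indexer = ESIndexer("default_index")
#   indexer.count(index="other_index")   # per the warning above, self._index now
#                                        # stays pointed at "other_index"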
class IndexerException(Exception):
pass
@functools.lru_cache()
def get_doc_type(es_client, index_name):
if int(es_client.info()["version"]["number"].split(".")[0]) < 7:
mappings = es_client.indices.get_mapping(index_name)
mappings = mappings[index_name]["mappings"]
return next(iter(mappings.keys()))
return None
class ESIndex:
"""An Elasticsearch Index Wrapping A Client.
Counterpart for pymongo.collection.Collection"""
# a new approach to biothings.utils.es.ESIndexer
# but not intended to be a replacement in features.
def __init__(self, client, index_name):
self.client = client # synchronous
self.index_name = index_name # MUST exist
@property
def doc_type(self):
return get_doc_type(self.client, self.index_name)
# TODO: remove the code block below if the above works
# if int(self.client.info()["version"]["number"].split(".")[0]) < 7:
# mappings = self.client.indices.get_mapping(self.index_name)
# mappings = mappings[self.index_name]["mappings"]
# return next(iter(mappings.keys()))
# return None
# SUBCLASS NOTE:
# BEFORE YOU ADD A METHOD UNDER THIS CLASS:
# An object of this class refers to an existing ES index. All operations
# should target this index. Do not put uncommon methods here; they belong
# in the subclasses. This class provides a common framework to support
# index-specific ES operations, a concept that does not exist in the low-level
# ES library. Thus it only provides low-level common operations, like doc_type
# parsing across ES versions for biothings use cases (single type per index).
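# Minimal sketch of the intended ESIndex usage (client address and index name are
# hypothetical):
#
#   from elasticsearch import Elasticsearch
#   idx = ESIndex(Elasticsearch("localhost:9200"), "existing_index")
#   idx.doc_type   # the single mapping type on ES<7, None on ES7+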
class ESIndexer:
# RETIRING THIS CLASS
# --
# Since we don't always directly work on
# existing actual indices, the index referred to here
# can be an alias or may not exist.
def __init__(
self,
index,
doc_type="_doc",
es_host="localhost:9200",
step=500,
step_size=10, # elasticsearch.helpers.bulk
number_of_shards=1,
number_of_replicas=0,
check_index=True,
**kwargs,
):
self.es_host = es_host
self._es = get_es(es_host, **kwargs)
self._host_major_ver = int(self._es.info()["version"]["number"].split(".")[0])
# the name of the index when ESIndexer is initialized
self.canonical_index_name = index
self._index = index # placeholder, will be updated later
if check_index:
# if index is actually an alias, resolve the alias to
# the real underlying index
self.check_index()
self._doc_type = None
if doc_type:
self._doc_type = doc_type
else:
# assuming index exists, get mapping to discover doc_type
try:
m = self.get_mapping()
assert len(m) == 1, "Expected only one doc type, got: %s" % m.keys()
self._doc_type = list(m).pop()
except Exception as e: # pylint: disable=broad-except
if check_index:
logging.info("Failed to guess doc_type: %s", e)
self.number_of_shards = number_of_shards # set number_of_shards when create_index
self.number_of_replicas = int(number_of_replicas) # set number_of_replicas when create_index
self.step = step or 500 # the bulk size when doing bulk operation.
self.step_size = (step_size or 10) * 1048576 # MB -> bytes
self.s = None # number of records to skip, useful to continue indexing after an error.
def check_index(self):
"""
Check if index is an alias, and update self._index to point to
actual index
TODO: the overall design of ESIndexer is not great. If we are exposing ES
implementation details (such as the abilities to create and delete indices,
create and update aliases, etc.) to the user of this Class, then this method
doesn't seem that out of place.
"""
try:
res = self._es.indices.get_alias(name=self.canonical_index_name)
# this is an alias
if len(res) != 1:
raise RuntimeError(f"Alias '{self.canonical_index_name}' does not associate with exactly 1 index")
self._index = list(res.keys())[0]
except NotFoundError:
# probably intended to be an index name
self._index = self.canonical_index_name
@wrapper
def get_biothing(self, bid, only_source=False, **kwargs):
rawdoc = self._es.get(index=self._index, id=bid, doc_type=self._doc_type, **kwargs)
if not only_source:
return rawdoc
else:
doc = rawdoc["_source"]
doc["_id"] = rawdoc["_id"]
return doc
@wrapper
def exists(self, bid):
"""return True/False if a biothing id exists or not."""
try:
doc = self.get_biothing(bid, stored_fields=None)
return doc["found"]
except NotFoundError:
return False
@wrapper
def mexists(self, bid_list):
q = {"query": {"ids": {"values": bid_list}}}
res = self._es.search(
index=self._index,
doc_type=self._doc_type,
body=q,
stored_fields=None,
size=len(bid_list),
)
# id_set = set([doc['_id'] for doc in res['hits']['hits']]) # TODO: Confirm this line
id_set = {doc["_id"] for doc in res["hits"]["hits"]}
return [(bid, bid in id_set) for bid in bid_list]
@wrapper
def count(self, q=None, raw=False):
try:
count_kwargs = {"index": self._index}
if q is not None:
count_kwargs.update({"doc_type": self._doc_type, "q": q})
_res = self._es.count(**count_kwargs)
return _res if raw else _res["count"]
except NotFoundError:
return None
@wrapper
def count_src(self, src):
if isinstance(src, str):
src = [src]
cnt_d = {}
for _src in src:
q = {"query": {"constant_score": {"filter": {"exists": {"field": _src}}}}}
cnt_d[_src] = self.count(q)
return cnt_d
@wrapper
def create_index(self, mapping=None, extra_settings=None):
if not self._es.indices.exists(index=self._index):
body = {
"settings": {
"number_of_shards": self.number_of_shards,
"number_of_replicas": self.number_of_replicas,
}
}
extra_settings = extra_settings or {}
self.sanitize_settings(extra_settings)
body["settings"].update(extra_settings)
if mapping:
# the mapping is passed in for elasticsearch 6
# if the remote server is of elasticsearch version 7 or later
# drop the doc_type first level key as it is no longer supported
self._populate_es_version()
if self._host_major_ver > 6:
if len(mapping) == 1 and next(iter(mapping)) not in (
"properties",
"dynamic",
"_meta",
):
mapping = next(iter(mapping.values()))
mapping = {"mappings": mapping}
body.update(mapping)
self._es.indices.create(index=self._index, **body)
def _populate_es_version(self):
if not hasattr(self, "_es_version"):
self._es_version = int(self._es.info()["version"]["number"].split(".")[0])
def exists_index(self, index: Optional[str] = None):
if not index:
index = self._index
return self._es.indices.exists(index)
def index(self, doc, id=None, action="index"): # pylint: disable=redefined-builtin
"""add a doc to the index. If id is not None, the existing doc will be
updated.
"""
self._es.index(index=self._index, doc_type=self._doc_type, body=doc, id=id, params={"op_type": action})
def index_bulk(self, docs, step=None, action="index"):
self._populate_es_version()
index_name = self._index
doc_type = self._doc_type
step = step or self.step
def _get_bulk(doc):
# keep original doc
ndoc = copy.copy(doc)
ndoc.update(
{
"_index": index_name,
"_type": doc_type,
"_op_type": action,
}
)
if self._host_major_ver > 6:
ndoc.pop("_type")
return ndoc
actions = (_get_bulk(doc) for doc in docs)
num_ok, errors = helpers.bulk(self._es, actions, chunk_size=step, max_chunk_bytes=self.step_size)
if errors:
raise ElasticsearchException("%d errors while bulk-indexing: %s" % (len(errors), [str(e) for e in errors]))
return num_ok, errors
def delete_doc(self, id): # pylint: disable=redefined-builtin
"""delete a doc from the index based on passed id."""
return self._es.delete(index=self._index, doc_type=self._doc_type, id=id)
def delete_docs(self, ids, step=None):
"""delete a list of docs in bulk."""
index_name = self._index
doc_type = self._doc_type
step = step or self.step
def _get_bulk(_id):
if self._host_major_ver >= 7:
doc = {"_op_type": "delete", "_index": index_name, "_id": _id}
else:
doc = {"_op_type": "delete", "_index": index_name, "_type": doc_type, "_id": _id}
return doc
actions = (_get_bulk(_id) for _id in ids)
return helpers.bulk(self._es, actions, chunk_size=step, stats_only=True, raise_on_error=False)
def delete_index(self, index=None):
if not index:
index = self._index
self._es.indices.delete(index)
def update(self, id, extra_doc, upsert=True): # pylint: disable=redefined-builtin
"""update an existing doc with extra_doc.
allow to set upsert=True, to insert new docs.
"""
body = {"doc": extra_doc}
if upsert:
body["doc_as_upsert"] = True
return self._es.update(index=self._index, doc_type=self._doc_type, id=id, body=body)
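# Hedged usage sketch for update(): merge a partial doc into an existing one,
# creating it if missing thanks to doc_as_upsert (id and field are hypothetical):
#
#   indexer.update("doc1", {"new_field": "value"}, upsert=True)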
def update_docs(self, partial_docs, upsert=True, step=None, **kwargs):
"""update a list of partial_docs in bulk.
allow to set upsert=True, to insert new docs.
"""
index_name = self._index
doc_type = self._doc_type
step = step or self.step
def _get_bulk(doc):
if self._host_major_ver >= 7:
doc = {"_op_type": "update", "_index": index_name, "_id": doc["_id"], "doc": doc}
else:
doc = {
"_op_type": "update",
"_index": index_name,
"_type": doc_type,
"_id": doc["_id"],
"doc": doc,
}
if upsert:
doc["doc_as_upsert"] = True
return doc
actions = (_get_bulk(doc) for doc in partial_docs)
return helpers.bulk(self._es, actions, chunk_size=step, **kwargs)
def get_mapping(self):
"""return the current index mapping"""
if self._host_major_ver <= 6:
m = self._es.indices.get_mapping(
index=self._index,
doc_type=self._doc_type,
)
return m[self._index]["mappings"]
elif self._host_major_ver <= 8:
m = self._es.indices.get_mapping(index=self._index)
# fake the mapping doc_type
m = {self._doc_type: m[self._index]["mappings"]}
return m
else:
raise RuntimeError(
f"Server Elasticsearch version is {self._host_major_ver} "
"which is unsupported when using old ESIndexer class"
)
def update_mapping(self, m):
if self._host_major_ver <= 6:
assert list(m) == [self._doc_type], "Bad mapping format, should have one doc_type, got: %s" % list(m)
assert "properties" in m[self._doc_type], "Bad mapping format, no 'properties' key"
return self._es.indices.put_mapping(index=self._index, doc_type=self._doc_type, body=m)
elif self._host_major_ver <= 8:
# this is basically guessing based on heuristics
if len(m) == 1:
if "properties" not in m: # basically {'_doc': mapping}
m = next(iter(m.values())) # take out the mapping
else: # basically just the mapping or type: properties
pass # ignoring the possibility of doc_type='properties'
else: # we don't expect {} as input, so len(m) > 1
if "properties" not in m:
raise RuntimeError(f"Possibly invalid mapping {m}")
return self._es.indices.put_mapping(index=self._index, body=m)
else:
raise RuntimeError(
f"Server Elasticsearch version is {self._host_major_ver} "
"which is unsupported when using old ESIndexer class"
)
def get_mapping_meta(self):
"""return the current _meta field."""
m = self.get_mapping()
doc_type = self._doc_type
if doc_type is None:
# fetch doc_type from mapping
assert len(m) == 1, (
"More than one doc_type found, not supported when self._doc_type " + "is not initialized"
)
doc_type = list(m.keys())[0]
return {"_meta": m[doc_type]["_meta"]}
def update_mapping_meta(self, meta):
allowed_keys = {"_meta", "_timestamp"}
# if isinstance(meta, dict) and len(set(meta) - allowed_keys) == 0:
if isinstance(meta, dict) and not set(meta) - allowed_keys:
if self._host_major_ver >= 7:
return self._es.indices.put_mapping(
index=self._index,
body=meta,
)
else: # not sure if _type needs to be specified
body = {self._doc_type: meta}
return self._es.indices.put_mapping(doc_type=self._doc_type, body=body, index=self._index)
else:
raise ValueError('Input "meta" should only contain "_meta" and/or "_timestamp" fields.')
def flush_and_refresh(self):
try:
self._es.indices.flush()
self._es.indices.refresh()
except: # pylint: disable=bare-except # noqa
pass
@wrapper
def build_index(self, collection, verbose=True, query=None, bulk=True, update=False, allow_upsert=True):
# update some settings for bulk indexing
body = {
"index": {
"refresh_interval": "-1", # disable refresh temporarily
"auto_expand_replicas": "0-all",
}
}
self.update_settings(body)
try:
self._build_index_sequential(collection, verbose, query=query, bulk=bulk, update=update, allow_upsert=True)
finally:
# restore some settings after bulk indexing is done.
body = {"index": {"refresh_interval": "1s"}} # default settings
self.update_settings(body)
self.flush_and_refresh()
time.sleep(1)
src_cnt = collection.count(query)
es_cnt = self.count()
if src_cnt != es_cnt:
raise IndexerException(
"Total count of documents does not match [{}, should be {}]".format(es_cnt, src_cnt)
)
return es_cnt # NOQA B012
def _build_index_sequential(
self, collection, verbose=False, query=None, bulk=True, update=False, allow_upsert=True
):
def rate_control(cnt, t):
delay = 0
if t > 90:
delay = 30
elif t > 60:
delay = 10
if delay:
time.sleep(delay)
from biothings.utils.mongo import doc_feeder
src_docs = doc_feeder(collection, step=self.step, s=self.s, batch_callback=rate_control, query=query)
if bulk:
if update:
# input doc will update existing one
# if allow_upsert, create new one if not exist
res = self.update_docs(src_docs, upsert=allow_upsert)
else:
# input doc will overwrite existing one
res = self.index_bulk(src_docs)
# if len(res[1]) > 0:
if res[1]:
raise IndexerException("Error: {} docs failed indexing.".format(len(res[1])))
return res[0]
else:
cnt = 0
for doc in src_docs:
self.index(doc)
cnt += 1
return cnt
@wrapper
def optimize(self, max_num_segments=1):
"""optimize the default index."""
params = {
"wait_for_merge": False,
"max_num_segments": max_num_segments,
}
return self._es.indices.forcemerge(index=self._index, params=params)
def clean_field(self, field, dryrun=True, step=5000):
"""remove a top-level field from ES index, if the field is the only field of the doc,
remove the doc as well.
step is the size of bulk update on ES
try first with dryrun turned on, and then perform the actual updates with dryrun off.
"""
if self._host_major_ver >= 7:
raise RuntimeError("clean_field is no longer supported")
q = {"query": {"constant_score": {"filter": {"exists": {"field": field}}}}}
cnt_orphan_doc = 0
cnt = 0
_li = []
for doc in self.doc_feeder(query=q):
if set(doc) == {"_id", field}:
cnt_orphan_doc += 1
# delete orphan doc
_li.append({"delete": {"_index": self._index, "_type": self._doc_type, "_id": doc["_id"]}})
else:
# otherwise, just remove the field from the doc
_li.append({"update": {"_index": self._index, "_type": self._doc_type, "_id": doc["_id"]}})
# this script update requires "script.disable_dynamic: false" setting
# in elasticsearch.yml
_li.append({"script": 'ctx._source.remove("{}")'.format(field)})
cnt += 1
if len(_li) == step:
if not dryrun:
self._es.bulk(body=_li)
_li = []
if _li:
if not dryrun:
self._es.bulk(body=_li)
return {"total": cnt, "updated": cnt - cnt_orphan_doc, "deleted": cnt_orphan_doc}
@wrapper
def doc_feeder_using_helper(self, step=None, verbose=True, query=None, scroll="10m", **kwargs):
# verbose unimplemented
step = step or self.step
q = query if query else {"query": {"match_all": {}}}
for rawdoc in helpers.scan(
client=self._es,
query=q,
scroll=scroll,
index=self._index,
doc_type=self._doc_type,
**kwargs,
):
if rawdoc.get("_source", False):
doc = rawdoc["_source"]
doc["_id"] = rawdoc["_id"]
yield doc
else:
yield rawdoc
@wrapper
def doc_feeder(self, step=None, verbose=True, query=None, scroll="10m", only_source=True, **kwargs):
step = step or self.step
q = query if query else {"query": {"match_all": {}}}
_q_cnt = self.count(q=q, raw=True)
n = _q_cnt["count"]
n_shards = _q_cnt["_shards"]["total"]
assert n_shards == _q_cnt["_shards"]["successful"]
# Not sure if scroll size is per shard anymore in the new ES...should check this
_size = int(step / n_shards)
assert _size * n_shards == step
cnt = 0
# t0 = time.time()
# if verbose:
# t1 = time.time()
res = self._es.search(
index=self._index,
doc_type=self._doc_type,
body=q,
size=_size,
search_type="scan",
scroll=scroll,
**kwargs,
)
# double check initial scroll request returns no hits
# assert len(res['hits']['hits']) == 0
assert not res["hits"]["hits"]
while True:
# if verbose:
# t1 = time.time()
res = self._es.scroll(scroll_id=res["_scroll_id"], scroll=scroll)
# if len(res['hits']['hits']) == 0:
if not res["hits"]["hits"]:
break
else:
for rawdoc in res["hits"]["hits"]:
if rawdoc.get("_source", False) and only_source:
doc = rawdoc["_source"]
doc["_id"] = rawdoc["_id"]
yield doc
else:
yield rawdoc
cnt += 1
assert cnt == n, "Error: scroll query terminated early [{}, {}], please retry.\nLast response:\n{}".format(
cnt, n, res
)
@wrapper
def get_id_list(self, step=None, verbose=True):
step = step or self.step
cur = self.doc_feeder(step=step, _source=False, verbose=verbose)
for doc in cur:
yield doc["_id"]
@wrapper
def get_docs(self, ids, step=None, only_source=True, **mget_args):
"""Return matching docs for given ids iterable, if not found return None.
A generator is returned to the matched docs. If only_source is False,
the entire document is returned, otherwise only the source is returned."""
# chunkify
step = step or self.step
for chunk in iter_n(ids, step):
if self._host_major_ver > 6:
chunk_res = self._es.mget(body={"ids": chunk}, index=self._index, **mget_args)
else:
chunk_res = self._es.mget(body={"ids": chunk}, index=self._index, doc_type=self._doc_type, **mget_args)
for rawdoc in chunk_res["docs"]:
if ("found" not in rawdoc) or (("found" in rawdoc) and not rawdoc["found"]):
continue
elif not only_source:
yield rawdoc
else:
doc = rawdoc["_source"]
doc["_id"] = rawdoc["_id"]
yield doc
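# Hedged get_docs sketch: ids can be any iterable, results stream back lazily and
# ids that are not found are skipped (the handle() callback below is hypothetical):
#
#   for doc in indexer.get_docs(["id1", "id2", "id3"], step=1000):
#       handle(doc)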
def find_biggest_doc(self, fields_li, min=5, return_doc=False): # pylint: disable=redefined-builtin
"""return the doc with the max number of fields from fields_li."""
for n in range(len(fields_li), min - 1, -1):
for field_set in itertools.combinations(fields_li, n):
q = " AND ".join(["_exists_:" + field for field in field_set])
q = {"query": {"query_string": {"query": q}}}
cnt = self.count(q)
if cnt > 0:
if return_doc:
res = self._es.search(index=self._index, doc_type=self._doc_type, body=q, size=cnt)
return res
else:
return (cnt, q)
def snapshot(self, repo, snapshot, mode=None, **params):
body = {
"indices": self._index,
"include_global_state": False
# there is no reason to include global state in our application
# we want to separate the staging env from the production env
# (global state includes index templates and ingest pipeline)
# but this doesn't mean this setting has to be here
# maybe move this line to where it belongs later
}
if mode == "purge":
# Note: this only works for small snapshots, when deletion is done instantly
try:
self._es.snapshot.get(repo, snapshot)
# if we can get it, we have to delete it
self._es.snapshot.delete(repo, snapshot)
except NotFoundError:
# ok, nothing to delete/purge
pass
try:
return self._es.snapshot.create(repo, snapshot, body=body, params=params)
except RequestError as e:
try:
err_msg = e.info["error"]["reason"]
except KeyError:
err_msg = e.error
raise IndexerException("Can't snapshot '%s': %s" % (self._index, err_msg))
def restore(self, repo_name, snapshot_name, index_name=None, purge=False):
indices = self.get_indices_from_snapshots(repo_name, snapshot_name)
if not index_name:
index_name = indices[0]
indices = ",".join(indices)
if purge:
try:
self._es.indices.get(index=index_name)
# if we get there, it exists, delete it
self._es.indices.delete(index=index_name)
except NotFoundError:
# no need to delete it,
pass
try:
# this is just about renaming index within snapshot to index_name
body = {
"indices": indices,
"rename_replacement": index_name,
"ignore_unavailable": True,
"rename_pattern": "(.+)",
# set to False, snapshots were created without global state anyway.
# In ES8, an error will be raised if set to True
"include_global_state": False,
}
return self._es.snapshot.restore(repo_name, snapshot_name, body=body)
except TransportError as e:
raise IndexerException(
"Can't restore snapshot '%s' (does index '%s' already exist ?): %s" % (snapshot_name, index_name, e)
)
def get_alias(self, index: str = None, alias_name: str = None) -> List[str]:
"""
Get indices with alias associated with given index name or alias name
Args:
index: name of index
alias_name: name of alias
Returns:
Mapping of index names with their aliases
"""
return self._es.indices.get_alias(index=index, name=alias_name)
def get_settings(self, index: str = None) -> Mapping[str, Mapping]:
"""
Get indices with settings associated with given index name
Args:
index: name of index
Returns:
Mapping of index names with their settings
"""
return self._es.indices.get_settings(index=index)
def get_indice_names_by_settings(self, index: str = None, sort_by_creation_date=False, reverse=False) -> List[str]:
"""
Get the list of index names associated with the given index name, using the indices' settings
Args:
index: name of index
sort_by_creation_date: sort the result by the index's creation_date
reverse: control the direction of the sorting
Returns:
list of index names (str)
"""
indices_settings = self.get_settings(index)
names_with_creation_date = [
(indice_name, setting["settings"]["index"]["creation_date"])
for indice_name, setting in indices_settings.items()
]
if sort_by_creation_date:
names_with_creation_date = sorted(
names_with_creation_date,
key=lambda name_with_creation_date: name_with_creation_date[1],
reverse=reverse,
)
indice_names = [name for name, _ in names_with_creation_date]
return indice_names
def update_alias(self, alias_name: str, index: Optional[str] = None):
"""
Create or update an ES alias pointing to an index
Creates or updates an alias in Elasticsearch, associated with the
given index name or the underlying index of the ESIndexer instance.
If the alias name does not exist yet, it will be created. If it already
exists as an alias, it will be updated to associate only with the given
index.
If an index (rather than an alias) with that name already exists, an
exception will be raised, UNLESS the alias name is the same as the index
name that the ESIndexer was initialized with. In this case, the existing
index with the name collision will be deleted, and the alias will be
created in its place. This feature is intended for seamless migration
from an index to an alias associated with an index for zero-downtime
installs.
Args:
alias_name: name of the alias
index: name of the index to associate with alias. If None, the
index of the ESIndexer instance is used.
Raises:
IndexerException
"""
if index is None:
index = self._index
if not self._es.indices.exists(alias_name):
# case 1 it does not already exist
# create the alias pointing to _index
self._es.indices.put_alias(index=index, name=alias_name)
else: # case 2 it already exists
if self._es.indices.exists_alias(name=alias_name):
# if it is an alias, blindly update
# This removes any other indices associated with the alias
actions = {"actions": [{"add": {"index": index, "alias": alias_name}}]}
removes = [
{"remove": {"index": index_name, "alias": alias_name}} for index_name in self.get_alias(alias_name)
]
actions["actions"].extend(removes)
self._es.indices.update_aliases(body=actions)
else: # it is an index
# if not _index and is the canonical index name
# then delete the index and create alias
if alias_name == self.canonical_index_name:
self._es.indices.delete(alias_name)
self._es.indices.put_alias(index=index, name=alias_name)
else:
raise IndexerException(
f"Cannot create alias {alias_name} " "an index with the same name " "already exists"
)
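# Hedged sketch of the zero-downtime migration described in the docstring above
# (index/alias names are hypothetical):
#
#   indexer = ESIndexer("mydata")                         # "mydata" is a plain index today
#   indexer.update_alias("mydata", index="mydata_build2") # index "mydata" is deleted and
#                                                         # replaced by an alias pointing
#                                                         # to "mydata_build2"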
def get_repository(self, repo_name):
try:
return self._es.snapshot.get_repository(repo_name)
except NotFoundError:
raise IndexerException("Repository '%s' doesn't exist" % repo_name)
def create_repository(self, repo_name, settings):
try:
self._es.snapshot.create_repository(repo_name, settings)
except TransportError as e:
raise IndexerException("Can't create snapshot repository '%s': %s" % (repo_name, e))
def get_snapshots(self, repo_name, snapshot_name):
try:
snapshots = self._es.snapshot.get(repository=repo_name, snapshot=snapshot_name)
return [snapshot for snapshot in snapshots["snapshots"]]
except NotFoundError as ex:
message = str(ex)
if ex.error == "repository_missing_exception":
message = "Repository '%s' doesn't exist" % repo_name
if ex.error == "snapshot_missing_exception":
message = "Snapshot '%s' doesn't exist" % snapshot_name
raise IndexerException(message)
def get_indices_from_snapshots(self, repo_name, snapshot_name):
snapshots = self.get_snapshots(repo_name, snapshot_name)
indices = []
for snapshot in snapshots:
indices.extend(snapshot.get("indices") or [])
return indices
def get_snapshot_status(self, repo, snapshot):
return self._es.snapshot.status(repo, snapshot)
def get_restore_status(self, index_name=None):
index_name = index_name or self._index
recov = self._es.indices.recovery(index=index_name)
if index_name not in recov:
return {"status": "INIT", "progress": "0%"}
shards = recov[index_name]["shards"]
# get all shards status
shards_status = [s["stage"] for s in shards]
done = len([s for s in shards_status if s == "DONE"])
if set(shards_status) == {"DONE"}:
return {"status": "DONE", "progress": "100%"}
else:
return {
"status": "IN_PROGRESS",
"progress": "%.2f%%" % (done / len(shards_status) * 100),
}
def get_internal_number_of_replicas(self):
try:
index_settings = self._es.indices.get_settings(self._index)
return index_settings[self._index]["settings"]["index"]["number_of_replicas"]
except Exception:
return
def set_internal_number_of_replicas(self, number_of_replicas=None):
if not number_of_replicas:
number_of_replicas = self.number_of_replicas
settings = {"index": {"number_of_replicas": number_of_replicas}}
self.update_settings(settings)
def sanitize_settings(self, settings):
"""
Clean up the settings dictionary by removing static fields that cannot be updated,
such as: "uuid", "provided_name", "creation_date", "version".
The settings dict is updated in-place and also returned.
"""
setting_fields_to_remove = [
"uuid",
"provided_name",
"creation_date",
"version",
"number_of_shards",
]
for field in setting_fields_to_remove:
settings["index"].pop(field, None)
return settings
def update_settings(self, settings, close=False, **params):
"""
Parameters:
- settings: should be valid ES index's settings.
- close: In order to update static settings, the index must be closed first.
Ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.17/index-modules.html#index-modules-settings
"""
# Some static fields, like "uuid", "provided_name", "creation_date", "version",
# and "number_of_shards", cannot be updated (ES doesn't allow it).
self.sanitize_settings(settings)
if close:
self.close()
self._es.indices.put_settings(
body=settings,
index=self._index,
params=params,
)
if close:
self.open()
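# Hedged update_settings sketch: dynamic settings can be changed on a live index,
# while static ones need close=True so the index is closed, updated, then reopened
# (values below are hypothetical):
#
#   indexer.update_settings({"index": {"number_of_replicas": 1}})
#   indexer.update_settings({"index": {"refresh_interval": "30s"}})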
def reindex(self, src_index, is_remote=False, **kwargs):
"""In order to reindex from remote,
- src es_host must be set to an IP which the current ES host can connect to.
It means that if 2 indices locate in same host, the es_host can be set to localhost,
but if they are in different hosts, an IP must be used instead.
- If src host uses SSL, https must be included in es_host. Eg: https://192.168.1.10:9200
- src host must be whitelisted in the current ES host.
Ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.17/reindex-upgrade-remote.html
"""
body = {
"source": {"index": src_index._index},
"dest": {"index": self._index},
}
if is_remote:
host = src_index.es_host
if not host.startswith("http"):
host = "http://" + host
body["source"]["remote"] = {"host": host}
return self._es.reindex(body=body, **kwargs)
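# Hedged reindex sketch for the remote case (hosts and index names are hypothetical);
# per the docstring, the source host must be reachable from, and whitelisted on,
# this cluster:
#
#   src = ESIndexer("source_index", es_host="https://192.168.1.10:9200")
#   dest = ESIndexer("dest_index")
#   dest.reindex(src, is_remote=True)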
def close(self):
self._es.indices.close(self._index)
def open(self):
self._es.indices.open(self._index)
class MappingError(Exception):
pass
def generate_es_mapping(inspect_doc, init=True, level=0):
"""Generate an ES mapping according to "inspect_doc", which is
produced by biothings.utils.inspect module"""
map_tpl = {
int: {"type": "integer"},
bool: {"type": "boolean"},
float: {"type": "float"},
str: {
"type": "keyword",
"normalizer": "keyword_lowercase_normalizer",
}, # not splittable (like an ID for instance)
splitstr: {"type": "text"},
}
# inspect_doc, if it's been jsonified, contains keys with type as string,
# such as "<class 'str'>". This is not a real type and we need to convert them