Skip to content

Commit d1805da

Browse files
committed
feat(indexd-tools): use asyncio for multi-thread and multi-processing, update docs, add tests, add backoff to indexd calls
1 parent cf118e6 commit d1805da

File tree

9 files changed

+394
-126
lines changed

9 files changed

+394
-126
lines changed

README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,34 @@ How to download a manifest `object-manifest.csv` of all file objects in indexd f
2525
```
2626
import sys
2727
import logging
28+
import asyncio
2829
2930
from gen3.index import Gen3Index
3031
from gen3.tools import indexing
3132
from gen3.tools.indexing.verify_manifest import manifest_row_parsers
3233
33-
logging.basicConfig(filename="output.log", level=logging.INFO)
34+
logging.basicConfig(filename="output.log", level=logging.DEBUG)
3435
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
3536
3637
COMMONS = "https://{{insert-commons-here}}/"
3738
38-
3939
def main():
40-
indexing.download_object_manifest(
41-
COMMONS, output_filename="object-manifest.csv", num_processes=3
40+
loop = asyncio.new_event_loop()
41+
asyncio.set_event_loop(loop)
42+
43+
loop.run_until_complete(
44+
indexing.async_download_object_manifest(
45+
COMMONS,
46+
output_filename="object-manifest.csv",
47+
num_processes=8,
48+
max_concurrent_requests=24,
49+
)
4250
)
4351
52+
4453
if __name__ == "__main__":
4554
main()
55+
4656
```
4757

4858
The output file will contain columns `guid, urls, authz, acl, md5, file_size` with info

gen3/index.py

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,49 @@
1+
import aiohttp
2+
import backoff
13
import requests
4+
import urllib.parse
5+
import logging
6+
import sys
7+
28
import indexclient.client as client
39

4-
import urllib.parse
10+
11+
def __log_backoff_retry(details):
12+
args_str = ", ".join(map(str, details["args"]))
13+
kwargs_str = (
14+
(", " + _print_kwargs(details["kwargs"])) if details.get("kwargs") else ""
15+
)
16+
func_call_log = "{}({}{})".format(
17+
_print_func_name(details["target"]), args_str, kwargs_str
18+
)
19+
logging.warning(
20+
"backoff: call {func_call} delay {wait:0.1f} seconds after {tries} tries".format(
21+
func_call=func_call_log, **details
22+
)
23+
)
24+
25+
26+
def __log_backoff_giveup(details):
27+
args_str = ", ".join(map(str, details["args"]))
28+
kwargs_str = (
29+
(", " + _print_kwargs(details["kwargs"])) if details.get("kwargs") else ""
30+
)
31+
func_call_log = "{}({}{})".format(
32+
_print_func_name(details["target"]), args_str, kwargs_str
33+
)
34+
logging.error(
35+
"backoff: gave up call {func_call} after {tries} tries; exception: {exc}".format(
36+
func_call=func_call_log, exc=sys.exc_info(), **details
37+
)
38+
)
39+
40+
41+
# Default settings to control usage of backoff library.
42+
BACKOFF_SETTINGS = {
43+
"on_backoff": __log_backoff_retry,
44+
"on_giveup": __log_backoff_giveup,
45+
"max_tries": 2,
46+
}
547

648

749
class Gen3Index:
@@ -25,9 +67,14 @@ class Gen3Index:
2567

2668
def __init__(self, endpoint, auth_provider=None, service_location="index"):
2769
endpoint = endpoint.strip("/")
70+
# if running locally, indexd is deployed by itself without a location relative
71+
# to the commons
72+
if "http://localhost" in endpoint:
73+
service_location = ""
74+
2875
if not endpoint.endswith(service_location):
2976
endpoint += "/" + service_location
30-
endpoint += "/"
77+
3178
self.client = client.IndexClient(endpoint, auth=auth_provider)
3279

3380
### Get Requests
@@ -44,6 +91,7 @@ def is_healthy(self):
4491
return False
4592
return response.text == "Healthy"
4693

94+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
4795
def get_version(self):
4896
"""
4997
@@ -54,6 +102,7 @@ def get_version(self):
54102
response.raise_for_status()
55103
return response.json()
56104

105+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
57106
def get_stats(self):
58107
"""
59108
@@ -64,6 +113,7 @@ def get_stats(self):
64113
response.raise_for_status()
65114
return response.json()
66115

116+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
67117
def get_all_records(self, limit=None, paginate=False, start=None):
68118
"""
69119
@@ -108,13 +158,13 @@ def get_all_records(self, limit=None, paginate=False, start=None):
108158

109159
return all_records
110160

161+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
111162
def get_records_on_page(self, limit=None, page=None):
112163
"""
113164
114165
Get a list of all records given the page and page size limit
115166
116167
"""
117-
all_records = []
118168
params = {}
119169
url = "index/"
120170

@@ -131,6 +181,36 @@ def get_records_on_page(self, limit=None, page=None):
131181

132182
return response.json().get("records")
133183

184+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
185+
async def async_get_records_on_page(self, limit=None, page=None, _ssl=None):
186+
"""
187+
Asynchronous function to request a page from indexd.
188+
189+
Args:
190+
page (int/str): indexd page to request
191+
192+
Returns:
193+
List[dict]: List of indexd records from the page
194+
"""
195+
all_records = []
196+
params = {}
197+
198+
if limit is not None:
199+
params["limit"] = limit
200+
201+
if page is not None:
202+
params["page"] = page
203+
204+
query = urllib.parse.urlencode(params)
205+
206+
url = f"{self.client.url}/index" + "?" + query
207+
async with aiohttp.ClientSession() as session:
208+
async with session.get(url, ssl=_ssl) as response:
209+
response = await response.json()
210+
211+
return response.get("records")
212+
213+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
134214
def get(self, guid, dist_resolution=True):
135215
"""
136216
@@ -151,6 +231,7 @@ def get(self, guid, dist_resolution=True):
151231

152232
return rec.to_json()
153233

234+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
154235
def get_urls(self, size=None, hashes=None, guids=None):
155236
"""
156237
@@ -171,6 +252,7 @@ def get_urls(self, size=None, hashes=None, guids=None):
171252
urls = self.client._get("urls", params=p).json()
172253
return [url for _, url in urls.items()]
173254

255+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
174256
def get_record(self, guid):
175257
"""
176258
@@ -184,6 +266,7 @@ def get_record(self, guid):
184266

185267
return rec.to_json()
186268

269+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
187270
def get_with_params(self, params=None):
188271
"""
189272
@@ -202,6 +285,7 @@ def get_with_params(self, params=None):
202285

203286
return rec.to_json()
204287

288+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
205289
def get_latest_version(self, guid, has_version=False):
206290
"""
207291
@@ -222,6 +306,7 @@ def get_latest_version(self, guid, has_version=False):
222306

223307
return rec.to_json()
224308

309+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
225310
def get_versions(self, guid):
226311
"""
227312
@@ -241,6 +326,7 @@ def get_versions(self, guid):
241326

242327
### Post Requests
243328

329+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
244330
def create_record(
245331
self,
246332
hashes,
@@ -291,6 +377,7 @@ def create_record(
291377
)
292378
return rec.to_json()
293379

380+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
294381
def create_blank(self, uploader, file_name=None):
295382
"""
296383
@@ -316,6 +403,7 @@ def create_blank(self, uploader, file_name=None):
316403

317404
return self.get_record(rec["did"])
318405

406+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
319407
def create_new_version(
320408
self,
321409
guid,
@@ -390,6 +478,7 @@ def create_new_version(
390478
return self.get_record(rec["did"])
391479
return None
392480

481+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
393482
def get_records(self, dids):
394483
"""
395484
@@ -417,6 +506,7 @@ def get_records(self, dids):
417506

418507
### Put Requests
419508

509+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
420510
def update_blank(self, guid, rev, hashes, size):
421511
"""
422512
@@ -445,6 +535,7 @@ def update_blank(self, guid, rev, hashes, size):
445535

446536
return self.get_record(rec["did"])
447537

538+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
448539
def update_record(
449540
self,
450541
guid,
@@ -486,6 +577,7 @@ def update_record(
486577

487578
### Delete Requests
488579

580+
@backoff.on_exception(backoff.expo, Exception, **BACKOFF_SETTINGS)
489581
def delete_record(self, guid):
490582
"""
491583
@@ -501,3 +593,11 @@ def delete_record(self, guid):
501593
rec = self.client.get(guid)
502594
rec.delete()
503595
return rec
596+
597+
598+
def _print_func_name(function):
599+
return "{}.{}".format(function.__module__, function.__name__)
600+
601+
602+
def _print_kwargs(kwargs):
603+
return ", ".join("{}={}".format(k, repr(v)) for k, v in list(kwargs.items()))

gen3/tools/indexing/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
from gen3.tools.indexing.download_manifest import download_object_manifest
1+
from gen3.tools.indexing.download_manifest import async_download_object_manifest
22
from gen3.tools.indexing.verify_manifest import verify_object_manifest

0 commit comments

Comments
 (0)