From 195039362517e6a2d8de56383d47900682e60e75 Mon Sep 17 00:00:00 2001 From: Sean Gillies Date: Mon, 31 Jul 2023 18:49:42 -0600 Subject: [PATCH] New take on retrying order asset downloads Wraps the entire orders.download_asset() in a retry, eliminating the double download and concentrating the retry complexity instead of spreading it over several modules. Resolves #1010 --- planet/clients/orders.py | 79 ++++++++++++++++++---------- setup.py | 1 + tests/integration/test_orders_api.py | 6 +-- 3 files changed, 53 insertions(+), 33 deletions(-) diff --git a/planet/clients/orders.py b/planet/clients/orders.py index 3ff0c059..86d62558 100644 --- a/planet/clients/orders.py +++ b/planet/clients/orders.py @@ -14,19 +14,21 @@ # the License. """Functionality for interacting with the orders api""" import asyncio +import hashlib +import json import logging -from pathlib import Path +import re import time -from typing import AsyncIterator, Callable, List, Optional import uuid -import json -import hashlib +from pathlib import Path +from typing import AsyncIterator, Callable, List, Optional +import stamina from tqdm.asyncio import tqdm from .. import exceptions from ..constants import PLANET_BASE_URL -from ..http import Session +from ..http import RETRY_EXCEPTIONS, Session from ..models import Paged BASE_URL = f'{PLANET_BASE_URL}/compute/ops' @@ -232,6 +234,7 @@ async def aggregated_order_stats(self) -> dict: response = await self._session.request(method='GET', url=url) return response.json() + @stamina.retry(on=tuple(RETRY_EXCEPTIONS)) async def download_asset(self, location: str, filename: Optional[str] = None, @@ -257,31 +260,49 @@ async def download_asset(self, limit is exceeded. """ - response = await self._session.request(method='GET', url=location) - filename = filename or response.filename - length = response.length - if not filename: - raise exceptions.ClientError( - f'Could not determine filename at {location}') + async with self._session._limiter, self._session._client.stream('GET', location) as resp: + content_length = int(resp.headers.get('content-length')) + + # Fall back to content-disposition for a filename. + if not filename: + try: + content_disposition = resp.headers['content-disposition'] + match = re.search('filename="?([^"]+)"?', + content_disposition) + filename = match.group(1) # type: ignore + except (AttributeError, KeyError) as err: + raise exceptions.ClientError( + f'Could not determine filename at {location}') from err + + dl_path = Path(directory, filename) + dl_path.parent.mkdir(exist_ok=True, parents=True) + LOGGER.info(f'Downloading {dl_path}') - dl_path = Path(directory, filename) - dl_path.parent.mkdir(exist_ok=True, parents=True) - LOGGER.info(f'Downloading {dl_path}') - - try: - mode = 'wb' if overwrite else 'xb' - with open(dl_path, mode) as fp: - with tqdm(total=length, - unit_scale=True, - unit_divisor=1024 * 1024, - unit='B', - desc=str(filename), - disable=not progress_bar) as progress: - await self._session.write(location, fp, progress.update) - except FileExistsError: - LOGGER.info(f'File {dl_path} exists, not overwriting') - - return dl_path + try: + mode = 'wb' if overwrite else 'xb' + with dl_path.open(mode) as fp: + with tqdm(total=content_length, + unit_scale=True, + unit_divisor=1024 * 1024, + unit='B', + desc=str(filename), + disable=not progress_bar) as progress: + + previous = resp.num_bytes_downloaded + + # Size from boto3.s3.transfer.TransferConfig + # io_chunksize. Planet assets are generally + # several MB or more. + async for chunk in resp.aiter_bytes(chunk_size=262144): + fp.write(chunk) + current = resp.num_bytes_downloaded + progress.update(current - previous) + previous = current + + except FileExistsError: + LOGGER.info(f'File {dl_path} exists, not overwriting') + + return dl_path async def download_order(self, order_id: str, diff --git a/setup.py b/setup.py index c8a8c29e..a845a0f4 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ 'httpx>=0.23.0', 'jsonschema', 'pyjwt>=2.1', + 'stamina', 'tqdm>=4.56', 'typing-extensions', ] diff --git a/tests/integration/test_orders_api.py b/tests/integration/test_orders_api.py index 0c164c53..212aa5a0 100644 --- a/tests/integration/test_orders_api.py +++ b/tests/integration/test_orders_api.py @@ -566,7 +566,7 @@ async def test_download_asset_md(tmpdir, session): @respx.mock @pytest.mark.anyio -async def test_download_asset_img(tmpdir, open_test_img, session): +async def test_download_asset_img_with_retry(tmpdir, open_test_img, session): dl_url = TEST_DOWNLOAD_URL + '/1?token=IAmAToken' img_headers = { @@ -587,9 +587,7 @@ async def _stream_img(): # an error caused by respx and not this code # https://github.com/lundberg/respx/issues/130 respx.get(dl_url).side_effect = [ - httpx.Response(HTTPStatus.OK, - headers=img_headers, - request='donotcloneme'), + httpx.ReadError("no can do!"), httpx.Response(HTTPStatus.OK, stream=_stream_img(), headers=img_headers,