New take on retrying order asset downloads #1011

Draft · wants to merge 1 commit into base: maint-2.1
79 changes: 50 additions & 29 deletions planet/clients/orders.py
@@ -14,19 +14,21 @@
# the License.
"""Functionality for interacting with the orders api"""
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import re
import time
from typing import AsyncIterator, Callable, List, Optional
import uuid
import json
import hashlib
from pathlib import Path
from typing import AsyncIterator, Callable, List, Optional

import stamina
from tqdm.asyncio import tqdm

from .. import exceptions
from ..constants import PLANET_BASE_URL
from ..http import Session
from ..http import RETRY_EXCEPTIONS, Session
from ..models import Paged

BASE_URL = f'{PLANET_BASE_URL}/compute/ops'
@@ -232,6 +234,7 @@ async def aggregated_order_stats(self) -> dict:
response = await self._session.request(method='GET', url=url)
return response.json()

@stamina.retry(on=tuple(RETRY_EXCEPTIONS))
async def download_asset(self,
location: str,
filename: Optional[str] = None,
@@ -257,31 +260,49 @@ async def download_asset(self,
limit is exceeded.

"""
response = await self._session.request(method='GET', url=location)
filename = filename or response.filename
length = response.length
if not filename:
raise exceptions.ClientError(
f'Could not determine filename at {location}')
async with self._session._limiter, self._session._client.stream('GET', location) as resp:
Contributor Author: Rate limiter is used here.

content_length = int(resp.headers.get('content-length'))

# Fall back to content-disposition for a filename.
if not filename:
try:
content_disposition = resp.headers['content-disposition']
match = re.search('filename="?([^"]+)"?',
content_disposition)
filename = match.group(1) # type: ignore
except (AttributeError, KeyError) as err:
raise exceptions.ClientError(
f'Could not determine filename at {location}') from err

dl_path = Path(directory, filename)
dl_path.parent.mkdir(exist_ok=True, parents=True)
LOGGER.info(f'Downloading {dl_path}')

dl_path = Path(directory, filename)
dl_path.parent.mkdir(exist_ok=True, parents=True)
LOGGER.info(f'Downloading {dl_path}')

try:
mode = 'wb' if overwrite else 'xb'
with open(dl_path, mode) as fp:
with tqdm(total=length,
unit_scale=True,
unit_divisor=1024 * 1024,
unit='B',
desc=str(filename),
disable=not progress_bar) as progress:
await self._session.write(location, fp, progress.update)
except FileExistsError:
LOGGER.info(f'File {dl_path} exists, not overwriting')

return dl_path
try:
mode = 'wb' if overwrite else 'xb'
with dl_path.open(mode) as fp:
with tqdm(total=content_length,
unit_scale=True,
unit_divisor=1024 * 1024,
unit='B',
desc=str(filename),
disable=not progress_bar) as progress:

previous = resp.num_bytes_downloaded

# Size from boto3.s3.transfer.TransferConfig
# io_chunksize. Planet assets are generally
# several MB or more.
async for chunk in resp.aiter_bytes(chunk_size=262144):
Contributor Author: Default chunk size was 100 before; that made the progress bar look smooth, but was bad for overall throughput.

fp.write(chunk)
current = resp.num_bytes_downloaded
progress.update(current - previous)
previous = current

except FileExistsError:
LOGGER.info(f'File {dl_path} exists, not overwriting')

return dl_path

async def download_order(self,
order_id: str,
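For context, the decorator added above follows stamina's standard usage: the wrapped coroutine is re-invoked with exponential backoff whenever one of the listed exception types is raised. The sketch below is a minimal, standalone illustration of that pattern only; the RETRYABLE tuple is an assumed stand-in for planet.http.RETRY_EXCEPTIONS, whose actual contents live in the SDK and may differ.

# Minimal sketch of the stamina retry pattern (not the SDK implementation).
import httpx
import stamina

# Assumed stand-in for planet.http.RETRY_EXCEPTIONS; the SDK's real tuple
# of retryable exception types may differ.
RETRYABLE = (httpx.ConnectError, httpx.ReadError, httpx.RemoteProtocolError)


@stamina.retry(on=RETRYABLE, attempts=3)
async def fetch_bytes(url: str) -> bytes:
    """Download a URL, retrying transient transport errors with backoff."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        resp.raise_for_status()
        return resp.content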
1 change: 1 addition & 0 deletions setup.py
@@ -29,6 +29,7 @@
'httpx>=0.23.0',
'jsonschema',
'pyjwt>=2.1',
'stamina',
'tqdm>=4.56',
'typing-extensions',
]
6 changes: 2 additions & 4 deletions tests/integration/test_orders_api.py
@@ -566,7 +566,7 @@ async def test_download_asset_md(tmpdir, session):

@respx.mock
@pytest.mark.anyio
async def test_download_asset_img(tmpdir, open_test_img, session):
async def test_download_asset_img_with_retry(tmpdir, open_test_img, session):
dl_url = TEST_DOWNLOAD_URL + '/1?token=IAmAToken'

img_headers = {
@@ -587,9 +587,7 @@ async def _stream_img():
# an error caused by respx and not this code
# https://github.com/lundberg/respx/issues/130
respx.get(dl_url).side_effect = [
httpx.Response(HTTPStatus.OK,
headers=img_headers,
request='donotcloneme'),
Contributor Author: Behavior of this PR no longer depends on multiple download requests.

httpx.ReadError("no can do!"),
httpx.Response(HTTPStatus.OK,
stream=_stream_img(),
headers=img_headers,
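The respx side_effect list used in the updated test is a general pattern: listing an exception instance followed by a response makes the first request fail and the retried request succeed. Below is a self-contained sketch of that pattern with a hypothetical URL and fetch helper; it assumes the anyio pytest plugin is configured, as in this test suite, and is not part of the PR itself.

# Sketch of the failure-then-success respx pattern (hypothetical names).
import httpx
import pytest
import respx
import stamina


@stamina.retry(on=httpx.ReadError, attempts=2)
async def fetch(url: str) -> httpx.Response:
    # One plain GET; stamina re-invokes this coroutine if it raises ReadError.
    async with httpx.AsyncClient() as client:
        return await client.get(url)


@respx.mock
@pytest.mark.anyio
async def test_retries_after_read_error():
    url = 'https://example.com/asset'
    # First call raises a transport error, second returns 200.
    respx.get(url).side_effect = [
        httpx.ReadError('no can do!'),
        httpx.Response(200, content=b'ok'),
    ]
    resp = await fetch(url)
    assert resp.status_code == 200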