Adds request deduplication for cached remote file access
Signed-off-by: Alexis Jeandet <[email protected]>
jeandet committed Jul 1, 2024
1 parent c2f2577 commit 444e12b
Showing 8 changed files with 115 additions and 29 deletions.
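The new code path is reached through any_loc_open with cache_remote_files=True: concurrent callers requesting the same URL now share a single download instead of each hitting the server. A minimal usage sketch (the URL below is illustrative only, not from this commit):

    from speasy.core.any_files import any_loc_open

    # The first caller downloads and caches the file; concurrent callers block
    # on the PendingRequest marker until the CacheItem appears, then read from
    # the cache.
    f = any_loc_open("https://example.org/data/some_file.cdf", mode='rb',
                     cache_remote_files=True)
    content = f.read()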
3 changes: 3 additions & 0 deletions .coveragerc
@@ -1,5 +1,8 @@
 [run]
 branch = True
+concurrency = multiprocessing
+parallel = true
+sigterm = true
 
 omit =
     tests/*
2 changes: 1 addition & 1 deletion .github/workflows/PRs.yml
@@ -52,7 +52,7 @@ jobs:
           sudo apt update && sudo apt install -y texlive pandoc
           pip install pytest pytest-cov sphinx pandoc
           pip install -r docs/requirements.txt
-          pytest --cov=./ --cov-report=xml
+          pytest --cov=./ --cov-config=.coveragerc --cov-report=xml
           make doctest
       - name: Check that release process is not broken
         if: matrix.python-version == '3.8' && matrix.os == 'ubuntu-latest'
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -54,7 +54,7 @@ jobs:
           sudo apt update && sudo apt install -y texlive pandoc
           pip install pytest pytest-cov sphinx pandoc
           pip install -r docs/requirements.txt
-          pytest --cov=./ --cov-report=xml
+          pytest --cov=./ --cov-config=.coveragerc --cov-report=xml
           make doctest
       - name: Check that release process is not broken
         if: matrix.python-version == '3.8' && matrix.os == 'ubuntu-latest'
1 change: 1 addition & 0 deletions requirements_dev.txt
@@ -1,5 +1,6 @@
 pandas
 pytest-runner
+pytest-cov
 appdirs
 diskcache
 requests
65 changes: 50 additions & 15 deletions speasy/core/any_files.py
@@ -2,9 +2,10 @@
 import logging
 import os
 import re
-from datetime import timedelta, datetime
+import time
+from datetime import timedelta, datetime, timezone
 from typing import List, Optional, Union
-
+from threading import get_native_id
 from speasy.core.cache import CacheCall
 from speasy.core.cache import get_item, add_item, CacheItem
 from . import http
@@ -14,6 +15,20 @@
 _HREF_REGEX = re.compile(' href="([A-Za-z0-9-_.]+)">')
 
 
+class PendingRequest:
+    def __init__(self):
+        self._start_time = datetime.now(tz=timezone.utc)
+        self._pid = get_native_id()
+
+    @property
+    def elapsed_time(self):
+        return datetime.now(tz=timezone.utc) - self._start_time
+
+    @property
+    def pid(self):
+        return self._pid
+
+
 class AnyFile(io.IOBase):
     def __init__(self, url, file_impl: io.IOBase, status=200):
         self._url = url
@@ -61,13 +76,38 @@ def _make_file_from_cache_entry(entry: CacheItem, url: str, mode: str) -> AnyFile:
     return AnyFile(url, io.StringIO(entry.data))
 
 
-def _cache_remote_file(url, timeout: int = http.DEFAULT_TIMEOUT, headers: dict = None, mode='rb') -> AnyFile:
-    resp = http.urlopen(url=url, headers=headers, timeout=timeout)
-    if 'b' in mode:
-        entry = CacheItem(data=resp.bytes, version=resp.getheader('last-modified', str(datetime.now())))
-    else:
-        entry = CacheItem(data=resp.text, version=resp.getheader('last-modified', str(datetime.now())))
-    add_item(key=url, item=entry)
+def _wait_for_pending_request(url: str, timeout: int):
+    cache_item: Optional[CacheItem] = get_item(url)
+    while type(cache_item) is PendingRequest:
+        if cache_item.elapsed_time > timedelta(seconds=timeout):  # pragma: no cover
+            log.warning(f"Pending request for {url} timed out")
+            return None
+        time.sleep(.1)
+        cache_item = get_item(url)
+    return cache_item
+
+
+def _try_lock_request(url: str, timeout: int):
+    cache_item: Optional[CacheItem] = get_item(url)
+    if cache_item is None:
+        add_item(key=url, item=PendingRequest())
+        cache_item: Optional[CacheItem] = get_item(url)
+    if type(cache_item) is PendingRequest:
+        if cache_item.pid != get_native_id():
+            return _wait_for_pending_request(url, timeout)
+    return cache_item
+
+
+def _cached_get_remote_file(url, timeout: int = http.DEFAULT_TIMEOUT, headers: dict = None, mode='rb') -> AnyFile:
+    last_modified = http.head(url).getheader('last-modified', str(datetime.now()))
+    entry = _try_lock_request(url, timeout)
+    if type(entry) is not CacheItem or last_modified != entry.version:
+        resp = http.urlopen(url=url, headers=headers, timeout=timeout)
+        if 'b' in mode:
+            entry = CacheItem(data=resp.bytes, version=last_modified)
+        else:
+            entry = CacheItem(data=resp.text, version=last_modified)
+        add_item(key=url, item=entry)
     return _make_file_from_cache_entry(entry, url, mode)


@@ -99,12 +139,7 @@ def any_loc_open(url, timeout: int = http.DEFAULT_TIMEOUT, headers: Optional[dict]
         return AnyFile(url, open(url.replace('file://', ''), mode=mode))
     else:
         if cache_remote_files:
-            last_modified = http.head(url).getheader('last-modified', str(datetime.now()))
-            cache_item: Optional[CacheItem] = get_item(url)
-            if cache_item is None or last_modified != cache_item.version:
-                return _cache_remote_file(url, timeout=timeout, headers=headers, mode=mode)
-            else:
-                return _make_file_from_cache_entry(cache_item, url, mode)
+            return _cached_get_remote_file(url, timeout=timeout, headers=headers, mode=mode)
        else:
            return _remote_open(url, timeout=timeout, headers=headers, mode=mode)
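
The three helpers above implement a claim-and-wait protocol on top of the shared cache. A minimal, self-contained sketch of the same idea, using an in-memory dict in place of speasy's on-disk cache (all names here are illustrative, not speasy API):

    import time
    from threading import get_native_id

    _cache = {}  # stands in for speasy's shared cache (get_item/add_item)


    class PendingRequest:
        def __init__(self):
            self.pid = get_native_id()


    def get_or_fetch(url, fetch, timeout=10.0):
        item = _cache.get(url)
        if item is None:
            _cache[url] = item = PendingRequest()  # claim the download slot
        if isinstance(item, PendingRequest):
            if item.pid != get_native_id():  # another worker holds the claim
                deadline = time.monotonic() + timeout
                while isinstance(_cache.get(url), PendingRequest):
                    if time.monotonic() > deadline:
                        return None  # pending request timed out
                    time.sleep(.1)
                return _cache.get(url)
            _cache[url] = item = fetch(url)  # claim held here: fetch exactly once
        return item

Unlike this sketch, the real implementation also re-validates a cached entry against the Last-Modified header before serving it.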

32 changes: 23 additions & 9 deletions speasy/core/direct_archive_downloader/direct_archive_downloader.py
@@ -6,6 +6,7 @@
 import re
 from datetime import timedelta, datetime
 from functools import partial
+import random
 from typing import Optional, List, Callable
 
 from dateutil.relativedelta import relativedelta
@@ -40,6 +41,16 @@ def _read_cdf(url: Optional[str], variable: str, master_cdf_url: Optional[str] =
     return load_variable(file=url, variable=variable, master_cdf_url=master_cdf_url, cache_remote_files=True)
 
 
+def randomized_map(f, l):
+    """Apply f to every element of l in random order and return the results in
+    the original order. Randomizing the execution order minimizes request
+    collisions and maximizes throughput."""
+    indexed_list = list(enumerate(l))
+    random.shuffle(indexed_list)
+    result = sorted([(i, f(e)) for i, e in indexed_list], key=lambda x: x[0])
+    return [e for i, e in result]
+
+
 def _build_url(url_pattern: str, date: datetime, use_file_list=False) -> Optional[str]:
     base_ulr = apply_date_format(url_pattern, date)
     if not use_file_list:
@@ -147,10 +160,12 @@ def get_product(url_pattern: str, variable: str, start_time: AnyDateTimeType, st
                 fname_regex: str, split_frequency: str = "daily", date_format=None,
                 file_reader: FileLoaderCallable = _read_cdf, **kwargs) -> Optional[SpeasyVariable]:
     v = merge(
-        list(map(partial(file_reader, variable=variable, **kwargs),
-                 RandomSplitDirectDownload.list_files(split_frequency=split_frequency, url_pattern=url_pattern,
-                                                      start_time=start_time, stop_time=stop_time,
-                                                      fname_regex=fname_regex, date_format=date_format))))
+        randomized_map(partial(file_reader, variable=variable, **kwargs),
+                       RandomSplitDirectDownload.list_files(split_frequency=split_frequency,
+                                                            url_pattern=url_pattern,
+                                                            start_time=start_time, stop_time=stop_time,
+                                                            fname_regex=fname_regex,
+                                                            date_format=date_format)))
     if v is not None:
         return v[make_utc_datetime(start_time):make_utc_datetime(stop_time)]
     return None
@@ -164,11 +179,10 @@ def get_product(url_pattern: str, variable: str, start_time: AnyDateTimeType,
                 file_reader: FileLoaderCallable = _read_cdf,
                 **kwargs) -> \
         Optional[SpeasyVariable]:
-    v = merge(
-        list(map(lambda date: file_reader(_build_url(url_pattern, date, use_file_list=use_file_list),
-                                          variable=variable, **kwargs),
-                 spilt_range(split_frequency=split_frequency, start_time=start_time,
-                             stop_time=stop_time))))
+    v = merge(randomized_map(
+        lambda date: file_reader(_build_url(url_pattern, date, use_file_list=use_file_list), variable=variable,
+                                 **kwargs),
+        spilt_range(split_frequency=split_frequency, start_time=start_time, stop_time=stop_time)))
     if v is not None:
         return v[make_utc_datetime(start_time):make_utc_datetime(stop_time)]
     return None
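
Note that randomized_map shuffles only the execution order; its return value is identical to list(map(f, l)), so both call sites above can swap it in without reordering their results. A quick illustrative check:

    >>> randomized_map(lambda x: x * x, [1, 2, 3, 4])
    [1, 4, 9, 16]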
2 changes: 1 addition & 1 deletion tests/test_direct_archive_downloader.py
@@ -1,5 +1,5 @@
 import unittest
-
+from multiprocessing import Pool
 from ddt import ddt, data, unpack
 
 import speasy as spz
37 changes: 35 additions & 2 deletions tests/test_file_access.py
@@ -6,15 +6,24 @@
 import re
 import unittest
 from datetime import datetime
+import time
 
 from ddt import ddt, data
 
 from speasy.core.any_files import any_loc_open, list_files
+from speasy.core.cache import drop_item
+from multiprocessing import Value, Process
 
 _HERE_ = os.path.dirname(os.path.abspath(__file__))
 
 
+def _open_file(url, value):
+    value.value -= 1
+    while value.value > 0:
+        time.sleep(.01)
+    any_loc_open(url, mode='r', cache_remote_files=True)
+
+
 @ddt
 class FileAccess(unittest.TestCase):
     def setUp(self):
@@ -50,7 +59,9 @@ def test_simple_remote_bin_file(self):
     def test_simple_remote_bin_file_with_rewrite_rules(self):
         if 'SPEASY_CORE_HTTP_REWRITE_RULES' not in os.environ:
             self.skipTest("No rewrite rules defined")
-        f = any_loc_open("https://thisserver_does_not_exists.lpp.polytechnique.fr/pub/data/ace/mag/level_2_cdaweb/mfi_h0/2014/ac_h0_mfi_20141117_v06.cdf", mode='rb')
+        f = any_loc_open(
+            "https://thisserver_does_not_exists.lpp.polytechnique.fr/pub/data/ace/mag/level_2_cdaweb/mfi_h0/2014/ac_h0_mfi_20141117_v06.cdf",
+            mode='rb')
         self.assertIsNotNone(f)
         self.assertIn(b'NSSDC Common Data Format', f.read(100))

@@ -68,6 +79,20 @@ def test_cached_remote_bin_file(self):
         self.assertIsNotNone(f)
         self.assertGreater(mid - start, stop - mid)
 
+    def test_remote_file_request_deduplication(self):
+        drop_item("https://hephaistos.lpp.polytechnique.fr/data/jeandet/Vbias.html")
+        sync = Value('i', 5)
+        processes = [Process(target=_open_file, args=(
+            "https://hephaistos.lpp.polytechnique.fr/data/jeandet/Vbias.html", sync)) for _ in range(4)]
+        try:
+            for p in processes:
+                p.start()
+            _open_file("https://hephaistos.lpp.polytechnique.fr/data/jeandet/Vbias.html", sync)
+        finally:
+            for p in processes:
+                p.join()
+        self.assertEqual(0, sync.value)
+
     @data(
         f"{_HERE_}/resources/derived_param.txt",
         f"file://{_HERE_}/resources/derived_param.txt"
@@ -85,7 +110,9 @@ def test_list_remote_files(self):
     def test_list_remote_files_with_rewrite_rules(self):
         if 'SPEASY_CORE_HTTP_REWRITE_RULES' not in os.environ:
             self.skipTest("No rewrite rules defined")
-        flist = list_files(url='https://thisserver_does_not_exists.lpp.polytechnique.fr/pub/data/ace/mag/level_2_cdaweb/mfi_h0/2014/', file_regex=re.compile(r'.*\.cdf'))
+        flist = list_files(
+            url='https://thisserver_does_not_exists.lpp.polytechnique.fr/pub/data/ace/mag/level_2_cdaweb/mfi_h0/2014/',
+            file_regex=re.compile(r'.*\.cdf'))
         self.assertGreaterEqual(len(flist), 10)

     @data(
@@ -99,4 +126,10 @@ def test_list_local_files(self, url):
 
 
 if __name__ == '__main__':
+    try:
+        from pytest_cov.embed import cleanup_on_sigterm
+    except ImportError:
+        pass
+    else:
+        cleanup_on_sigterm()
     unittest.main()
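
This __main__ hook works together with the new .coveragerc options: concurrency = multiprocessing and parallel = true let coverage.py track the worker processes spawned by test_remote_file_request_deduplication, and sigterm = true together with cleanup_on_sigterm() ensures a worker still flushes its coverage data if it is stopped with SIGTERM.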
