From 70da8de4b82b84cc60097053b1d0dcad89730d4b Mon Sep 17 00:00:00 2001 From: Chris Roat <1053153+chrisroat@users.noreply.github.com> Date: Sun, 22 Aug 2021 14:24:54 -0700 Subject: [PATCH] Add fsspec as alternate file protocol In principle, fsspec can completely subsume the current protocols (file and s3) and can provide many more. However, since it is a new integration and getting credentialling correct can be difficult, I implemented it as an alternate protocol which people can migrate to. Neither s3fs and gcsfs (installed via fsspec[s3,gcs]) have wheels for alpine, so they need to be built during startup. To correctly run the tests, the following extra install of dev packages is needed: apk add musl-dev --- datajoint/external.py | 26 ++++++++++++++++++++++++++ datajoint/settings.py | 3 ++- local-docker-compose.yml | 4 ++++ requirements.txt | 1 + tests/schema_external.py | 30 +++++++++++++++++++++++++++++- tests/test_external.py | 14 ++++++++++++++ tests/test_filepath.py | 10 ++++++++++ 7 files changed, 86 insertions(+), 2 deletions(-) diff --git a/datajoint/external.py b/datajoint/external.py index 808b2d65b..b4c03fe1e 100644 --- a/datajoint/external.py +++ b/datajoint/external.py @@ -1,5 +1,6 @@ from pathlib import Path, PurePosixPath, PureWindowsPath from collections import Mapping +from fsspec.core import url_to_fs from tqdm import tqdm from .settings import config from .errors import DataJointError, MissingExternalFile @@ -44,6 +45,8 @@ def __init__(self, connection, store, database): if self.spec['protocol'] == 'file' and not Path(self.spec['location']).is_dir(): raise FileNotFoundError('Inaccessible local directory %s' % self.spec['location']) from None + if self.spec['protocol'] == 'fsspec': + self._fs, _ = url_to_fs(self.spec['location']) @property def definition(self): @@ -84,6 +87,11 @@ def _make_external_filepath(self, relative_filepath): # Preserve root elif self.spec['protocol'] == 'file': return PurePosixPath(Path(self.spec['location']), relative_filepath) + elif self.spec['protocol'] == 'fsspec': + # The pathlib library strips the double slashes used in uris. For now, + # just assume linux slash directory separator. + # TODO: Make this work on all systems, including Windows. + return self.spec['location'] + "/" + str(relative_filepath) else: assert False @@ -97,6 +105,10 @@ def _upload_file(self, local_path, external_path, metadata=None): self.s3.fput(local_path, external_path, metadata) elif self.spec['protocol'] == 'file': safe_copy(local_path, external_path, overwrite=True) + elif self.spec['protocol'] == 'fsspec': + parent = self._fs._parent(external_path) + self._fs.makedirs(parent) + self._fs.put_file(local_path, external_path) else: assert False @@ -105,6 +117,8 @@ def _download_file(self, external_path, download_path): self.s3.fget(external_path, download_path) elif self.spec['protocol'] == 'file': safe_copy(external_path, download_path) + elif self.spec['protocol'] == 'fsspec': + self._fs.get_file(external_path, download_path) else: assert False @@ -113,6 +127,10 @@ def _upload_buffer(self, buffer, external_path): self.s3.put(external_path, buffer) elif self.spec['protocol'] == 'file': safe_write(external_path, buffer) + elif self.spec['protocol'] == 'fsspec': + parent = self._fs._parent(external_path) + self._fs.makedirs(parent) + self._fs.pipe_file(external_path, buffer) else: assert False @@ -121,6 +139,8 @@ def _download_buffer(self, external_path): return self.s3.get(external_path) if self.spec['protocol'] == 'file': return Path(external_path).read_bytes() + if self.spec['protocol'] == 'fsspec': + return self._fs.cat_file(external_path) assert False def _remove_external_file(self, external_path): @@ -128,6 +148,10 @@ def _remove_external_file(self, external_path): self.s3.remove_object(external_path) elif self.spec['protocol'] == 'file': Path(external_path).unlink() + elif self.spec['protocol'] == 'fsspec': + self._fs.rm(external_path) + else: + assert False def exists(self, external_filepath): """ @@ -137,6 +161,8 @@ def exists(self, external_filepath): return self.s3.exists(external_filepath) if self.spec['protocol'] == 'file': return Path(external_filepath).is_file() + if self.spec['protocol'] == 'fsspec': + return self._fs.exists(external_filepath) assert False # --- BLOBS ---- diff --git a/datajoint/settings.py b/datajoint/settings.py index 3ebcf3ed9..412f63caf 100644 --- a/datajoint/settings.py +++ b/datajoint/settings.py @@ -137,7 +137,8 @@ def get_store_spec(self, store): spec['subfolding'] = spec.get('subfolding', DEFAULT_SUBFOLDING) spec_keys = { # REQUIRED in uppercase and allowed in lowercase 'file': ('PROTOCOL', 'LOCATION', 'subfolding', 'stage'), - 's3': ('PROTOCOL', 'ENDPOINT', 'BUCKET', 'ACCESS_KEY', 'SECRET_KEY', 'LOCATION', 'secure', 'subfolding', 'stage')} + 's3': ('PROTOCOL', 'ENDPOINT', 'BUCKET', 'ACCESS_KEY', 'SECRET_KEY', 'LOCATION', 'secure', 'subfolding', 'stage'), + 'fsspec': ('PROTOCOL', 'LOCATION', 'subfolding', 'stage')} try: spec_keys = spec_keys[spec.get('protocol', '').lower()] diff --git a/local-docker-compose.yml b/local-docker-compose.yml index 394240ad0..7535c5267 100644 --- a/local-docker-compose.yml +++ b/local-docker-compose.yml @@ -73,6 +73,8 @@ services: - S3_ACCESS_KEY=datajoint - S3_SECRET_KEY=datajoint - S3_BUCKET=datajoint.test + - AWS_ACCESS_KEY_ID=datajoint + - AWS_SECRET_ACCESS_KEY=datajoint - PYTHON_USER=dja - JUPYTER_PASSWORD=datajoint - DISPLAY @@ -82,6 +84,8 @@ services: - -c - | set -e + mkdir -p ~/.config/fsspec + echo '{"s3": {"client_kwargs": {"endpoint_url": "http://fakeservices.datajoint.io"}}}' > ~/.config/fsspec/s3.json pip install --user nose nose-cov coveralls flake8 ptvsd pip install -e . pip freeze | grep datajoint diff --git a/requirements.txt b/requirements.txt index 363271d45..48756a049 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ minio>=7.0.0 matplotlib cryptography otumat +fsspec[s3,gcs] diff --git a/tests/schema_external.py b/tests/schema_external.py index 7db44bc49..cc3ff52f0 100644 --- a/tests/schema_external.py +++ b/tests/schema_external.py @@ -37,7 +37,18 @@ S3_CONN_INFO, protocol='s3', location='dj/store/repo', - subfolding=(2, 4)) + subfolding=(2, 4)), + + 'fsspec': dict( + protocol='fsspec', + location=tempfile.mkdtemp(), + stage=tempfile.mkdtemp()), + + 'fsspec_s3': dict( + protocol='fsspec', + location='s3://datajoint.test/dj/store/fsspec', + stage=tempfile.mkdtemp()) + } dj.config['stores'] = stores_config @@ -63,6 +74,23 @@ class SimpleRemote(dj.Manual): """ +@schema +class SimpleFsSpec(dj.Manual): + definition = """ + simple : int + --- + item : blob@fsspec + """ + + +@schema +class SimpleFsSpecS3(dj.Manual): + definition = """ + simple : int + --- + item : blob@fsspec_s3 + """ + @schema class Seed(dj.Lookup): definition = """ diff --git a/tests/test_external.py b/tests/test_external.py index 22fee51ab..7eb7b9209 100644 --- a/tests/test_external.py +++ b/tests/test_external.py @@ -8,6 +8,8 @@ import datajoint as dj from .schema_external import stores_config from .schema_external import SimpleRemote +from .schema_external import SimpleFsSpec +from .schema_external import SimpleFsSpecS3 current_location_s3 = dj.config['stores']['share']['location'] current_location_local = dj.config['stores']['local']['location'] @@ -103,3 +105,15 @@ def test_file_leading_slash(): file external storage configured with leading slash """ test_s3_leading_slash(index=200, store='local') + + +def test_fsspec(): + value = np.array([1, 2, 3]) + + SimpleFsSpec.insert([{'simple': 0, 'item': value}]) + assert_true(np.array_equal( + value, (SimpleFsSpec & 'simple=0').fetch1('item'))) + + SimpleFsSpecS3.insert([{'simple': 0, 'item': value}]) + assert_true(np.array_equal( + value, (SimpleFsSpecS3 & 'simple=0').fetch1('item'))) diff --git a/tests/test_filepath.py b/tests/test_filepath.py index e01004827..9fc0f52c0 100644 --- a/tests/test_filepath.py +++ b/tests/test_filepath.py @@ -94,6 +94,16 @@ def test_filepath_s3(): test_filepath(store="repo_s3") +def test_filepath_fsspec(): + """ test file management with fsspec """ + test_filepath(store="fsspec") + + +def test_filepath_fsspec_s3(): + """ test file management with fsspec """ + test_filepath(store="fsspec_s3") + + def test_duplicate_upload(store="repo"): ext = schema.external[store] stage_path = dj.config['stores'][store]['stage']