Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fsspec as alternate file protocol #946

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions datajoint/external.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path, PurePosixPath, PureWindowsPath
from collections import Mapping
from fsspec.core import url_to_fs
from tqdm import tqdm
from .settings import config
from .errors import DataJointError, MissingExternalFile
Expand Down Expand Up @@ -44,6 +45,8 @@ def __init__(self, connection, store, database):
if self.spec['protocol'] == 'file' and not Path(self.spec['location']).is_dir():
raise FileNotFoundError('Inaccessible local directory %s' %
self.spec['location']) from None
if self.spec['protocol'] == 'fsspec':
self._fs, _ = url_to_fs(self.spec['location'])

@property
def definition(self):
Expand Down Expand Up @@ -84,6 +87,11 @@ def _make_external_filepath(self, relative_filepath):
# Preserve root
elif self.spec['protocol'] == 'file':
return PurePosixPath(Path(self.spec['location']), relative_filepath)
elif self.spec['protocol'] == 'fsspec':
# The pathlib library strips the double slashes used in uris. For now,
# just assume linux slash directory separator.
# TODO: Make this work on all systems, including Windows.
return self.spec['location'] + "/" + str(relative_filepath)
else:
assert False

Expand All @@ -97,6 +105,10 @@ def _upload_file(self, local_path, external_path, metadata=None):
self.s3.fput(local_path, external_path, metadata)
elif self.spec['protocol'] == 'file':
safe_copy(local_path, external_path, overwrite=True)
elif self.spec['protocol'] == 'fsspec':
parent = self._fs._parent(external_path)
self._fs.makedirs(parent)
self._fs.put_file(local_path, external_path)
else:
assert False

Expand All @@ -105,6 +117,8 @@ def _download_file(self, external_path, download_path):
self.s3.fget(external_path, download_path)
elif self.spec['protocol'] == 'file':
safe_copy(external_path, download_path)
elif self.spec['protocol'] == 'fsspec':
self._fs.get_file(external_path, download_path)
else:
assert False

Expand All @@ -113,6 +127,10 @@ def _upload_buffer(self, buffer, external_path):
self.s3.put(external_path, buffer)
elif self.spec['protocol'] == 'file':
safe_write(external_path, buffer)
elif self.spec['protocol'] == 'fsspec':
parent = self._fs._parent(external_path)
self._fs.makedirs(parent)
self._fs.pipe_file(external_path, buffer)
else:
assert False

Expand All @@ -121,13 +139,19 @@ def _download_buffer(self, external_path):
return self.s3.get(external_path)
if self.spec['protocol'] == 'file':
return Path(external_path).read_bytes()
if self.spec['protocol'] == 'fsspec':
return self._fs.cat_file(external_path)
assert False

def _remove_external_file(self, external_path):
if self.spec['protocol'] == 's3':
self.s3.remove_object(external_path)
elif self.spec['protocol'] == 'file':
Path(external_path).unlink()
elif self.spec['protocol'] == 'fsspec':
self._fs.rm(external_path)
else:
assert False

def exists(self, external_filepath):
"""
Expand All @@ -137,6 +161,8 @@ def exists(self, external_filepath):
return self.s3.exists(external_filepath)
if self.spec['protocol'] == 'file':
return Path(external_filepath).is_file()
if self.spec['protocol'] == 'fsspec':
return self._fs.exists(external_filepath)
assert False

# --- BLOBS ----
Expand Down
3 changes: 2 additions & 1 deletion datajoint/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def get_store_spec(self, store):
spec['subfolding'] = spec.get('subfolding', DEFAULT_SUBFOLDING)
spec_keys = { # REQUIRED in uppercase and allowed in lowercase
'file': ('PROTOCOL', 'LOCATION', 'subfolding', 'stage'),
's3': ('PROTOCOL', 'ENDPOINT', 'BUCKET', 'ACCESS_KEY', 'SECRET_KEY', 'LOCATION', 'secure', 'subfolding', 'stage')}
's3': ('PROTOCOL', 'ENDPOINT', 'BUCKET', 'ACCESS_KEY', 'SECRET_KEY', 'LOCATION', 'secure', 'subfolding', 'stage'),
'fsspec': ('PROTOCOL', 'LOCATION', 'subfolding', 'stage')}

try:
spec_keys = spec_keys[spec.get('protocol', '').lower()]
Expand Down
4 changes: 4 additions & 0 deletions local-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ services:
- S3_ACCESS_KEY=datajoint
- S3_SECRET_KEY=datajoint
- S3_BUCKET=datajoint.test
- AWS_ACCESS_KEY_ID=datajoint
- AWS_SECRET_ACCESS_KEY=datajoint
- PYTHON_USER=dja
- JUPYTER_PASSWORD=datajoint
- DISPLAY
Expand All @@ -82,6 +84,8 @@ services:
- -c
- |
set -e
mkdir -p ~/.config/fsspec
echo '{"s3": {"client_kwargs": {"endpoint_url": "http://fakeservices.datajoint.io"}}}' > ~/.config/fsspec/s3.json
pip install --user nose nose-cov coveralls flake8 ptvsd
pip install -e .
pip freeze | grep datajoint
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ minio>=7.0.0
matplotlib
cryptography
otumat
fsspec[s3,gcs]
30 changes: 29 additions & 1 deletion tests/schema_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,18 @@
S3_CONN_INFO,
protocol='s3',
location='dj/store/repo',
subfolding=(2, 4))
subfolding=(2, 4)),

'fsspec': dict(
protocol='fsspec',
location=tempfile.mkdtemp(),
stage=tempfile.mkdtemp()),

'fsspec_s3': dict(
protocol='fsspec',
location='s3://datajoint.test/dj/store/fsspec',
stage=tempfile.mkdtemp())

}

dj.config['stores'] = stores_config
Expand All @@ -63,6 +74,23 @@ class SimpleRemote(dj.Manual):
"""


@schema
class SimpleFsSpec(dj.Manual):
definition = """
simple : int
---
item : blob@fsspec
"""


@schema
class SimpleFsSpecS3(dj.Manual):
definition = """
simple : int
---
item : blob@fsspec_s3
"""

@schema
class Seed(dj.Lookup):
definition = """
Expand Down
14 changes: 14 additions & 0 deletions tests/test_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import datajoint as dj
from .schema_external import stores_config
from .schema_external import SimpleRemote
from .schema_external import SimpleFsSpec
from .schema_external import SimpleFsSpecS3
current_location_s3 = dj.config['stores']['share']['location']
current_location_local = dj.config['stores']['local']['location']

Expand Down Expand Up @@ -103,3 +105,15 @@ def test_file_leading_slash():
file external storage configured with leading slash
"""
test_s3_leading_slash(index=200, store='local')


def test_fsspec():
value = np.array([1, 2, 3])

SimpleFsSpec.insert([{'simple': 0, 'item': value}])
assert_true(np.array_equal(
value, (SimpleFsSpec & 'simple=0').fetch1('item')))

SimpleFsSpecS3.insert([{'simple': 0, 'item': value}])
assert_true(np.array_equal(
value, (SimpleFsSpecS3 & 'simple=0').fetch1('item')))
10 changes: 10 additions & 0 deletions tests/test_filepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ def test_filepath_s3():
test_filepath(store="repo_s3")


def test_filepath_fsspec():
""" test file management with fsspec """
test_filepath(store="fsspec")


def test_filepath_fsspec_s3():
""" test file management with fsspec """
test_filepath(store="fsspec_s3")


def test_duplicate_upload(store="repo"):
ext = schema.external[store]
stage_path = dj.config['stores'][store]['stage']
Expand Down