
Commit

adds a google storage blobstore
bmabey committed Jun 1, 2017
1 parent 6505241 commit b4df3cc
Showing 4 changed files with 111 additions and 4 deletions.
13 changes: 13 additions & 0 deletions provenance/_config.py
@@ -51,6 +51,19 @@ def __init__(self, *args, **kargs):
BLOBSTORE_TYPES['sftp'] = SFTPStore


try:
    import provenance.google_storage as gs
    BLOBSTORE_TYPES['gs'] = gs.GSStore

except ImportError as e:
    class GSStore(object):
        _err = e
        def __init__(self, *args, **kargs):
            raise self._err

    BLOBSTORE_TYPES['gs'] = GSStore


blobstore_from_config = atomic_item_from_config(type_dict=BLOBSTORE_TYPES,
item_plural='Blobstores')

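The hunk above registers the 'gs' blobstore type whether or not the Google Cloud client library is importable: when the import fails, the stub class defers the ImportError until a 'gs' store is actually constructed. A minimal sketch of that behavior, assuming direct use of BLOBSTORE_TYPES purely for illustration (normal configuration goes through blobstore_from_config) and placeholder cachedir/bucket values:

from provenance._config import BLOBSTORE_TYPES

GSStore = BLOBSTORE_TYPES['gs']  # the real GSStore, or the stub defined above
try:
    store = GSStore(cachedir='/tmp/provenance-cache', bucket='my-bucket')
except ImportError as err:
    # Without the google-cloud dependency installed, the stub re-raises the
    # original ImportError only when a 'gs' blobstore is actually requested.
    print('google storage support not installed:', err)
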
8 changes: 4 additions & 4 deletions provenance/blobstores.py
@@ -223,19 +223,19 @@ class S3Store(RemoteStore):
    def __init__(self, cachedir, basepath, s3_config=None, s3fs=None,
                 read=True, write=True, read_through_write=True,
                 delete=False, on_duplicate_key='skip', cleanup_cachedir=False,
-                always_check_s3=False):
+                always_check_remote=False):
        """
        Parameters
        ----------
-       always_check_s3 : bool
+       always_check_remote : bool
            When True, S3 is checked on every __contains__ call; otherwise the check
            short-circuits when the blob is found in the cachedir. For performance this
            should normally be False. The only reason to enable it is when an S3Store
            and a DiskStore are combined in a ChainedStore, and since the S3Store
            already doubles as a DiskStore via its cachedir, chaining the two rarely
            makes sense anyway.
        """
-       super(S3Store, self).__init__(always_check_remote=always_check_s3,
+       super(S3Store, self).__init__(always_check_remote=always_check_remote,
                                      cachedir = cachedir,
                                      basepath = basepath,
                                      cleanup_cachedir = cleanup_cachedir,
@@ -262,7 +262,7 @@ def _download_file(self, remote_path, dest_filename):
        self.s3fs.get(remote_path, dest_filename)



class ChainedStore(BaseBlobStore):
    def __init__(self, stores, read=True, write=True, read_through_write=True,
                 delete=True, on_duplicate_key='skip'):
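
The only functional change here is the keyword rename: always_check_s3 becomes always_check_remote, matching the name already used by the RemoteStore base class and by the new GSStore. A hedged sketch of the corresponding call-site update (cachedir and basepath values are placeholders):

from provenance.blobstores import S3Store

store = S3Store(
    cachedir='/tmp/provenance-cache',  # local cache for downloaded blobs
    basepath='my-bucket/provenance',   # placeholder S3 location
    always_check_remote=False,         # was: always_check_s3=False
)
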
93 changes: 93 additions & 0 deletions provenance/google_storage.py
@@ -0,0 +1,93 @@
import os
import shutil

from boltons import funcutils as bfu
from google.cloud import storage as gs
from joblib.disk import mkdirp
from memoized_property import memoized_property

from . import blobstores as bs


# TODO: catch and retry w/new client on
# BrokenPipeError: [Errno 32] Broken pipe
# ConnectionResetError: [Errno 54] Connection reset by peer
# more?

def retry(f, max_attempts=2):

    @bfu.wraps(f)
    def with_retry(store, *args, **kargs):
        actual_attempts = 0
        while True:
            try:
                return f(store, *args, **kargs)
            except (BrokenPipeError, ConnectionError) as e:
                actual_attempts += 1
                if actual_attempts >= max_attempts:
                    raise e
                else:
                    store._setup_client()
    return with_retry



class GSStore(bs.RemoteStore):
    def __init__(self, cachedir, bucket, basepath='', project=None,
                 read=True, write=True, read_through_write=True,
                 delete=False, on_duplicate_key='skip', cleanup_cachedir=False,
                 always_check_remote=False):
        """
        Parameters
        ----------
        always_check_remote : bool
            When True, GS (Google Storage) is checked on every __contains__ call;
            otherwise the check short-circuits when the blob is found in the cachedir.
            For performance this should normally be False. The only reason to enable
            it is when a GSStore and a DiskStore are combined in a ChainedStore, and
            since the GSStore already doubles as a DiskStore via its cachedir,
            chaining the two rarely makes sense anyway.
        """
        super(GSStore, self).__init__(always_check_remote=always_check_remote,
                                      cachedir = cachedir,
                                      basepath = basepath,
                                      cleanup_cachedir = cleanup_cachedir,
                                      read=read, write=write, read_through_write=read_through_write,
                                      delete=delete, on_duplicate_key=on_duplicate_key)

        self.bucket_name = bucket
        self.project = project

    def _setup_client(self):
        # drop any memoized client/bucket so they are rebuilt on the next access;
        # pop() avoids an AttributeError if they were never memoized
        self.__dict__.pop('_client', None)
        self.__dict__.pop('_bucket', None)
        # force re-memoization
        assert self.bucket is not None

    @memoized_property
    def client(self):
        return gs.Client(project=self.project)

    @memoized_property
    def bucket(self):
        return self.client.get_bucket(self.bucket_name)

    @retry
    def _exists(self, path):
        blobs = list(self.bucket.list_blobs(prefix=path))
        return len(blobs) == 1

    @retry
    def _delete_remote(self, path):
        self._blob(path).delete()

    def _blob(self, path):
        return self.bucket.blob(path)

    @retry
    def _upload_file(self, filename, path):
        self._blob(path).upload_from_filename(filename)

    @retry
    def _download_file(self, remote_path, dest_filename):
        self._blob(remote_path).download_to_filename(dest_filename)
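
For reference, a minimal usage sketch of the new store; the bucket, project, and paths below are made-up placeholders, and the constructor signature is the one defined above. The client and bucket are built lazily via memoized_property, and the retry decorator calls _setup_client to rebuild them after a BrokenPipeError or ConnectionError before retrying.

from provenance.google_storage import GSStore

store = GSStore(
    cachedir='/tmp/provenance-cache',  # local cache for downloaded blobs
    bucket='my-provenance-blobs',      # existing GCS bucket (placeholder)
    basepath='artifacts',              # key prefix inside the bucket
    project='my-gcp-project',          # passed to google.cloud.storage.Client
)
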
1 change: 1 addition & 0 deletions setup.py
@@ -7,6 +7,7 @@

subpackages = {
    'sftp': ['paramiko'],
    'google_storage': ['google-cloud'],
    'vis': ['graphviz', 'frozendict']
}
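
Assuming subpackages feeds setup()'s extras_require (the rest of setup.py is elided from this diff), the new google-cloud dependency would be pulled in by installing the corresponding extra, e.g. pip install "provenance[google_storage]".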

