
Commit 47d571a

Added cloud storage utility to toolkit
1 parent 7fcd231 commit 47d571a

File tree

requirements.txt
setup.py
toolkit/__init__.py
toolkit/storage.py

4 files changed: +121 -1 lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -3,3 +3,4 @@ psycopg2-binary
 pandas
 pyYAML
 click
+boto3

setup.py

Lines changed: 1 addition & 1 deletion

@@ -19,7 +19,7 @@ def get_version():
 
 __version__ = get_version()
 
-install_requires = ['web.py', 'psycopg2-binary', 'pandas', 'pyYAML', 'click']
+install_requires = ['web.py', 'psycopg2-binary', 'pandas', 'pyYAML', 'click', 'boto3']
 extras_require = {
     'all': ['requests']
 }

toolkit/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -7,3 +7,4 @@
 from .dateutil import relative_date, truncate_date # noqa
 from .signals import Signal # noqa
 from .fileformat import FileFormat # noqa
+from .storage import StoragePath # noqa

toolkit/storage.py (new file)

Lines changed: 118 additions & 0 deletions

"""Storage utility to work with S3, GCS and minio.

HOW TO USE:
* Make sure that credentials are configured the way boto3 expects.
* You need to do extra setup to use this module with GCS:
    - Generate GCS HMAC credentials and set them as AWS credentials.
    - Make sure that the endpoint URL is set to 'https://storage.googleapis.com'.
"""
import boto3
import botocore

s3 = boto3.resource("s3")
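# A minimal sketch of the GCS setup described in the docstring above (the GCS
# endpoint comes from the docstring; the minio one is an assumption): once the
# HMAC keys are exported as regular AWS credentials, the resource is created
# against an explicit endpoint.
#
#     s3 = boto3.resource("s3", endpoint_url="https://storage.googleapis.com")
#
# A self-hosted minio server works the same way, e.g.
#
#     s3 = boto3.resource("s3", endpoint_url="http://localhost:9000")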

class StoragePath:
    """The StoragePath class provides a pathlib.Path-like interface for
    storage.

    USAGE:

        root = StoragePath(bucket_name, "alpha")
        path = root.join("datasets", "customer-master", "template.csv")
        text = path.read_text()

    TODO: Add a list-directory method.
    """
    def __init__(self, bucket: str, path: str):
        self.bucket = bucket
        self.path = path

    @property
    def _object(self):
        return s3.Object(bucket_name=self.bucket, key=self.path)

    def exists(self):
        """Tells whether the storage path exists or not.

        Checks if the path exists by fetching the object's metadata.
        """
        obj = self._object
        try:
            # Accessing .metadata lazy-loads the object with a HEAD request;
            # a missing key surfaces as a ClientError with code 404.
            obj.metadata
            return True
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                return False
            raise

    def delete(self):
        """Deletes the file at the storage path."""
        obj = self._object
        obj.delete()

    def download(self, local_path):
        """Downloads the contents of the storage file to the local_path file."""
        obj = self._object
        obj.download_file(local_path)

    def upload(self, local_path):
        """Uploads the file from local_path to the storage path."""
        obj = self._object
        obj.upload_file(local_path)

    def read_text(self):
        """Reads the contents of the path as text."""
        obj = self._object
        # The body is returned as bytes; decode so this really returns text.
        return obj.get()['Body'].read().decode('utf-8')

    def _get_presigned_url(self, client_method, expires=600, content_type=None):
        """Returns a presigned URL for upload or download.

        The client_method should be one of get_object or put_object.
        """
        params = {
            'Bucket': self.bucket,
            'Key': self.path,
        }
        if content_type:
            params['ContentType'] = content_type

        return s3.meta.client.generate_presigned_url(
            client_method,
            Params=params,
            ExpiresIn=expires)

    def get_presigned_url_for_download(self, expires=3600):
        """Returns a presigned URL for download.

        The default expiry is one hour (3600 seconds).
        """
        return self._get_presigned_url(client_method='get_object', expires=expires)

    def get_presigned_url_for_upload(self, expires=600, content_type="text/csv"):
        """Returns a presigned URL for upload.

        The default expiry is 10 minutes (600 seconds).
        """
        return self._get_presigned_url(client_method='put_object', expires=expires, content_type=content_type)
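    # A sketch of how a caller might use the presigned upload URL (requests is
    # an optional extra of this package; the names path and csv_bytes are
    # illustrative):
    #
    #     url = path.get_presigned_url_for_upload(content_type="text/csv")
    #     requests.put(url, data=csv_bytes, headers={"Content-Type": "text/csv"})
    #
    # The Content-Type the caller sends must match the one the URL was signed
    # with, otherwise the storage backend rejects the request.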

    def read_dataframe(self):
        """TODO: Support csv and parquet."""
        pass
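    # How the csv case might look (a sketch; pandas is already a dependency of
    # this package, while parquet would need pyarrow or fastparquet):
    #
    #     import io
    #     import pandas as pd
    #     body = self._object.get()['Body'].read()
    #     return pd.read_csv(io.BytesIO(body))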

    def copy_to(self, dest_path):
        """Copies the file to the destination path within the bucket."""
        pass
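    # A possible implementation, assuming dest_path is another StoragePath in
    # the same bucket (boto3's Object.copy_from does a server-side copy):
    #
    #     s3.Object(self.bucket, dest_path.path).copy_from(
    #         CopySource={'Bucket': self.bucket, 'Key': self.path})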

    def join(self, *parts):
        """Combines the storage path with one or more parts and returns a new path.
        """
        path = "/".join([self.path] + list(parts))
        return StoragePath(self.bucket, path)

    def __repr__(self):
        return f'<StoragePath {self.path}>'
