Skip to content

Commit

Permalink
S3 streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-its committed Nov 15, 2019
1 parent 7f95af6 commit 3696a90
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 171 deletions.
190 changes: 19 additions & 171 deletions fs_s3fs/_s3fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
from datetime import datetime
import io
import itertools
import os
from ssl import SSLError
import tempfile
import threading
import mimetypes

import boto3
from botocore.exceptions import ClientError, EndpointConnectionError

import six
from six import text_type

from fs import ResourceType
from fs.base import FS
Expand All @@ -29,6 +26,8 @@
from fs.path import basename, dirname, forcedir, join, normpath, relpath
from fs.time import datetime_to_epoch

from ._s3fs_file import S3InputFile, S3OutputFile


def _make_repr(class_name, *args, **kwargs):
"""
Expand Down Expand Up @@ -57,115 +56,6 @@ def __repr__(self):
return "{}({})".format(class_name, ", ".join(arguments))


class S3File(io.IOBase):
    """Proxy for a S3 file.

    Wraps an underlying file object (``f``) and forwards the usual
    ``io`` operations to it. ``mode`` is an object exposing boolean
    ``reading`` and ``writing`` attributes, which gate `read` and
    `write`. When ``on_close`` is given it is called with this proxy
    from `close` and is responsible for closing the underlying file.
    """

    @classmethod
    def factory(cls, filename, mode, on_close):
        """Create a S3File backed with a temporary file."""
        _temp_file = tempfile.TemporaryFile()
        proxy = cls(_temp_file, filename, mode, on_close=on_close)
        return proxy

    def __repr__(self):
        return _make_repr(
            self.__class__.__name__, self.__filename, text_type(self.__mode)
        )

    def __init__(self, f, filename, mode, on_close=None):
        self._f = f
        self.__filename = filename
        self.__mode = mode
        self._on_close = on_close

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def raw(self):
        """The underlying file object."""
        return self._f

    def close(self):
        """Close the file, invoking the ``on_close`` callback if any.

        Closing an already-closed file is a no-op, so the callback is
        not re-run by a second explicit ``close()`` or by the implicit
        one issued from ``io.IOBase.__del__``.
        """
        if self.closed:
            return
        if self._on_close is not None:
            self._on_close(self)
        else:
            # No callback owns the underlying file, so close it here
            # rather than leaking it.
            self._f.close()

    @property
    def closed(self):
        return self._f.closed

    def fileno(self):
        return self._f.fileno()

    def flush(self):
        return self._f.flush()

    def isatty(self):
        return self._f.isatty()

    def readable(self):
        return self.__mode.reading

    def readline(self, limit=-1):
        return self._f.readline(limit)

    def readlines(self, hint=-1):
        """Return a list of lines, stopping once ``hint`` bytes are exceeded.

        ``None`` and any non-positive ``hint`` mean "no limit".
        """
        if hint is None or hint <= 0:
            return self._f.readlines()
        size = 0
        lines = []
        for line in iter(self._f.readline, b""):
            lines.append(line)
            size += len(line)
            if size > hint:
                break
        return lines

    def seek(self, offset, whence=os.SEEK_SET):
        """Move to ``offset`` relative to ``whence``; return the new position.

        Raises:
            ValueError: if ``whence`` is not one of the ``os.SEEK_*``
                constants accepted here.
        """
        if whence not in (os.SEEK_CUR, os.SEEK_END, os.SEEK_SET):
            raise ValueError("invalid value for 'whence'")
        self._f.seek(offset, whence)
        return self._f.tell()

    def seekable(self):
        return True

    def tell(self):
        return self._f.tell()

    def writable(self):
        return self.__mode.writing

    def writelines(self, lines):
        return self._f.writelines(lines)

    def read(self, n=-1):
        """Read up to ``n`` bytes (all remaining bytes when ``n`` is -1).

        Raises:
            IOError: if the file was not opened for reading.
        """
        if not self.__mode.reading:
            raise IOError("not open for reading")
        return self._f.read(n)

    def readall(self):
        """Read and return everything up to end of file."""
        # Buffered and in-memory file objects do not provide ``readall``;
        # an unbounded ``read()`` works on every kind of file object.
        return self._f.read()

    def readinto(self, b):
        """Read bytes into the writable buffer ``b``; return the count read."""
        return self._f.readinto(b)

    def write(self, b):
        """Write ``b`` to the file and return ``len(b)``.

        Raises:
            IOError: if the file was not opened for writing.
        """
        if not self.__mode.writing:
            raise IOError("not open for writing")
        self._f.write(b)
        return len(b)

    def truncate(self, size=None):
        """Resize the file to ``size`` (default: current position)."""
        if size is None:
            size = self._f.tell()
        self._f.truncate(size)
        return size


@contextlib.contextmanager
def s3errors(path):
"""Translate S3 errors to FSErrors."""
Expand Down Expand Up @@ -527,29 +417,18 @@ def openbin(self, path, mode="r", buffering=-1, **options):
_path = self.validatepath(path)
_key = self._path_to_key(_path)

if _mode.create:
if _mode.appending:
raise errors.ResourceError(path, msg="append mode is not supported")

def on_close_create(s3file):
"""Called when the S3 file closes, to upload data."""
if _mode.create:
if self.strict:
try:
s3file.raw.seek(0)
with s3errors(path):
self.client.upload_fileobj(
s3file.raw,
self._bucket_name,
_key,
ExtraArgs=self._get_upload_args(_key),
)
finally:
s3file.raw.close()

try:
dir_path = dirname(_path)
if dir_path != "/":
_dir_key = self._path_to_dir_key(dir_path)
self._get_object(dir_path, _dir_key)
except errors.ResourceNotFound:
raise errors.ResourceNotFound(path)
dir_path = dirname(_path)
if dir_path != "/":
_dir_key = self._path_to_dir_key(dir_path)
self._get_object(dir_path, _dir_key)
except errors.ResourceNotFound:
raise errors.ResourceNotFound(path)

try:
info = self._getinfo(path)
Expand All @@ -561,50 +440,19 @@ def on_close_create(s3file):
if info.is_dir:
raise errors.FileExpected(path)

s3file = S3File.factory(path, _mode, on_close=on_close_create)
if _mode.appending:
try:
with s3errors(path):
self.client.download_fileobj(
self._bucket_name,
_key,
s3file.raw,
ExtraArgs=self.download_args,
)
except errors.ResourceNotFound:
pass
else:
s3file.seek(0, os.SEEK_END)

return s3file
obj = self.s3.Object(self._bucket_name, _key)
return S3OutputFile(
obj,
upload_kwargs=self._get_upload_args(_key)
)

if self.strict:
info = self.getinfo(path)
if info.is_dir:
raise errors.FileExpected(path)

def on_close(s3file):
"""Called when the S3 file closes, to upload the data."""
try:
if _mode.writing:
s3file.raw.seek(0, os.SEEK_SET)
with s3errors(path):
self.client.upload_fileobj(
s3file.raw,
self._bucket_name,
_key,
ExtraArgs=self._get_upload_args(_key),
)
finally:
s3file.raw.close()

s3file = S3File.factory(path, _mode, on_close=on_close)
with s3errors(path):
self.client.download_fileobj(
self._bucket_name, _key, s3file.raw, ExtraArgs=self.download_args
)
s3file.seek(0, os.SEEK_SET)
return s3file
obj = self.s3.Object(self._bucket_name, _key)
return S3InputFile(obj)

def remove(self, path):
self.check()
Expand Down
Loading

0 comments on commit 3696a90

Please sign in to comment.