Skip to content

Commit

Permalink
Multipart upload fixed — it was a locking issue.
Browse files · Browse the repository at this point in the history
  • Loading branch information
danilop committed Apr 22, 2014
1 parent b1f4b16 commit 1d75b6e
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions yas3fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,12 @@ def do_HEAD(self):

class PartOfFSData():
""" To read just a part of an existing FSData, inspired by FileChunkIO """
def __init__(self, data, start, length):
def __init__(self, data, lock, start, length):
self.data = data
self.start = start
self.length = length
self.pos = 0
self.lock = threading.Lock()
self.lock = lock
def seek(self, offset, whence=0):
logger.debug("seek '%i' '%i'" % (offset, whence))
if whence == 0:
Expand All @@ -529,15 +529,9 @@ def read(self, n=-1):
% (n, self.data, self.data.content, self.pos, self.start, self.length))
if n >= 0:
n = min([n, self.length - self.pos])
logger.debug("read before lock '%i' '%s' '%s' at '%i' starting from '%i' for '%i'"
% (n, self.data, self.data.content, self.pos, self.start, self.length))
with self.lock:
logger.debug("read within lock '%i' '%s' '%s' at '%i' starting from '%i' for '%i'"
% (n, self.data, self.data.content, self.pos, self.start, self.length))
self.data.content.seek(self.start + self.pos)
s = self.data.content.read(n)
logger.debug("read after lock '%i' '%s' '%s' at '%i' starting from '%i' for '%i'"
% (n, self.data, self.data.content, self.pos, self.start, self.length))
self.pos += len(s)
return s
else:
Expand Down Expand Up @@ -1981,14 +1975,15 @@ def multipart_upload(self, key_path, data, full_size, headers, metadata):
part_queue = Queue.Queue()
multipart_size = max(self.multipart_size, full_size / 100) # No more than 100 parts...
logger.debug("multipart_upload '%s' multipart_size '%s'" % (key_path, multipart_size))
upload_lock = threading.Lock()
while part_pos < full_size:
bytes_left = full_size - part_pos
if bytes_left > self.multipart_size:
part_size = self.multipart_size
else:
part_size = bytes_left
part_num += 1
part_queue.put([ part_num, PartOfFSData(data, part_pos, part_size) ])
part_queue.put([ part_num, PartOfFSData(data, upload_lock, part_pos, part_size) ])
part_pos += part_size
logger.debug("part from %i for %i" % (part_pos, part_size))
logger.debug("initiate_multipart_upload '%s' '%s'" % (key_path, headers))
Expand Down

0 comments on commit 1d75b6e

Please sign in to comment.