Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed! Upload in parallel #41

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@ def main(argv=None): # IGNORE:C0111
parser.add_argument('-d', '--dryrun', action='store_true', help="Show what would be uploaded but don't upload it.")
parser.add_argument('-f', '--force', action='store_true', help="Force uploads, else only upload if changed.")
parser.add_argument('-s', '--scheduled', action='store_true', help="Upload only when the configured schedule allows.")
parser.add_argument('-n','--num_threads', nargs='?', const=1, type=int,help="Number of parallel uploads while uploading a directory")
parser.add_argument('local', help='The file/folder/path to put')
parser.add_argument('remote', nargs='?', help='The remote folder to put it in')
args = parser.parse_args()
if args.config:
degoo.api.report_config()

result = degoo.put(args.local, args.remote, args.verbose, not args.force, args.dryrun, args.scheduled)
result = degoo.put(args.local, args.remote, args.verbose, not args.force, args.dryrun, args.scheduled,args.num_threads)

if not args.dryrun:
if len(result) == 3:
Expand Down
77 changes: 62 additions & 15 deletions degoo/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
import getpass
import requests
import humanfriendly
import threading
import logging
import time
import queue

from appdirs import user_config_dir
from datetime import datetime
from dateutil.tz import tzutc, tzlocal

import pathlib
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
# from clint.textui.progress import Bar as ProgressBar

Expand Down Expand Up @@ -907,8 +911,34 @@ def get(remote_path, local_directory=None, verbose=0, if_missing=False, dry_run=
else:
return get_file(item['ID'], local_directory, verbose, if_missing, dry_run, schedule)


def put_file(local_file, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False):
def start_queue(num_workers):
q = queue.Queue()
for i in range(num_workers):
worker = threading.Thread(target=worker_func, args=(q, i,), daemon=True)
worker.start()
return (q)


def worker_func(q, thread_no):
while True:
task = q.get()
for i in range(3):
try:
put_file(task[0],task[1],task[2],task[3],task[4],task[5],task[6])

break
except Exception as e:
if i >=2:
logging.error(f'ERROR could not upload {os.path.basename(task[0])} to {task[1]}')
logging.error(e)
break
continue
q.task_done()
print(f'Thread #{thread_no} is done uploading {os.path.basename(task[0])}. #{q.qsize()} tasks left ')



def put_file(local_file, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False,show_progress = True):
'''
Uploads a local_file to the Degoo cloud store.

Expand All @@ -928,6 +958,7 @@ def progress(monitor):

:param monitor: And instance of MultipartEncoderMonitor
'''

return wget.callback_progress(monitor.bytes_read, 1, monitor.len, wget.bar_adaptive)

if schedule:
Expand Down Expand Up @@ -1003,7 +1034,8 @@ def progress(monitor):
# Odd, to say the least.
Type = os.path.splitext(local_file)[1][1:]
Checksum = api.check_sum(local_file)

Checksum = Checksum.replace("/","_")
Checksum = Checksum.replace("+","-")
if Type:
Key = "{}{}/{}.{}".format(KeyPrefix, Type, Checksum, Type)
else:
Expand All @@ -1028,14 +1060,17 @@ def progress(monitor):

# Perform the upload
multipart = MultipartEncoder(fields=dict(parts))
monitor = MultipartEncoderMonitor(multipart, progress)
if show_progress:
monitor = MultipartEncoderMonitor(multipart,progress)
else:
monitor = MultipartEncoderMonitor(multipart)

heads = {"ngsw-bypass": "1", "content-type": multipart.content_type, "content-length": str(multipart.len)}

response = requests.post(BaseURL, data=monitor, headers=heads)

# A new line after the progress bar is complete
print()
#print()

# We expect a 204 status result, which is silent acknowledgement of success.
if response.ok and response.status_code == 204:
Expand Down Expand Up @@ -1131,7 +1166,7 @@ def progress(monitor):
return (ID, Path, URL)


def put_directory(local_directory, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False):
def put_directory(local_directory, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False,num_threads=1):
'''
Uploads a local directory recursively to the Degoo cloud store.

Expand All @@ -1145,11 +1180,20 @@ def put_directory(local_directory, remote_folder, verbose=0, if_changed=False, d
:returns: A tuple containing the Degoo ID and the Remote file path
'''
IDs = {}
format = "%(asctime)s: %(message)s"

logging.basicConfig(format=format, level=logging.INFO,

datefmt="%H:%M:%S")

q = start_queue(num_threads)

target_dir = get_dir(remote_folder)
(target_junk, target_name) = os.path.split(local_directory)

Root = target_name
path = pathlib.PurePath(local_directory)
target_name = path.name
Root = local_directory
print("Localdir: ",local_directory,"Target Dir:",target_dir,target_junk,"target name:",target_name)
IDs[Root] = mkdir(target_name, target_dir['ID'], verbose - 1, dry_run)

for root, dirs, files in os.walk(local_directory):
Expand All @@ -1163,19 +1207,22 @@ def put_directory(local_directory, remote_folder, verbose=0, if_changed=False, d

for name in dirs:
Name = os.path.join(root, name)

IDs[Name] = mkdir(name, IDs[relative_root], verbose - 1, dry_run)
print (name,relative_root,IDs)
IDs[Name] = mkdir(name, IDs[root], verbose - 1, dry_run)

for name in files:
Name = os.path.join(root, name)

put_file(Name, IDs[relative_root], verbose, if_changed, dry_run, schedule)
if num_threads == 1: ## Enable or disable Progressbar
q.put([Name, IDs[root], verbose, if_changed, dry_run, schedule,True])
else:
q.put([Name, IDs[root], verbose, if_changed, dry_run, schedule,False])

# Directories have no download URL, they exist only as Degoo metadata
q.join()
return (IDs[Root], target_dir["Path"])


def put(local_path, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False):
def put(local_path, remote_folder, verbose=0, if_changed=False, dry_run=False, schedule=False,num_threads=1):
'''
Uplads a file or folder to the Degoo cloud store

Expand All @@ -1189,7 +1236,7 @@ def put(local_path, remote_folder, verbose=0, if_changed=False, dry_run=False, s
isDirectory = os.path.isdir(local_path)

if isDirectory:
return put_directory(local_path, remote_folder, verbose, if_changed, dry_run, schedule)
return put_directory(local_path, remote_folder, verbose, if_changed, dry_run, schedule,num_threads)
elif isFile:
return put_file(local_path, remote_folder, verbose, if_changed, dry_run, schedule)
else:
Expand Down