12
12
import copy
13
13
import math
14
14
import os
15
+ import pprint
15
16
import shutil
16
17
17
18
import concurrent .futures
18
- import threading
19
19
20
20
from .common import CHUNK_SIZE , ClientError
21
21
from .merginproject import MerginProject
@@ -76,14 +76,14 @@ def _download_items(file, directory, diff_only=False):
76
76
return items
77
77
78
78
79
- def _do_download (item , mc , project_path , job ):
79
+ def _do_download (item , mc , mp , project_path , job ):
80
80
""" runs in worker thread """
81
81
if job .is_cancelled :
82
82
return
83
83
84
84
# TODO: make download_blocking / save_to_file cancellable so that we can cancel as soon as possible
85
85
86
- item .download_blocking (mc , project_path )
86
+ item .download_blocking (mc , mp , project_path )
87
87
job .transferred_size += item .size
88
88
89
89
@@ -99,9 +99,13 @@ def download_project_async(mc, project_path, directory):
99
99
os .makedirs (directory )
100
100
mp = MerginProject (directory )
101
101
102
+ mp .log .info (f"--- start download { project_path } " )
103
+
102
104
project_info = mc .project_info (project_path )
103
105
version = project_info ['version' ] if project_info ['version' ] else 'v0'
104
106
107
+ mp .log .info (f"got project info. version { version } " )
108
+
105
109
# prepare download
106
110
update_tasks = [] # stuff to do at the end of download
107
111
for file in project_info ['files' ]:
@@ -116,14 +120,16 @@ def download_project_async(mc, project_path, directory):
116
120
download_list .extend (task .download_queue_items )
117
121
for item in task .download_queue_items :
118
122
total_size += item .size
123
+
124
+ mp .log .info (f"will download { len (update_tasks )} files in { len (download_list )} chunks, total size { total_size } " )
119
125
120
126
job = DownloadJob (project_path , total_size , version , update_tasks , download_list , directory , mp , project_info )
121
127
122
128
# start download
123
129
job .executor = concurrent .futures .ThreadPoolExecutor (max_workers = 4 )
124
130
job .futures = []
125
131
for item in download_list :
126
- future = job .executor .submit (_do_download , item , mc , project_path , job )
132
+ future = job .executor .submit (_do_download , item , mc , mp , project_path , job )
127
133
job .futures .append (future )
128
134
129
135
return job
@@ -167,6 +173,8 @@ def download_project_finalize(job):
167
173
if future .exception () is not None :
168
174
raise future .exception ()
169
175
176
+ job .mp .log .info ("--- download finished" )
177
+
170
178
for task in job .update_tasks :
171
179
172
180
# right now only copy tasks...
@@ -235,9 +243,10 @@ def __repr__(self):
235
243
return "<DownloadQueueItem path={} version={} diff_only={} part_index={} size={} dest={}>" .format (
236
244
self .file_path , self .version , self .diff_only , self .part_index , self .size , self .download_file_path )
237
245
238
- def download_blocking (self , mc , project_path ):
246
+ def download_blocking (self , mc , mp , project_path ):
239
247
""" Starts download and only returns once the file has been fully downloaded and saved """
240
248
249
+ mp .log .debug (f"Downloading { self .file_path } version={ self .version } diff={ self .diff_only } part={ self .part_index } " )
241
250
start = self .part_index * (1 + CHUNK_SIZE )
242
251
resp = mc .get ("/v1/project/raw/{}" .format (project_path ), data = {
243
252
"file" : self .file_path ,
@@ -248,9 +257,11 @@ def download_blocking(self, mc, project_path):
248
257
}
249
258
)
250
259
if resp .status in [200 , 206 ]:
260
+ mp .log .debug (f"Download finished: { self .file_path } " )
251
261
save_to_file (resp , self .download_file_path )
252
262
else :
253
- raise ClientError ('Failed to download part {} of file {}' .format (part , basename ))
263
+ mp .log .error (f"Download failed: { self .file_path } " )
264
+ raise ClientError ('Failed to download part {} of file {}' .format (self .part_index , self .file_path ))
254
265
255
266
256
267
class PullJob :
@@ -290,19 +301,27 @@ def pull_project_async(mc, directory):
290
301
mp = MerginProject (directory )
291
302
project_path = mp .metadata ["name" ]
292
303
local_version = mp .metadata ["version" ]
304
+
305
+ mp .log .info (f"--- start pull { project_path } " )
306
+
293
307
server_info = mc .project_info (project_path , since = local_version )
294
- if local_version == server_info ["version" ]:
308
+ server_version = server_info ["version" ]
309
+
310
+ mp .log .info (f"got project info. version { server_version } " )
311
+
312
+ if local_version == server_version :
313
+ mp .log .info ("--- pull - nothing to do (already at server version)" )
295
314
return # Project is up to date
296
315
297
316
# we either download a versioned file using diffs (strongly preferred),
298
317
# but if we don't have history with diffs (e.g. uploaded without diffs)
299
318
# then we just download the whole file
300
319
_pulling_file_with_diffs = lambda f : 'diffs' in f and len (f ['diffs' ]) != 0
301
320
302
- server_version = server_info ["version" ]
303
321
temp_dir = mp .fpath_meta (f'fetch_{ local_version } -{ server_version } ' )
304
322
os .makedirs (temp_dir , exist_ok = True )
305
323
pull_changes = mp .get_pull_changes (server_info ["files" ])
324
+ mp .log .debug ("pull changes:\n " + pprint .pformat (pull_changes ))
306
325
fetch_files = []
307
326
for f in pull_changes ["added" ]:
308
327
f ['version' ] = server_version
@@ -364,13 +383,15 @@ def pull_project_async(mc, directory):
364
383
for item in file_to_merge .downloaded_items :
365
384
total_size += item .size
366
385
386
+ mp .log .info (f"will download { len (download_list )} chunks, total size { total_size } " )
387
+
367
388
job = PullJob (project_path , pull_changes , total_size , server_version , files_to_merge , download_list , temp_dir , mp , server_info , basefiles_to_patch )
368
389
369
390
# start download
370
391
job .executor = concurrent .futures .ThreadPoolExecutor (max_workers = 4 )
371
392
job .futures = []
372
393
for item in download_list :
373
- future = job .executor .submit (_do_download , item , mc , project_path , job )
394
+ future = job .executor .submit (_do_download , item , mc , mp , project_path , job )
374
395
job .futures .append (future )
375
396
376
397
return job
@@ -450,6 +471,8 @@ def pull_project_finalize(job):
450
471
if future .exception () is not None :
451
472
raise future .exception ()
452
473
474
+ job .mp .log .info ("finalizing pull" )
475
+
453
476
# merge downloaded chunks
454
477
for file_to_merge in job .files_to_merge :
455
478
file_to_merge .merge ()
@@ -478,6 +501,8 @@ def pull_project_finalize(job):
478
501
'version' : job .version if job .version else "v0" , # for new projects server version is ""
479
502
'files' : job .project_info ['files' ]
480
503
}
481
-
504
+
505
+ job .mp .log .info ("--- pull finished" )
506
+
482
507
shutil .rmtree (job .temp_dir )
483
508
return conflicts
0 commit comments