Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions mergin/client_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,23 @@ class DownloadJob:
"""

def __init__(
self, project_path, total_size, version, update_tasks, download_queue_items, directory, mp, project_info
self,
project_path,
total_size,
version,
update_tasks,
download_queue_items,
tmp_dir: tempfile.TemporaryDirectory,
mp,
project_info,
):
self.project_path = project_path
self.total_size = total_size # size of data to download (in bytes)
self.transferred_size = 0
self.version = version
self.update_tasks = update_tasks
self.download_queue_items = download_queue_items
self.directory = directory # project's directory
self.tmp_dir = tmp_dir
self.mp = mp # MerginProject instance
self.is_cancelled = False
self.project_info = project_info # parsed JSON with project info returned from the server
Expand Down Expand Up @@ -96,7 +104,7 @@ def _do_download(item, mc, mp, project_path, job):
job.transferred_size += item.size


def _cleanup_failed_download(directory, mergin_project=None):
def _cleanup_failed_download(mergin_project: MerginProject = None):
"""
If a download job fails, there will be the newly created directory left behind with some
temporary files in it. We want to remove it because a new download would fail because
Expand All @@ -109,7 +117,7 @@ def _cleanup_failed_download(directory, mergin_project=None):
mergin_project.remove_logging_handler()

# keep log file as it might contain useful debug info
log_file = os.path.join(directory, ".mergin", "client-log.txt")
log_file = os.path.join(mergin_project.dir, ".mergin", "client-log.txt")
dest_path = None

if os.path.exists(log_file):
Expand All @@ -118,7 +126,6 @@ def _cleanup_failed_download(directory, mergin_project=None):
dest_path = tmp_file.name
shutil.copyfile(log_file, dest_path)

shutil.rmtree(directory)
return dest_path


Expand All @@ -138,6 +145,8 @@ def download_project_async(mc, project_path, directory, project_version=None):
mp.log.info("--- version: " + mc.user_agent_info())
mp.log.info(f"--- start download {project_path}")

tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-", ignore_cleanup_errors=True, delete=True)

try:
# check whether we download the latest version or not
latest_proj_info = mc.project_info(project_path)
Expand All @@ -147,7 +156,7 @@ def download_project_async(mc, project_path, directory, project_version=None):
project_info = latest_proj_info

except ClientError:
_cleanup_failed_download(directory, mp)
_cleanup_failed_download(mp)
raise

version = project_info["version"] if project_info["version"] else "v0"
Expand All @@ -158,7 +167,7 @@ def download_project_async(mc, project_path, directory, project_version=None):
update_tasks = [] # stuff to do at the end of download
for file in project_info["files"]:
file["version"] = version
items = _download_items(file, directory)
items = _download_items(file, tmp_dir.name)
is_latest_version = project_version == latest_proj_info["version"]
update_tasks.append(UpdateTask(file["path"], items, latest_version=is_latest_version))

Expand All @@ -172,7 +181,7 @@ def download_project_async(mc, project_path, directory, project_version=None):

mp.log.info(f"will download {len(update_tasks)} files in {len(download_list)} chunks, total size {total_size}")

job = DownloadJob(project_path, total_size, version, update_tasks, download_list, directory, mp, project_info)
job = DownloadJob(project_path, total_size, version, update_tasks, download_list, tmp_dir, mp, project_info)

# start download
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
Expand Down Expand Up @@ -203,7 +212,7 @@ def download_project_is_running(job):
traceback_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
job.mp.log.error("Error while downloading project: " + "".join(traceback_lines))
job.mp.log.info("--- download aborted")
job.failure_log_file = _cleanup_failed_download(job.directory, job.mp)
job.failure_log_file = _cleanup_failed_download(job.mp)
raise future.exception()
if future.running():
return True
Expand All @@ -229,18 +238,20 @@ def download_project_finalize(job):
traceback_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
job.mp.log.error("Error while downloading project: " + "".join(traceback_lines))
job.mp.log.info("--- download aborted")
job.failure_log_file = _cleanup_failed_download(job.directory, job.mp)
job.failure_log_file = _cleanup_failed_download(job.mp)
raise future.exception()

job.mp.log.info("--- download finished")

for task in job.update_tasks:
# right now only copy tasks...
task.apply(job.directory, job.mp)
task.apply(job.mp.dir, job.mp)

# final update of project metadata
job.mp.update_metadata(job.project_info)

job.tmp_dir.cleanup()


def download_project_cancel(job):
"""
Expand Down Expand Up @@ -336,7 +347,7 @@ def __init__(
version,
files_to_merge,
download_queue_items,
temp_dir,
tmp_dir,
mp,
project_info,
basefiles_to_patch,
Expand All @@ -351,7 +362,7 @@ def __init__(
self.version = version
self.files_to_merge = files_to_merge # list of FileToMerge instances
self.download_queue_items = download_queue_items
self.temp_dir = temp_dir # full path to temporary directory where we store downloaded files
self.tmp_dir = tmp_dir # TemporaryDirectory instance where we store downloaded files
self.mp = mp # MerginProject instance
self.is_cancelled = False
self.project_info = project_info # parsed JSON with project info returned from the server
Expand Down Expand Up @@ -413,8 +424,7 @@ def pull_project_async(mc, directory):
# then we just download the whole file
_pulling_file_with_diffs = lambda f: "diffs" in f and len(f["diffs"]) != 0

temp_dir = mp.fpath_meta(f"fetch_{local_version}-{server_version}")
os.makedirs(temp_dir, exist_ok=True)
tmp_dir = tempfile.TemporaryDirectory(prefix="mm-pull-", ignore_cleanup_errors=True, delete=True)
pull_changes = mp.get_pull_changes(server_info["files"])
mp.log.debug("pull changes:\n" + pprint.pformat(pull_changes))
fetch_files = []
Expand All @@ -441,10 +451,10 @@ def pull_project_async(mc, directory):

for file in fetch_files:
diff_only = _pulling_file_with_diffs(file)
items = _download_items(file, temp_dir, diff_only)
items = _download_items(file, tmp_dir.name, diff_only)

# figure out destination path for the file
file_dir = os.path.dirname(os.path.normpath(os.path.join(temp_dir, file["path"])))
file_dir = os.path.dirname(os.path.normpath(os.path.join(tmp_dir.name, file["path"])))
basename = os.path.basename(file["diff"]["path"]) if diff_only else os.path.basename(file["path"])
dest_file_path = os.path.join(file_dir, basename)
os.makedirs(file_dir, exist_ok=True)
Expand All @@ -465,8 +475,8 @@ def pull_project_async(mc, directory):
file_path = file["path"]
mp.log.info(f"missing base file for {file_path} -> going to download it (version {server_version})")
file["version"] = server_version
items = _download_items(file, temp_dir, diff_only=False)
dest_file_path = mp.fpath(file["path"], temp_dir)
items = _download_items(file, tmp_dir.name, diff_only=False)
dest_file_path = mp.fpath(file["path"], tmp_dir.name)
# dest_file_path = os.path.join(os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path']))), os.path.basename(file['path']))
files_to_merge.append(FileToMerge(dest_file_path, items))
continue
Expand All @@ -490,7 +500,7 @@ def pull_project_async(mc, directory):
server_version,
files_to_merge,
download_list,
temp_dir,
tmp_dir,
mp,
server_info,
basefiles_to_patch,
Expand Down Expand Up @@ -604,10 +614,10 @@ def pull_project_finalize(job: PullJob):
# download their full versions so we have them up-to-date for applying changes
for file_path, file_diffs in job.basefiles_to_patch:
basefile = job.mp.fpath_meta(file_path)
server_file = job.mp.fpath(file_path, job.temp_dir)
server_file = job.mp.fpath(file_path, job.tmp_dir.name)

shutil.copy(basefile, server_file)
diffs = [job.mp.fpath(f, job.temp_dir) for f in file_diffs]
diffs = [job.mp.fpath(f, job.tmp_dir.name) for f in file_diffs]
patch_error = job.mp.apply_diffs(server_file, diffs)
if patch_error:
# that's weird that we are unable to apply diffs to the basefile!
Expand All @@ -623,7 +633,7 @@ def pull_project_finalize(job: PullJob):
raise ClientError("Cannot patch basefile {}! Please try syncing again.".format(basefile))

try:
conflicts = job.mp.apply_pull_changes(job.pull_changes, job.temp_dir, job.project_info, job.mc)
conflicts = job.mp.apply_pull_changes(job.pull_changes, job.tmp_dir.name, job.project_info, job.mc)
except Exception as e:
job.mp.log.error("Failed to apply pull changes: " + str(e))
job.mp.log.info("--- pull aborted")
Expand All @@ -636,7 +646,7 @@ def pull_project_finalize(job: PullJob):
else:
job.mp.log.info("--- pull finished -- at version " + job.mp.version())

shutil.rmtree(job.temp_dir)
job.tmp_dir.cleanup() # delete our temporary dir and all its content
return conflicts


Expand Down Expand Up @@ -788,7 +798,7 @@ def download_files_async(
mp.log.info(f"Got project info. version {project_info['version']}")

# set temporary directory for download
temp_dir = tempfile.mkdtemp(prefix="python-api-client-")
tmp_dir = tempfile.mkdtemp(prefix="python-api-client-")

if output_paths is None:
output_paths = []
Expand All @@ -798,7 +808,7 @@ def download_files_async(
if len(output_paths) != len(file_paths):
warn = "Output file paths are not of the same length as file paths. Cannot store required files."
mp.log.warning(warn)
shutil.rmtree(temp_dir)
shutil.rmtree(tmp_dir)
raise ClientError(warn)

download_list = []
Expand All @@ -812,7 +822,7 @@ def download_files_async(
if file["path"] in file_paths:
index = file_paths.index(file["path"])
file["version"] = version
items = _download_items(file, temp_dir)
items = _download_items(file, tmp_dir)
is_latest_version = version == latest_proj_info["version"]
task = UpdateTask(file["path"], items, output_paths[index], latest_version=is_latest_version)
download_list.extend(task.download_queue_items)
Expand All @@ -832,13 +842,13 @@ def download_files_async(
if not download_list or missing_files:
warn = f"No [{', '.join(missing_files)}] exists at version {version}"
mp.log.warning(warn)
shutil.rmtree(temp_dir)
shutil.rmtree(tmp_dir)
raise ClientError(warn)

mp.log.info(
f"will download files [{', '.join(files_to_download)}] in {len(download_list)} chunks, total size {total_size}"
)
job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info)
job = DownloadJob(project_path, total_size, version, update_tasks, download_list, tmp_dir, mp, project_info)
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
job.futures = []
for item in download_list:
Expand All @@ -862,8 +872,8 @@ def download_files_finalize(job):
job.mp.log.info("--- download finished")

for task in job.update_tasks:
task.apply(job.directory, job.mp)
task.apply(job.tmp_dir, job.mp)

# Remove temporary download directory
if job.directory is not None and os.path.exists(job.directory):
shutil.rmtree(job.directory)
if job.tmp_dir is not None and os.path.exists(job.tmp_dir):
shutil.rmtree(job.tmp_dir)
2 changes: 1 addition & 1 deletion mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def push_project_async(mc, directory):
changes = filter_changes(mc, project_info, changes)
mp.log.debug("push changes:\n" + pprint.pformat(changes))

tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-", ignore_cleanup_errors=True, delete=True)

# If there are any versioned files (aka .gpkg) that are not updated through a diff,
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
Expand Down
4 changes: 2 additions & 2 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2549,7 +2549,7 @@ def test_download_failure(mc):
# download project async
with pytest.raises(IsADirectoryError):
job = download_project_async(mc, project, download_dir)
os.makedirs(os.path.join(download_dir, "base.gpkg.0"))
os.makedirs(os.path.join(job.tmp_dir.name, "base.gpkg.0"))
download_project_wait(job)
download_project_finalize(job)

Expand All @@ -2561,7 +2561,7 @@ def test_download_failure(mc):
# active waiting
remove_folders([download_dir])
job = download_project_async(mc, project, download_dir)
os.makedirs(os.path.join(download_dir, "base.gpkg.0"))
os.makedirs(os.path.join(job.tmp_dir.name, "base.gpkg.0"))
with pytest.raises(IsADirectoryError):
while True:
assert download_project_is_running(job)
Expand Down