Skip to content

Commit 42c34a2

Browse files
committed
Retry sync job using
Introduce mapping based on chunk_id (generated by client) and server_chunk_id generated by server.
1 parent 7ba0971 commit 42c34a2

File tree

5 files changed

+60
-28
lines changed

5 files changed

+60
-28
lines changed

mergin/client.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from .common import (
2424
ClientError,
25+
ErrorCode,
2526
LoginError,
2627
WorkspaceRole,
2728
ProjectRole,
@@ -41,7 +42,14 @@
4142
download_diffs_finalize,
4243
)
4344
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
44-
from .client_push import get_push_changes_batch, push_project_async, push_project_is_running, push_project_wait, push_project_finalize, UploadChunksCache
45+
from .client_push import (
46+
get_push_changes_batch,
47+
push_project_async,
48+
push_project_is_running,
49+
push_project_wait,
50+
push_project_finalize,
51+
UploadChunksCache,
52+
)
4553
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
4654
from .version import __version__
4755

@@ -1511,9 +1519,14 @@ def sync_project(self, project_directory):
15111519
push_project_finalize(job)
15121520
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
15131521
except ClientError as e:
1514-
if e.http_error == 409 and server_conflict_attempts < 2:
1522+
if (
1523+
e.sync_retry
1524+
and server_conflict_attempts < 2
1525+
):
15151526
# retry on conflict, e.g. when server has changes that we do not have yet
1516-
mp.log.info("Attempting sync process due to conflicts between server and local directory or another user is syncing.")
1527+
mp.log.info(
1528+
"Attempting sync process due to conflicts between server and local directory or another user is syncing."
1529+
)
15171530
server_conflict_attempts += 1
15181531
sleep(5)
15191532
continue
@@ -1548,9 +1561,14 @@ def sync_project_with_callback(self, project_directory, progress_callback=None,
15481561
push_project_finalize(job)
15491562
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
15501563
except ClientError as e:
1551-
if e.http_error == 409 and server_conflict_attempts < 2:
1564+
if (
1565+
e.sync_retry
1566+
and server_conflict_attempts < 2
1567+
):
15521568
# retry on conflict, e.g. when server has changes that we do not have yet
1553-
mp.log.info("Attempting sync process due to conflicts between server and local directory or another user is syncing.")
1569+
mp.log.info(
1570+
"Attempting sync process due to conflicts between server and local directory or another user is syncing."
1571+
)
15541572
server_conflict_attempts += 1
15551573
sleep(5)
15561574
continue

mergin/client_push.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from .local_changes import LocalChange, LocalChanges
2323

24-
from .common import UPLOAD_CHUNK_SIZE, ClientError
24+
from .common import UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
2525
from .merginproject import MerginProject
2626
from .editor import filter_changes
2727

@@ -136,7 +136,7 @@ def upload_blocking(self):
136136
continue
137137
raise
138138

139-
self.mp.log.debug(f"Upload chunk {self.chunk_id} finished: {self.file_path}")
139+
self.mp.log.debug(f"Upload chunk {self.server_chunk_id or self.chunk_id} finished: {self.file_path}")
140140

141141

142142
class UploadJob:
@@ -188,7 +188,7 @@ def add_items(self, items: List[UploadQueueItem], total_size: int):
188188
def update_chunks_from_items(self):
189189
"""Update chunks in LocalChanges from the upload queue items."""
190190
self.changes.update_chunks(
191-
[(item.file_checksum, item.server_chunk_id) for item in self.upload_queue_items]
191+
[(item.chunk_id, item.server_chunk_id) for item in self.upload_queue_items]
192192
)
193193

194194

@@ -285,9 +285,9 @@ def create_upload_job_v2_api(
285285
{"Content-Type": "application/json"},
286286
)
287287
except ClientError as err:
288-
# ignore 409 error, it means that the project is already in a transaction
288+
# ignore 409 error codes, it means that the project is already in a transaction
289289
# but we still want to continue with the upload
290-
if err.http_error not in [409]:
290+
if err.server_code not in [ErrorCode.AnotherUploadRunning.value, ErrorCode.ProjectVersionExists]:
291291
mp.log.error("Error doing push check compatibility: " + str(err))
292292
mp.log.error("--- push aborted")
293293
raise
@@ -439,11 +439,10 @@ def push_project_finalize(job: UploadJob):
439439
job.mp.log.error("--- push finish failed! " + error_msg)
440440
raise ClientError("Upload error: " + error_msg)
441441

442-
job.update_chunks_from_items()
443-
444442
if server_features.get("v2_push_enabled"):
445443
# v2 push uses a different endpoint
446444
try:
445+
job.update_chunks_from_items()
447446
job.mp.log.info(f"Finishing transaction for project {job.mp.project_full_name()}")
448447
job.mc.post(
449448
f"/v2/projects/{job.mp.project_id()}/versions",
@@ -456,6 +455,10 @@ def push_project_finalize(job: UploadJob):
456455
project_info = job.mc.project_info(job.mp.project_id())
457456
job.server_resp = project_info
458457
except ClientError as err:
458+
if err.server_code in [ErrorCode.AnotherUploadRunning.value, ErrorCode.ProjectVersionExists]:
459+
err.sync_retry = True
460+
else:
461+
job.mc.upload_chunks_cache.clear() # clear the upload chunks cache, as we are getting fatal from server
459462
job.mp.log.error("--- push finish failed! " + str(err))
460463
raise err
461464
elif with_upload_of_files:

mergin/common.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
class ErrorCode(Enum):
2020
ProjectsLimitHit = "ProjectsLimitHit"
2121
StorageLimitHit = "StorageLimitHit"
22+
ProjectVersionExists = "ProjectVersionExists"
23+
AnotherUploadRunning = "AnotherUploadRunning"
2224

2325

2426
class ClientError(Exception):
@@ -32,6 +34,8 @@ def __init__(self, detail, url=None, server_code=None, server_response=None, htt
3234
self.server_response = server_response
3335

3436
self.extra = None
37+
# Param to mark error as candidate for retry sync process
38+
self.sync_retry = False
3539

3640
def __str__(self):
3741
string_res = f"Detail: {self.detail}\n"

mergin/local_changes.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,31 @@ def get_upload_changes(self) -> List[LocalChange]:
6666
This includes added and updated files.
6767
"""
6868
return self.added + self.updated
69+
70+
def _map_unique_chunks(self, change_chunks: List[str], server_chunks: List[Tuple[str, str]]) -> List[str]:
71+
"""
72+
Helper function to map and deduplicate chunk ids for a single change.
73+
"""
74+
mapped = []
75+
seen = set()
76+
for chunk in change_chunks:
77+
for server_chunk in server_chunks:
78+
chunk_id = server_chunk[0]
79+
server_chunk_id = server_chunk[1]
80+
if chunk_id == chunk and server_chunk_id not in seen:
81+
mapped.append(server_chunk_id)
82+
seen.add(server_chunk_id)
83+
return mapped
6984

70-
def update_chunks(self, chunks: List[Tuple[str, str]]) -> None:
85+
def update_chunks(self, server_chunks: List[Tuple[str, str]]) -> None:
7186
"""
72-
Map chunk ids to file checksums.
87+
Map chunk ids to chunks returned from server (server_chunk_id).
7388
7489
This method updates the `chunks` attribute of each change in `added` and `updated`
75-
lists based on the provided `chunks` list, which contains tuples of (checksum, chunk_id).
90+
lists based on the provided `server_chunks` list, which contains tuples of (chunk_id, server_chunk_id).
7691
"""
7792
for change in self.added:
78-
change.chunks = list({
79-
chunk[1]
80-
for chunk in chunks
81-
if chunk[0] == change.checksum
82-
})
93+
change.chunks = self._map_unique_chunks(change.chunks, server_chunks)
8394

8495
for change in self.updated:
85-
change.chunks = list({
86-
chunk[1]
87-
for chunk in chunks
88-
if chunk[0] == change.checksum
89-
})
96+
change.chunks = self._map_unique_chunks(change.chunks, server_chunks)

mergin/test/test_local_changes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ def test_local_changes_get_server_request():
9494
def test_local_changes_update_chunks():
9595
"""Test the update_chunks method of LocalChanges."""
9696
added = [
97-
LocalChange(path="file1.txt", checksum="abc123", size=1024, mtime=datetime.now()),
98-
LocalChange(path="file2.txt", checksum="abc123", size=1024, mtime=datetime.now())
97+
LocalChange(path="file1.txt", checksum="abc123", size=1024, mtime=datetime.now(),chunks=["abc123"]),
98+
LocalChange(path="file2.txt", checksum="abc123", size=1024, mtime=datetime.now(),chunks=["abc123"])
9999
]
100100
updated = [
101-
LocalChange(path="file2.txt", checksum="xyz789", size=2048, mtime=datetime.now())
101+
LocalChange(path="file2.txt", checksum="xyz789", size=2048, mtime=datetime.now(), chunks=["xyz789"])
102102
]
103103

104104
local_changes = LocalChanges(added=added, updated=updated)

0 commit comments

Comments
 (0)