Skip to content

Commit

Permalink
Make gRPC service report backups as FAILED if they lose their callback future
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek committed Jun 19, 2024
1 parent 6a56e53 commit 3f2588b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 6 deletions.
19 changes: 14 additions & 5 deletions medusa/backup_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ def register_backup(backup_name, is_async, overwrite_existing=True):
if overwrite_existing:
if not BackupMan.__clean(backup_name):
logging.error(f"Registered backup name {backup_name} cleanup failed prior to re-register.")

BackupMan.__instance.__backups[backup_name] = [None, BackupMan.STATUS_UNKNOWN, is_async]
logging.info("Registered backup id {}".format(backup_name))
else:
BackupMan.__instance.__backups[backup_name] = [None, BackupMan.STATUS_UNKNOWN, is_async]
logging.info("Registered backup id {}".format(backup_name))

# Caller can decide how long to wait for a result using the registered backup future returned.
# The future is returned only while it is set and not yet done; otherwise (e.g. non-async mode) a RuntimeError is raised.
Expand All @@ -128,8 +128,10 @@ def get_backup_future(backup_name):
with lock:
backup_state = BackupMan.__instance.__backups[backup_name]
if backup_state:
logging.debug("Returning backup future for id: {}".format(backup_name))
return backup_state[BackupMan.__IDX_FUTURE]
future = backup_state[BackupMan.__IDX_FUTURE]
if future is not None and not future.done():
return backup_state[BackupMan.__IDX_FUTURE]
raise RuntimeError(f'Backup future not found or already completed for id: {backup_name} {future}')

raise RuntimeError('Backup not located for id: {}'.format(backup_name))

Expand All @@ -152,6 +154,13 @@ def remove_all_backups():
is_all_cleanup_successful = True
else:
for backup_name in list(BackupMan.__instance.__backups):
try:
future = BackupMan.get_backup_future(backup_name)
if future is not None and not future.done():
future.cancel()
except RuntimeError:
# the future was not there, so there's nothing to cancel
pass
if not BackupMan.__clean(backup_name):
is_all_cleanup_successful = False
BackupMan.__instance.__backups = None
Expand Down
19 changes: 19 additions & 0 deletions medusa/service/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,20 @@ def BackupStatus(self, request, context):
else:
response.finishTime = ""
BackupMan.register_backup(request.backupName, is_async=False, overwrite_existing=False)
# determine backup state
status = BackupMan.STATUS_UNKNOWN
if backup.started:
status = BackupMan.STATUS_IN_PROGRESS
if backup.finished:
status = BackupMan.STATUS_SUCCESS
if status == BackupMan.STATUS_IN_PROGRESS:
# if the backup is in progress, check if we have the future waiting for its completion
try:
if BackupMan.get_backup_future(request.backupName) is None:
status = BackupMan.STATUS_FAILED
except RuntimeError:
# if we don't, then something bad happened (e.g. we restarted), so it's a failure
status = BackupMan.STATUS_FAILED
BackupMan.update_backup_status(request.backupName, status)
# record the status
record_status_in_response(response, request.backupName)
Expand Down Expand Up @@ -340,6 +349,13 @@ def get_backup_summary(backup):
summary.finishTime = backup.finished
summary.status = medusa_pb2.StatusType.SUCCESS

if summary.status == medusa_pb2.StatusType.IN_PROGRESS:
try:
if BackupMan.get_backup_future(backup.name) is None:
summary.status = medusa_pb2.StatusType.FAILED
except RuntimeError:
summary.status = medusa_pb2.StatusType.FAILED

summary.totalNodes = len(backup.tokenmap)
summary.finishedNodes = len(backup.complete_nodes())

Expand All @@ -356,6 +372,9 @@ def get_backup_summary(backup):

# Callback function for recording unique backup results
def record_backup_info(future):
if future.cancelled():
return

try:
logging.info("Recording async backup information.")
if future.exception():
Expand Down
7 changes: 6 additions & 1 deletion tests/backup_man_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ def test_set_backup_future_missing_name(self):
def test_register_backup_sync_mode(self):
BackupMan.register_backup("test_backup_id", is_async=False)
self.assertEqual(BackupMan.STATUS_UNKNOWN, BackupMan.get_backup_status("test_backup_id"))
self.assertEqual(None, BackupMan.get_backup_future("test_backup_id"))
with self.assertRaises(RuntimeError):
BackupMan.get_backup_future("test_backup_id")

BackupMan.update_backup_status("test_backup_id", BackupMan.STATUS_SUCCESS)
self.assertEqual(BackupMan.STATUS_SUCCESS, BackupMan.get_backup_status("test_backup_id"))

def test_register_backup_async_mode(self):
backup_id = "test_backup_id"
mock_future = Mock(concurrent.futures.Future)
mock_future.done = lambda: False
BackupMan.register_backup(backup_id, is_async=True)
BackupMan.set_backup_future(backup_id, mock_future)
stored_future = BackupMan.get_backup_future(backup_id)
Expand All @@ -76,6 +78,7 @@ def test_register_backup_async_mode(self):

backup_id_2 = "test_backup_id_2"
mock_future_2 = Mock(concurrent.futures.Future)
mock_future_2.done = lambda: False
BackupMan.register_backup(backup_id_2, is_async=True)
BackupMan.set_backup_future(backup_id_2, mock_future_2)

Expand All @@ -91,7 +94,9 @@ def test_register_backup_duplicate(self):
# Self-healing of detected duplicate, clean and reset w/ new expected
backup_id_1 = "test_backup_id"
mock_future_1 = Mock(concurrent.futures.Future)
mock_future_1.done = lambda: False
mock_future_2 = Mock(concurrent.futures.Future)
mock_future_2.done = lambda: False
BackupMan.register_backup(backup_id_1, is_async=True)
BackupMan.set_backup_future(backup_id_1, mock_future_1)
self.assertEqual(BackupMan.get_backup_future(backup_id_1), mock_future_1)
Expand Down
1 change: 1 addition & 0 deletions tests/backup_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def test_handle_backup_async(self, mock_start_backup, mock_storage, mock_cassand
backup_name_arg=test_backup_name, stagger_time=None,
enable_md5_checks_flag=False, mode="differential")
mock_future_instance = MagicMock()
mock_future_instance.done = lambda: False
mock_callback = MagicMock()
mock_future_instance.result.return_value = {"foo": "bar"}
backup_future.add_done_callback(mock_callback)
Expand Down
1 change: 1 addition & 0 deletions tests/service/grpc/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def test_get_known_incomplete_backup(self):
"node1": {"tokens": [-1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"},
"node2": {"tokens": [1094266504216117253], "is_up": True, "rack": "r1", "dc": "dc1"}
}
BackupMan.remove_backup('backup1')
BackupMan.register_backup('backup1', True)
BackupMan.update_backup_status('backup1', BackupMan.STATUS_IN_PROGRESS)

Expand Down

0 comments on commit 3f2588b

Please sign in to comment.