diff --git a/scaler/about.py b/scaler/about.py index d6578c9..bae6b8a 100644 --- a/scaler/about.py +++ b/scaler/about.py @@ -1 +1 @@ -__version__ = "1.8.14" +__version__ = "1.8.17" diff --git a/scaler/client/agent/client_agent.py b/scaler/client/agent/client_agent.py index 7fb3294..5360f70 100644 --- a/scaler/client/agent/client_agent.py +++ b/scaler/client/agent/client_agent.py @@ -176,7 +176,7 @@ async def __get_loops(self): finally: self._stop_event.set() # always set the stop event before setting futures' exceptions - await self._object_manager.clean_all_objects() + await self._object_manager.clear_all_objects(clear_serializer=True) self._connector_external.destroy() self._connector_internal.destroy() diff --git a/scaler/client/agent/future_manager.py b/scaler/client/agent/future_manager.py index d9733ec..911085b 100644 --- a/scaler/client/agent/future_manager.py +++ b/scaler/client/agent/future_manager.py @@ -8,7 +8,7 @@ from scaler.client.serializer.mixins import Serializer from scaler.io.utility import concat_list_of_bytes from scaler.protocol.python.common import TaskStatus -from scaler.protocol.python.message import ObjectResponse, TaskResult +from scaler.protocol.python.message import ObjectResponse, TaskCancel, TaskResult from scaler.utility.exceptions import DisconnectedError, NoWorkerError, TaskNotFoundError, WorkerDiedError from scaler.utility.metadata.profile_result import retrieve_profiling_result_from_task_result from scaler.utility.object_utility import deserialize_failure @@ -34,6 +34,8 @@ def cancel_all_futures(self): for task_id, future in self._task_id_to_future.items(): future.cancel() + self._task_id_to_future.clear() + def set_all_futures_with_exception(self, exception: Exception): with self._lock: for future in self._task_id_to_future.values(): @@ -42,7 +44,7 @@ def set_all_futures_with_exception(self, exception: Exception): except InvalidStateError: continue # Future got canceled - self._task_id_to_future = dict() + self._task_id_to_future.clear() def on_task_result(self, result: TaskResult): with self._lock: @@ -94,6 +96,10 @@ def on_task_result(self, result: TaskResult): except InvalidStateError: return # Future got canceled + def on_cancel_task(self, task_cancel: TaskCancel): + with self._lock: + self._task_id_to_future.pop(task_cancel.task_id, None) + def on_object_response(self, response: ObjectResponse): for object_id, object_name, object_bytes in zip( response.object_content.object_ids, diff --git a/scaler/client/agent/mixins.py b/scaler/client/agent/mixins.py index a66b1bf..59c988b 100644 --- a/scaler/client/agent/mixins.py +++ b/scaler/client/agent/mixins.py @@ -10,6 +10,7 @@ ObjectRequest, ObjectResponse, Task, + TaskCancel, TaskResult, ) @@ -40,11 +41,11 @@ async def on_object_request(self, request: ObjectRequest): raise NotImplementedError() @abc.abstractmethod - def record_task_result(self, task_id: bytes, object_id: bytes): + def on_task_result(self, result: TaskResult): raise NotImplementedError() @abc.abstractmethod - async def clean_all_objects(self): + async def clear_all_objects(self, clear_serializer: bool): raise NotImplementedError() @@ -79,6 +80,10 @@ def set_all_futures_with_exception(self, exception: Exception): def on_task_result(self, result: TaskResult): raise NotImplementedError() + @abc.abstractmethod + def on_cancel_task(self, task_cancel: TaskCancel): + raise NotImplementedError() + @abc.abstractmethod def on_object_response(self, response: ObjectResponse): raise NotImplementedError() diff --git a/scaler/client/agent/object_manager.py b/scaler/client/agent/object_manager.py index 92ff051..6f2a5f7 100644 --- a/scaler/client/agent/object_manager.py +++ b/scaler/client/agent/object_manager.py @@ -3,12 +3,18 @@ from scaler.client.agent.mixins import ObjectManager from scaler.io.async_connector import AsyncConnector from scaler.protocol.python.common import ObjectContent -from scaler.protocol.python.message import ObjectInstruction, ObjectRequest +from scaler.protocol.python.message import ( + ObjectInstruction, + ObjectRequest, + TaskResult, +) class ClientObjectManager(ObjectManager): def __init__(self, identity: bytes): self._sent_object_ids: Set[bytes] = set() + self._sent_serializer_id: Optional[bytes] = None + self._identity = identity self._connector_internal: Optional[AsyncConnector] = None @@ -23,23 +29,38 @@ async def on_object_instruction(self, instruction: ObjectInstruction): await self.__send_object_creation(instruction) elif instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Delete: await self.__delete_objects(instruction) + elif instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Clear: + await self.clear_all_objects(clear_serializer=False) async def on_object_request(self, object_request: ObjectRequest): assert object_request.request_type == ObjectRequest.ObjectRequestType.Get await self._connector_external.send(object_request) - def record_task_result(self, task_id: bytes, object_id: bytes): - self._sent_object_ids.add(object_id) + def on_task_result(self, task_result: TaskResult): + # TODO: received result objects should be deleted from the scheduler when no longer needed. + # This requires to not delete objects that are required by not-yet-computed dependent graph tasks. + # For now, we just remove the objects when the client makes a clear request, or on client shutdown. + # https://github.com/Citi/scaler/issues/43 + + self._sent_object_ids.update(task_result.results) + + async def clear_all_objects(self, clear_serializer): + cleared_object_ids = self._sent_object_ids.copy() + + if clear_serializer: + self._sent_serializer_id = None + elif self._sent_serializer_id is not None: + cleared_object_ids.remove(self._sent_serializer_id) + + self._sent_object_ids.difference_update(cleared_object_ids) - async def clean_all_objects(self): await self._connector_external.send( ObjectInstruction.new_msg( ObjectInstruction.ObjectInstructionType.Delete, self._identity, - ObjectContent.new_msg(tuple(self._sent_object_ids)), + ObjectContent.new_msg(tuple(cleared_object_ids)), ) ) - self._sent_object_ids = set() async def __send_object_creation(self, instruction: ObjectInstruction): assert instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Create @@ -48,12 +69,20 @@ async def __send_object_creation(self, instruction: ObjectInstruction): if not new_object_ids: return + if ObjectContent.ObjectContentType.Serializer in instruction.object_content.object_types: + if self._sent_serializer_id is not None: + raise ValueError("trying to send multiple serializers.") + + serializer_index = instruction.object_content.object_types.index(ObjectContent.ObjectContentType.Serializer) + self._sent_serializer_id = instruction.object_content.object_ids[serializer_index] + new_object_content = ObjectContent.new_msg( *zip( *filter( lambda object_pack: object_pack[0] in new_object_ids, zip( instruction.object_content.object_ids, + instruction.object_content.object_types, instruction.object_content.object_names, instruction.object_content.object_bytes, ), @@ -71,5 +100,10 @@ async def __send_object_creation(self, instruction: ObjectInstruction): async def __delete_objects(self, instruction: ObjectInstruction): assert instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Delete + + if self._sent_serializer_id in instruction.object_content.object_ids: + raise ValueError("trying to delete serializer.") + self._sent_object_ids.difference_update(instruction.object_content.object_ids) + await self._connector_external.send(instruction) diff --git a/scaler/client/agent/task_manager.py b/scaler/client/agent/task_manager.py index 8b54c77..537d803 100644 --- a/scaler/client/agent/task_manager.py +++ b/scaler/client/agent/task_manager.py @@ -3,7 +3,6 @@ from scaler.client.agent.future_manager import ClientFutureManager from scaler.client.agent.mixins import ObjectManager, TaskManager from scaler.io.async_connector import AsyncConnector -from scaler.protocol.python.common import TaskStatus from scaler.protocol.python.message import GraphTask, GraphTaskCancel, Task, TaskCancel, TaskResult @@ -39,6 +38,8 @@ async def on_cancel_task(self, task_cancel: TaskCancel): return self._task_ids.remove(task_cancel.task_id) + self._future_manager.on_cancel_task(task_cancel) + await self._connector_external.send(task_cancel) async def on_new_graph_task(self, task: GraphTask): @@ -54,13 +55,14 @@ async def on_cancel_graph_task(self, task_cancel: GraphTaskCancel): await self._connector_external.send(task_cancel) async def on_task_result(self, result: TaskResult): + # All task result objects must be propagated to the object manager, even if we do not track the task anymore + # (e.g. if it got cancelled). If we don't, we might lose track of these result objects and not properly clear + # them. + self._object_manager.on_task_result(result) + if result.task_id not in self._task_ids: return self._task_ids.remove(result.task_id) - if result.status != TaskStatus.Canceled: - for result_object_id in result.results: - self._object_manager.record_task_result(result.task_id, result_object_id) - self._future_manager.on_task_result(result) diff --git a/scaler/client/client.py b/scaler/client/client.py index ea15d8f..fd850fe 100644 --- a/scaler/client/client.py +++ b/scaler/client/client.py @@ -308,6 +308,15 @@ def send_object(self, obj: Any, name: Optional[str] = None) -> ObjectReference: cache = self._object_buffer.buffer_send_object(obj, name) return ObjectReference(cache.object_name, cache.object_id, sum(map(len, cache.object_bytes))) + def clear(self): + """ + clear all resources used by the client, this will cancel all running futures and invalidate all existing object + references + """ + + self._future_manager.cancel_all_futures() + self._object_buffer.clear() + def disconnect(self): """ disconnect from connected scheduler, this will not shut down the scheduler diff --git a/scaler/client/object_buffer.py b/scaler/client/object_buffer.py index 885c8cc..7c113e2 100644 --- a/scaler/client/object_buffer.py +++ b/scaler/client/object_buffer.py @@ -15,6 +15,7 @@ @dataclasses.dataclass class ObjectCache: object_id: bytes + object_type: ObjectContent.ObjectContentType object_name: bytes object_bytes: List[bytes] @@ -54,7 +55,8 @@ def commit_send_objects(self): return objects_to_send = [ - (obj_cache.object_id, obj_cache.object_name, obj_cache.object_bytes) for obj_cache in self._pending_objects + (obj_cache.object_id, obj_cache.object_type, obj_cache.object_name, obj_cache.object_bytes) + for obj_cache in self._pending_objects ] self._connector.send( @@ -65,7 +67,7 @@ def commit_send_objects(self): ) ) - self._pending_objects = list() + self._pending_objects.clear() def commit_delete_objects(self): if not self._pending_delete_objects: @@ -81,16 +83,38 @@ def commit_delete_objects(self): self._pending_delete_objects.clear() + def clear(self): + """ + remove all committed and pending objects. + """ + + self._pending_delete_objects.clear() + self._pending_objects.clear() + + self._connector.send( + ObjectInstruction.new_msg( + ObjectInstruction.ObjectInstructionType.Clear, + self._identity, + ObjectContent.new_msg(tuple()), + ) + ) + def __construct_serializer(self) -> ObjectCache: serializer_bytes = cloudpickle.dumps(self._serializer, protocol=pickle.HIGHEST_PROTOCOL) object_id = generate_serializer_object_id(self._identity) - return ObjectCache(object_id, b"serializer", chunk_to_list_of_bytes(serializer_bytes)) + return ObjectCache( + object_id, + ObjectContent.ObjectContentType.Serializer, + b"serializer", + chunk_to_list_of_bytes(serializer_bytes) + ) def __construct_function(self, fn: Callable) -> ObjectCache: function_bytes = self._serializer.serialize(fn) object_id = generate_object_id(self._identity, function_bytes) function_cache = ObjectCache( object_id, + ObjectContent.ObjectContentType.Object, getattr(fn, "__name__", f"").encode(), chunk_to_list_of_bytes(function_bytes), ) @@ -100,4 +124,9 @@ def __construct_object(self, obj: Any, name: Optional[str] = None) -> ObjectCach object_payload = self._serializer.serialize(obj) object_id = generate_object_id(self._identity, object_payload) name_bytes = name.encode() if name else f"".encode() - return ObjectCache(object_id, name_bytes, chunk_to_list_of_bytes(object_payload)) + return ObjectCache( + object_id, + ObjectContent.ObjectContentType.Object, + name_bytes, + chunk_to_list_of_bytes(object_payload) + ) diff --git a/scaler/protocol/capnp/common.capnp b/scaler/protocol/capnp/common.capnp index 628f626..0d84acb 100644 --- a/scaler/protocol/capnp/common.capnp +++ b/scaler/protocol/capnp/common.capnp @@ -17,6 +17,12 @@ enum TaskStatus { struct ObjectContent { objectIds @0 :List(Data); - objectNames @1 :List(Data); - objectBytes @2 :List(List(Data)); + objectTypes @1 :List(ObjectContentType); + objectNames @2 :List(Data); + objectBytes @3 :List(List(Data)); + + enum ObjectContentType { + serializer @0; + object @1; + } } diff --git a/scaler/protocol/capnp/message.capnp b/scaler/protocol/capnp/message.capnp index c554aa2..c619dd2 100644 --- a/scaler/protocol/capnp/message.capnp +++ b/scaler/protocol/capnp/message.capnp @@ -77,6 +77,7 @@ struct ObjectInstruction { enum ObjectInstructionType { create @0; delete @1; + clear @2; } } diff --git a/scaler/protocol/python/common.py b/scaler/protocol/python/common.py index e254ddf..cc387d0 100644 --- a/scaler/protocol/python/common.py +++ b/scaler/protocol/python/common.py @@ -25,6 +25,14 @@ class TaskStatus(enum.Enum): @dataclasses.dataclass class ObjectContent(Message): + class ObjectContentType(enum.Enum): + # FIXME: Pycapnp does not support assignment of raw enum values when the enum is itself declared within a list. + # However, assigning the enum's string value works. + # See https://github.com/capnproto/pycapnp/issues/374 + + Serializer = "serializer" + Object = "object" + def __init__(self, msg): super().__init__(msg) @@ -32,6 +40,10 @@ def __init__(self, msg): def object_ids(self) -> Tuple[bytes, ...]: return tuple(self._msg.objectIds) + @property + def object_types(self) -> Tuple[ObjectContentType, ...]: + return tuple(ObjectContent.ObjectContentType(object_type._as_str()) for object_type in self._msg.objectTypes) + @property def object_names(self) -> Tuple[bytes, ...]: return tuple(self._msg.objectNames) @@ -43,12 +55,16 @@ def object_bytes(self) -> Tuple[List[bytes], ...]: @staticmethod def new_msg( object_ids: Tuple[bytes, ...], + object_types: Tuple[ObjectContentType, ...] = tuple(), object_names: Tuple[bytes, ...] = tuple(), object_bytes: Tuple[List[bytes], ...] = tuple(), ) -> "ObjectContent": return ObjectContent( _common.ObjectContent( - objectIds=list(object_ids), objectNames=list(object_names), objectBytes=tuple(object_bytes) + objectIds=list(object_ids), + objectTypes=[object_type.value for object_type in object_types], + objectNames=list(object_names), + objectBytes=tuple(object_bytes), ) ) diff --git a/scaler/protocol/python/message.py b/scaler/protocol/python/message.py index e38fb80..6f8414a 100644 --- a/scaler/protocol/python/message.py +++ b/scaler/protocol/python/message.py @@ -290,6 +290,7 @@ class ObjectInstruction(Message): class ObjectInstructionType(enum.Enum): Create = _message.ObjectInstruction.ObjectInstructionType.create Delete = _message.ObjectInstruction.ObjectInstructionType.delete + Clear = _message.ObjectInstruction.ObjectInstructionType.clear def __init__(self, msg): super().__init__(msg) diff --git a/scaler/scheduler/graph_manager.py b/scaler/scheduler/graph_manager.py index 392c829..47be1f5 100644 --- a/scaler/scheduler/graph_manager.py +++ b/scaler/scheduler/graph_manager.py @@ -6,7 +6,7 @@ from scaler.io.async_binder import AsyncBinder from scaler.io.async_connector import AsyncConnector -from scaler.protocol.python.common import TaskStatus +from scaler.protocol.python.common import ObjectContent, TaskStatus from scaler.protocol.python.message import GraphTask, GraphTaskCancel, StateGraphTask, Task, TaskCancel, TaskResult from scaler.scheduler.mixins import ClientManager, GraphTaskManager, ObjectManager, TaskManager from scaler.utility.graph.topological_sorter import TopologicalSorter @@ -250,6 +250,7 @@ async def __clean_all_running_nodes(self, graph_task_id: bytes, result: TaskResu self._object_manager.on_add_object( graph_info.client, new_result_object_id, + ObjectContent.ObjectContentType.Object, self._object_manager.get_object_name(result_object_id), self._object_manager.get_object_content(result_object_id), ) @@ -271,6 +272,7 @@ async def __clean_all_inactive_nodes(self, graph_task_id: bytes, result: TaskRes self._object_manager.on_add_object( graph_info.client, new_result_object_id, + ObjectContent.ObjectContentType.Object, self._object_manager.get_object_name(result_object_id), self._object_manager.get_object_content(result_object_id), ) diff --git a/scaler/scheduler/mixins.py b/scaler/scheduler/mixins.py index 5169e63..a3189ba 100644 --- a/scaler/scheduler/mixins.py +++ b/scaler/scheduler/mixins.py @@ -1,6 +1,7 @@ import abc from typing import List, Optional, Set +from scaler.protocol.python.common import ObjectContent from scaler.protocol.python.message import ( ClientDisconnect, ClientHeartbeat, @@ -27,7 +28,14 @@ async def on_object_request(self, source: bytes, request: ObjectRequest): raise NotImplementedError() @abc.abstractmethod - def on_add_object(self, object_user: bytes, object_id: bytes, object_name: bytes, object_bytes: List[bytes]): + def on_add_object( + self, + object_user: bytes, + object_id: bytes, + object_type: ObjectContent.ObjectContentType, + object_name: bytes, + object_bytes: List[bytes] + ): raise NotImplementedError() @abc.abstractmethod diff --git a/scaler/scheduler/object_manager.py b/scaler/scheduler/object_manager.py index 0907e28..02c5541 100644 --- a/scaler/scheduler/object_manager.py +++ b/scaler/scheduler/object_manager.py @@ -18,6 +18,7 @@ class _ObjectCreation(ObjectUsage): object_id: bytes object_creator: bytes + object_type: ObjectContent.ObjectContentType object_name: bytes object_bytes: List[bytes] @@ -61,7 +62,7 @@ async def on_object_instruction(self, source: bytes, instruction: ObjectInstruct logging.error( f"received unknown object response type instruction_type={instruction.instruction_type} from " - f"source={instruction.object_user}" + f"source={instruction.object_user!r}" ) async def on_object_request(self, source: bytes, request: ObjectRequest): @@ -71,11 +72,19 @@ async def on_object_request(self, source: bytes, request: ObjectRequest): logging.error(f"received unknown object request type {request=} from {source=!r}") - def on_add_object(self, object_user: bytes, object_id: bytes, object_name: bytes, object_bytes: List[bytes]): - creation = _ObjectCreation(object_id, object_user, object_name, object_bytes) + def on_add_object( + self, + object_user: bytes, + object_id: bytes, + object_type: ObjectContent.ObjectContentType, + object_name: bytes, + object_bytes: List[bytes] + ): + creation = _ObjectCreation(object_id, object_user, object_type, object_name, object_bytes) logging.debug( f"add object cache " f"object_name={creation.object_name!r}, " + f"object_type={creation.object_type}, " f"object_id={creation.object_id.hex()}, " f"size={format_bytes(len(creation.object_bytes))}" ) @@ -140,12 +149,13 @@ def __on_object_create(self, source: bytes, instruction: ObjectInstruction): logging.error(f"received object creation from {source!r} for unknown client {instruction.object_user!r}") return - for object_id, object_name, object_bytes in zip( + for object_id, object_type, object_name, object_bytes in zip( instruction.object_content.object_ids, + instruction.object_content.object_types, instruction.object_content.object_names, instruction.object_content.object_bytes, ): - self.on_add_object(instruction.object_user, object_id, object_name, object_bytes) + self.on_add_object(instruction.object_user, object_id, object_type, object_name, object_bytes) def __finished_object_storage(self, creation: _ObjectCreation): logging.debug( @@ -158,6 +168,7 @@ def __finished_object_storage(self, creation: _ObjectCreation): def __construct_response(self, request: ObjectRequest) -> ObjectResponse: object_ids = [] + object_types = [] object_names = [] object_bytes = [] for object_id in request.object_ids: @@ -166,10 +177,16 @@ def __construct_response(self, request: ObjectRequest) -> ObjectResponse: object_info = self._object_storage.get_object(object_id) object_ids.append(object_info.object_id) + object_types.append(object_info.object_type) object_names.append(object_info.object_name) object_bytes.append(object_info.object_bytes) return ObjectResponse.new_msg( ObjectResponse.ObjectResponseType.Content, - ObjectContent.new_msg(tuple(request.object_ids), tuple(object_names), tuple(object_bytes)), + ObjectContent.new_msg( + tuple(request.object_ids), + tuple(object_types), + tuple(object_names), + tuple(object_bytes) + ), ) diff --git a/scaler/scheduler/task_manager.py b/scaler/scheduler/task_manager.py index 30f1f1b..40319ba 100644 --- a/scaler/scheduler/task_manager.py +++ b/scaler/scheduler/task_manager.py @@ -55,6 +55,12 @@ def register( async def routine(self): task_id = await self._unassigned.get() + # FIXME: As the assign_task_to_worker() call can be blocking (especially if there is no worker connected to the + # scheduler), we might end up with the task object being in neither _running nor _unassigned. + # In this case, the scheduler will answer any task cancellation request with a "task not found" error, which is + # a bug. + # https://github.com/Citi/scaler/issues/45 + if not await self._worker_manager.assign_task_to_worker(self._task_id_to_task[task_id]): await self._unassigned.put(task_id) return diff --git a/scaler/worker/agent/processor/processor.py b/scaler/worker/agent/processor/processor.py index ccbaa23..97ae428 100644 --- a/scaler/worker/agent/processor/processor.py +++ b/scaler/worker/agent/processor/processor.py @@ -273,6 +273,7 @@ def __send_result(self, source: bytes, task_id: bytes, status: TaskStatus, resul source, ObjectContent.new_msg( (result_object_id,), + (ObjectContent.ObjectContentType.Object,), (f"".encode(),), (chunk_to_list_of_bytes(result_bytes),), ), diff --git a/scaler/worker/agent/processor_manager.py b/scaler/worker/agent/processor_manager.py index 26c559d..a0bd3a6 100644 --- a/scaler/worker/agent/processor_manager.py +++ b/scaler/worker/agent/processor_manager.py @@ -180,7 +180,12 @@ async def on_failing_processor(self, processor_id: bytes, process_status: str): ObjectInstruction.new_msg( ObjectInstruction.ObjectInstructionType.Create, source, - ObjectContent.new_msg((result_object_id,), (b"",), (result_object_bytes,)), + ObjectContent.new_msg( + (result_object_id,), + (ObjectContent.ObjectContentType.Object,), + (b"",), + (result_object_bytes,) + ), ) ) diff --git a/tests/test_client.py b/tests/test_client.py index 02e75ac..3e46359 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -3,9 +3,10 @@ import random import time import unittest +from concurrent.futures import CancelledError from scaler import Client, SchedulerClusterCombo -from scaler.utility.exceptions import ProcessorDiedError +from scaler.utility.exceptions import MissingObjects, ProcessorDiedError from scaler.utility.logging.scoped_logger import ScopedLogger from scaler.utility.logging.utility import setup_logger from tests.utility import get_available_tcp_port, logging_test_name @@ -287,3 +288,22 @@ def test_responsiveness(self): disconnect_start_time = time.time() client.disconnect() self.assertLess(time.time() - disconnect_start_time, MAX_DELAY_SECONDS) + + def test_clear(self): + with Client(self.address) as client: + arg_reference = client.send_object(0.5) + future = client.submit(noop_sleep, arg_reference) + + client.clear() + + # clear() cancels all futures + with self.assertRaises(CancelledError): + future.result() + self.assertTrue(future.cancelled()) + + # using an old reference should fail + with self.assertRaises(MissingObjects): + client.submit(noop_sleep, arg_reference).result() + + # but new tasks should work fine + self.assertEqual(client.submit(round, 3.14).result(), 3.0) diff --git a/tests/test_worker_object_tracker.py b/tests/test_worker_object_tracker.py index 34bb2cc..a5ef2fe 100644 --- a/tests/test_worker_object_tracker.py +++ b/tests/test_worker_object_tracker.py @@ -56,6 +56,7 @@ def test_object_tracker(self) -> None: b"client", ObjectContent.new_msg( (b"object_1", b"object_2", b"object_3"), + tuple([ObjectContent.ObjectContentType.Object] * 3), (b"name_1", b"name_2", b"name_3"), ([b"content_1"], [b"content_2"], [b"content_3"]), ),