Skip to content

Commit

Permalink
refactor: Handle container start error in separate try-catch context (#…
Browse files Browse the repository at this point in the history
…2316) (#2434)

Co-authored-by: fregataa <[email protected]>
  • Loading branch information
lablup-octodog and fregataa committed Jul 13, 2024
1 parent 0fc0032 commit 75cf7bd
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 51 deletions.
1 change: 1 addition & 0 deletions changes/2316.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle container creation exception and start exception in separate try-except contexts.
121 changes: 70 additions & 51 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,30 @@ def container_from_docker_container(src: DockerContainer) -> Container:
)


async def _clean_scratch(
loop: asyncio.AbstractEventLoop,
scratch_type: str,
scratch_root: Path,
kernel_id: KernelId,
) -> None:
scratch_dir = scratch_root / str(kernel_id)
tmp_dir = scratch_root / f"{kernel_id}_tmp"
try:
if sys.platform.startswith("linux") and scratch_type == "memory":
await destroy_scratch_filesystem(scratch_dir)
await destroy_scratch_filesystem(tmp_dir)
await loop.run_in_executor(None, shutil.rmtree, scratch_dir)
await loop.run_in_executor(None, shutil.rmtree, tmp_dir)
elif sys.platform.startswith("linux") and scratch_type == "hostfile":
await destroy_loop_filesystem(scratch_root, kernel_id)
else:
await loop.run_in_executor(None, shutil.rmtree, scratch_dir)
except CalledProcessError:
pass
except FileNotFoundError:
pass


def _DockerError_reduce(self):
return (
type(self),
Expand Down Expand Up @@ -853,6 +877,18 @@ async def start_container(
if self.local_config["debug"]["log-kernel-config"]:
log.debug("full container config: {!r}", pretty(container_config))

async def _rollback_container_creation() -> None:
await _clean_scratch(
loop,
self.local_config["container"]["scratch-type"],
self.local_config["container"]["scratch-root"],
self.kernel_id,
)
self.port_pool.update(host_ports)
async with self.resource_lock:
for dev_name, device_alloc in resource_spec.allocations.items():
self.computers[dev_name].alloc_map.free(device_alloc)

# We are all set! Create and start the container.
async with closing_async(Docker()) as docker:
container: Optional[DockerContainer] = None
Expand All @@ -876,24 +912,6 @@ async def start_container(
for k, v in kvpairs.items():
await writer.write(f"{k}={v}\n")

await container.start()

if self.internal_data.get("sudo_session_enabled", False):
exec = await container.exec(
[
# file ownership is guaranteed to be set as root:root since command is executed on behalf of root user
"sh",
"-c",
'mkdir -p /etc/sudoers.d && echo "work ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers.d/01-bai-work',
],
user="root",
)
shell_response = await exec.start(detach=True)
if shell_response:
raise ContainerCreationError(
container_id=cid,
message=f"sudoers provision failed: {shell_response.decode()}",
)
except asyncio.CancelledError:
if container is not None:
raise ContainerCreationError(
Expand All @@ -902,27 +920,40 @@ async def start_container(
raise
except Exception as e:
# Oops, we have to restore the allocated resources!
scratch_type = self.local_config["container"]["scratch-type"]
scratch_root = self.local_config["container"]["scratch-root"]
if sys.platform.startswith("linux") and scratch_type == "memory":
await destroy_scratch_filesystem(self.scratch_dir)
await destroy_scratch_filesystem(self.tmp_dir)
await loop.run_in_executor(None, shutil.rmtree, self.scratch_dir)
await loop.run_in_executor(None, shutil.rmtree, self.tmp_dir)
elif sys.platform.startswith("linux") and scratch_type == "hostfile":
await destroy_loop_filesystem(scratch_root, self.kernel_id)
else:
await loop.run_in_executor(None, shutil.rmtree, self.scratch_dir)
self.port_pool.update(host_ports)
async with self.resource_lock:
for dev_name, device_alloc in resource_spec.allocations.items():
self.computers[dev_name].alloc_map.free(device_alloc)
await _rollback_container_creation()
if container is not None:
raise ContainerCreationError(
container_id=container._id, message=f"unknown. {repr(e)}"
)
raise

try:
await container.start()
except asyncio.CancelledError:
await _rollback_container_creation()
raise ContainerCreationError(container_id=cid, message="Container start cancelled")
except Exception as e:
await _rollback_container_creation()
raise ContainerCreationError(container_id=cid, message=f"unknown. {repr(e)}")

if self.internal_data.get("sudo_session_enabled", False):
exec = await container.exec(
[
# file ownership is guaranteed to be set as root:root since command is executed on behalf of root user
"sh",
"-c",
'mkdir -p /etc/sudoers.d && echo "work ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers.d/01-bai-work',
],
user="root",
)
shell_response = await exec.start(detach=True)
if shell_response:
await _rollback_container_creation()
raise ContainerCreationError(
container_id=cid,
message=f"sudoers provision failed: {shell_response.decode()}",
)

additional_network_names: Set[str] = set()
for dev_name, device_alloc in resource_spec.allocations.items():
n = await self.computers[dev_name].instance.get_docker_networks(device_alloc)
Expand Down Expand Up @@ -1504,24 +1535,12 @@ async def log_iter():
log.warning("container deletion timeout (k:{}, c:{})", kernel_id, container_id)

if not restarting:
scratch_type = self.local_config["container"]["scratch-type"]
scratch_root = self.local_config["container"]["scratch-root"]
scratch_dir = scratch_root / str(kernel_id)
tmp_dir = scratch_root / f"{kernel_id}_tmp"
try:
if sys.platform.startswith("linux") and scratch_type == "memory":
await destroy_scratch_filesystem(scratch_dir)
await destroy_scratch_filesystem(tmp_dir)
await loop.run_in_executor(None, shutil.rmtree, scratch_dir)
await loop.run_in_executor(None, shutil.rmtree, tmp_dir)
elif sys.platform.startswith("linux") and scratch_type == "hostfile":
await destroy_loop_filesystem(scratch_root, kernel_id)
else:
await loop.run_in_executor(None, shutil.rmtree, scratch_dir)
except CalledProcessError:
pass
except FileNotFoundError:
pass
await _clean_scratch(
loop,
self.local_config["container"]["scratch-type"],
self.local_config["container"]["scratch-root"],
kernel_id,
)

async def create_local_network(self, network_name: str) -> None:
async with closing_async(Docker()) as docker:
Expand Down

0 comments on commit 75cf7bd

Please sign in to comment.