Skip to content

Commit

Permalink
Merge branch 'main' into 07-03-update_halfstack_packages_to_point_latest_stable_versions
Browse files Browse the repository at this point in the history
  • Loading branch information
achimnol committed Jul 3, 2024
2 parents e981b7b + e4ca40e commit 153afa4
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 416 deletions.
1 change: 1 addition & 0 deletions changes/2339.deps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Upgrade aiodocker to v0.22.0 with minor bug fixes found by improved type annotations
632 changes: 321 additions & 311 deletions python.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
aiodataloader-ng~=0.2.1
aiodocker~=0.21.0
aiodocker==0.22.0
aiofiles~=23.2.1
aiohttp~=3.9.1
aiohttp_cors~=0.7
Expand Down
3 changes: 2 additions & 1 deletion src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import zmq
from aiodocker.docker import Docker, DockerContainer
from aiodocker.exceptions import DockerError
from aiodocker.types import PortInfo
from aiomonitor.task import preserve_termination_log
from async_timeout import timeout

Expand Down Expand Up @@ -935,7 +936,7 @@ async def start_container(
if container_config["HostConfig"].get("NetworkMode") == "host":
host_port = host_ports[idx]
else:
ports: list[dict[str, Any]] | None = await container.port(port)
ports: list[PortInfo] | None = await container.port(port)
if ports is None:
raise ContainerCreationError(
container_id=cid, message="Container port not found"
Expand Down
7 changes: 4 additions & 3 deletions src/ai/backend/agent/docker/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ async def fetch_api_stats(container: DockerContainer) -> Optional[Dict[str, Any]
)
return None
else:
entry = {"read": "0001-01-01"}
# aiodocker 0.16 or later returns a list of dict, even when not streaming.
if isinstance(ret, list):
if not ret:
# The API may return an empty result upon container termination.
return None
ret = ret[0]
entry = ret[0]
# The API may return an invalid or empty result upon container termination.
if ret is None or not isinstance(ret, dict):
log.warning(
Expand All @@ -104,9 +105,9 @@ async def fetch_api_stats(container: DockerContainer) -> Optional[Dict[str, Any]
ret,
)
return None
if ret["read"].startswith("0001-01-01") or ret["preread"].startswith("0001-01-01"):
if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
return None
return ret
return entry


# Pseudo-plugins for intrinsic devices (CPU and the main memory)
Expand Down
142 changes: 85 additions & 57 deletions src/ai/backend/agent/docker/kernel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import asyncio
import functools
import gzip
import io
import logging
import lzma
import os
Expand All @@ -9,7 +12,7 @@
import subprocess
import textwrap
from pathlib import Path, PurePosixPath
from typing import Any, Dict, Final, FrozenSet, Mapping, Optional, Sequence, Tuple
from typing import Any, Dict, Final, FrozenSet, Mapping, Optional, Sequence, Tuple, cast, override

import janus
import pkg_resources
Expand Down Expand Up @@ -103,7 +106,7 @@ async def get_logs(self):
container_id = self.data["container_id"]
async with closing_async(Docker()) as docker:
container = await docker.containers.get(container_id)
logs = await container.log(stdout=True, stderr=True)
logs = await container.log(stdout=True, stderr=True, follow=False)
return {"logs": "".join(logs)}

async def interrupt_kernel(self):
Expand Down Expand Up @@ -258,85 +261,110 @@ def _write_chunks(
except asyncio.TimeoutError:
log.warning("Session is already being committed.")

async def accept_file(self, filename: str, filedata: bytes):
@override
async def accept_file(self, container_path: os.PathLike | str, filedata: bytes) -> None:
loop = current_loop()
work_dir = self.agent_config["container"]["scratch-root"] / str(self.kernel_id) / "work"
container_home_path = PurePosixPath("/home/work")
try:
# create intermediate directories in the path
dest_path = (work_dir / filename).resolve(strict=False)
parent_path = dest_path.parent
except ValueError: # parent_path does not start with work_dir!
raise AssertionError("malformed upload filename and path.")
home_relpath = PurePosixPath(container_path).relative_to(container_home_path)
except ValueError:
raise PermissionError("Not allowed to upload files outside /home/work")
host_work_dir: Path = (
self.agent_config["container"]["scratch-root"] / str(self.kernel_id) / "work"
)
host_abspath = (host_work_dir / home_relpath).resolve(strict=False)
if not host_abspath.is_relative_to(host_work_dir):
raise PermissionError("Not allowed to upload files outside /home/work")

def _write_to_disk():
parent_path.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(filedata)
host_abspath.parent.mkdir(parents=True, exist_ok=True)
host_abspath.write_bytes(filedata)

try:
await loop.run_in_executor(None, _write_to_disk)
except FileNotFoundError:
log.error(
"{0}: writing uploaded file failed: {1} -> {2}", self.kernel_id, filename, dest_path
except OSError as e:
raise RuntimeError(
"{0}: writing uploaded file failed: {1} -> {2} ({3})".format(
self.kernel_id,
container_path,
host_abspath,
repr(e),
)
)

async def download_file(self, filepath: str):
@override
async def download_file(self, container_path: os.PathLike | str) -> bytes:
container_id = self.data["container_id"]

container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot download files outside /home/work")

async with closing_async(Docker()) as docker:
container = docker.containers.container(container_id)
home_path = PurePosixPath("/home/work")
try:
abspath = home_path / filepath
abspath.relative_to(home_path)
except ValueError:
raise PermissionError("You cannot download files outside /home/work")
try:
with await container.get_archive(str(abspath)) as tarobj:
tarobj.fileobj.seek(0, 2)
fsize = tarobj.fileobj.tell()
if fsize > 1048576:
raise ValueError("too large file")
tarbytes = tarobj.fileobj.getvalue()
with await container.get_archive(str(container_abspath)) as tarobj:
# FIXME: Replace this API call to a streaming version and cut the download if
# the downloaded size exceeds the limit.
assert tarobj.fileobj is not None
tar_fobj = cast(io.BufferedIOBase, tarobj.fileobj)
tar_fobj.seek(0, io.SEEK_END)
tar_size = tar_fobj.tell()
if tar_size > 1048576:
raise ValueError("Too large archive file exceeding 1 MiB")
tar_fobj.seek(0, io.SEEK_SET)
tarbytes = tar_fobj.read()
except DockerError:
log.warning("Could not found the file: {0}", abspath)
raise FileNotFoundError(f"Could not found the file: {abspath}")
raise RuntimeError(f"Could not download the archive to: {container_abspath}")
return tarbytes

async def download_single(self, filepath: str):
@override
async def download_single(self, container_path: os.PathLike | str) -> bytes:
container_id = self.data["container_id"]

container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot download files outside /home/work")

async with closing_async(Docker()) as docker:
container = docker.containers.container(container_id)
home_path = PurePosixPath("/home/work")
try:
abspath = home_path / filepath
abspath.relative_to(home_path)
except ValueError:
raise PermissionError("You cannot download files outside /home/work")
try:
with await container.get_archive(str(abspath)) as tarobj:
tarobj.fileobj.seek(0, 2)
fsize = tarobj.fileobj.tell()
if fsize > 1048576:
raise ValueError("too large file")
tarobj.fileobj.seek(0)
inner_file = tarobj.extractfile(tarobj.getnames()[0])
if inner_file:
tarbytes = inner_file.read()
else:
log.warning("Could not found the file: {0}", abspath)
raise FileNotFoundError(f"Could not found the file: {abspath}")
with await container.get_archive(str(container_abspath)) as tarobj:
# FIXME: Replace this API call to a streaming version and cut the download if
# the downloaded size exceeds the limit.
assert tarobj.fileobj is not None
tar_fobj = cast(io.BufferedIOBase, tarobj.fileobj)
tar_fobj.seek(0, io.SEEK_END)
tar_size = tar_fobj.tell()
if tar_size > 1048576:
raise ValueError("Too large archive file exceeding 1 MiB")
tar_fobj.seek(0, io.SEEK_SET)
if len(tarobj.getnames()) > 1:
raise ValueError(
f"Expected a single-file archive but found multiple files from {container_abspath}"
)
inner_fname = tarobj.getnames()[0]
inner_fobj = tarobj.extractfile(inner_fname)
if not inner_fobj:
raise ValueError(
f"Could not read {inner_fname!r} the archive file {container_abspath}"
)
# FYI: To get the size of extracted file, seek and tell with inner_fobj.
content_bytes = inner_fobj.read()
except DockerError:
log.warning("Could not found the file: {0}", abspath)
raise FileNotFoundError(f"Could not found the file: {abspath}")
return tarbytes
raise RuntimeError(f"Could not download the archive to: {container_abspath}")
return content_bytes

async def list_files(self, container_path: str):
@override
async def list_files(self, container_path: os.PathLike | str):
container_id = self.data["container_id"]

# Confine the lookable paths in the home directory
home_path = Path("/home/work").resolve()
resolved_path = (home_path / container_path).resolve()

if str(os.path.commonpath([resolved_path, home_path])) != str(home_path):
container_home_path = PurePosixPath("/home/work")
container_abspath = PurePosixPath(os.path.normpath(container_home_path / container_path))
if not container_abspath.is_relative_to(container_home_path):
raise PermissionError("You cannot list files outside /home/work")

# Gather individual file information in the target path.
Expand Down Expand Up @@ -373,7 +401,7 @@ async def list_files(self, container_path: str):
"/opt/backend.ai/bin/python",
"-c",
code,
str(container_path),
str(container_abspath),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/agent/docker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def recreate(self) -> None:
pass
else:
raise
container_config = {
container_config: dict[str, Any] = {
"Image": self.image_ref,
"Tty": True,
"Privileged": False,
Expand Down
17 changes: 12 additions & 5 deletions src/ai/backend/agent/dummy/kernel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import asyncio
import os
from collections import OrderedDict
from typing import Any, Dict, FrozenSet, Mapping, Sequence
from typing import Any, Dict, FrozenSet, Mapping, Sequence, override

from ai.backend.common.docker import ImageRef
from ai.backend.common.events import EventProducer
Expand Down Expand Up @@ -137,21 +140,25 @@ async def get_service_apps(self):
"data": [],
}

async def accept_file(self, filename, filedata):
@override
async def accept_file(self, container_path: os.PathLike | str, filedata: bytes) -> None:
delay = self.dummy_kernel_cfg["delay"]["accept-file"]
await asyncio.sleep(delay)

async def download_file(self, filepath):
@override
async def download_file(self, container_path: os.PathLike | str) -> bytes:
delay = self.dummy_kernel_cfg["delay"]["download-file"]
await asyncio.sleep(delay)
return b""

async def download_single(self, filepath):
@override
async def download_single(self, container_path: os.PathLike | str) -> bytes:
delay = self.dummy_kernel_cfg["delay"]["download-single"]
await asyncio.sleep(delay)
return b""

async def list_files(self, path: str):
@override
async def list_files(self, container_path: os.PathLike | str):
delay = self.dummy_kernel_cfg["delay"]["list-files"]
await asyncio.sleep(delay)
return {"files": "", "errors": "", "abspath": ""}
Expand Down
40 changes: 36 additions & 4 deletions src/ai/backend/agent/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import math
import os
import re
import secrets
import time
Expand Down Expand Up @@ -318,19 +319,50 @@ async def get_service_apps(self):
raise NotImplementedError

@abstractmethod
async def accept_file(self, filename, filedata):
async def accept_file(self, container_path: os.PathLike | str, filedata) -> None:
"""
Put the uploaded file to the designated container path.
The path should be inside /home/work of the container.
A relative path is interpreted as a subpath inside /home/work.
WARNING: Since the implementations may use the scratch directory mounted as the home
directory inside the container, the file may not be visible inside the container if the
designated home-relative path overlaps with a vfolder mount.
"""
raise NotImplementedError

@abstractmethod
async def download_file(self, filepath):
async def download_file(self, container_path: os.PathLike | str) -> bytes:
"""
Download the designated path (a single file or an entire directory) as a tar archive.
The path should be inside /home/work of the container.
A relative path is interpreted as a subpath inside /home/work.
The return value is the raw byte stream of the archive itself, and it is the caller's
responsibility to extract the tar archive.
This API is intended to download a small set of files from the container filesystem.
"""
raise NotImplementedError

@abstractmethod
async def download_single(self, filepath):
async def download_single(self, container_path: os.PathLike | str) -> bytes:
"""
Download the designated path (a single file) as a tar archive.
The path should be inside /home/work of the container.
A relative path is interpreted as a subpath inside /home/work.
The return value is the content of the file *extracted* from the downloaded archive.
This API is intended to download a small file from the container filesystem.
"""
raise NotImplementedError

@abstractmethod
async def list_files(self, path: str):
async def list_files(self, container_path: os.PathLike | str):
"""
List the directory entries of the designated path.
The path should be inside /home/work of the container.
A relative path is interpreted as a subpath inside /home/work.
"""
raise NotImplementedError

@abstractmethod
Expand Down
7 changes: 4 additions & 3 deletions src/ai/backend/agent/kubernetes/intrinsic.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ async def fetch_api_stats(container: DockerContainer) -> Optional[Dict[str, Any]
)
return None
else:
entry = {"read": "0001-01-01"}
# aiodocker 0.16 or later returns a list of dict, even when not streaming.
if isinstance(ret, list):
if not ret:
# The API may return an empty result upon container termination.
return None
ret = ret[0]
entry = ret[0]
# The API may return an invalid or empty result upon container termination.
if ret is None or not isinstance(ret, dict):
log.warning(
Expand All @@ -69,9 +70,9 @@ async def fetch_api_stats(container: DockerContainer) -> Optional[Dict[str, Any]
ret,
)
return None
if ret["read"].startswith("0001-01-01") or ret["preread"].startswith("0001-01-01"):
if entry["read"].startswith("0001-01-01") or entry["preread"].startswith("0001-01-01"):
return None
return ret
return entry


# Pseudo-plugins for intrinsic devices (CPU and the main memory)
Expand Down
Loading

0 comments on commit 153afa4

Please sign in to comment.