Skip to content

Commit

Permalink
Executor: force Function Executor RunTask concurrency=1
Browse files Browse the repository at this point in the history
Before the in-process Function Executor was introduced, the
max task concurrency in Executor was 1. We bring the previous
behavior back here but enforce the concurrency of 1 per Function Executor
(not for the whole Executor). It's better this way because it's
hard for customers to reason about the implications of running multiple
concurrent tasks per Function Executor.

Also added tests that verify task concurrency for the same and
different functions.

We'll revise this concurrency policy in the future if we allow
customers to configure their functions' concurrency explicitly
via function attributes.

I also did a big refactoring that simplifies the task policy
implementation. This is how the current task policies are implemented now:

```
await state.wait_running_tasks_less(1)

if state.function_id_with_version != _function_id_with_version(task):
    await state.destroy_function_executor()
    state.function_id_with_version = _function_id_with_version(task)
```

There are also many other refactorings in this PR that help to implement
the upcoming features in Executor and simplify the code base.

Testing:

make check
make test
  • Loading branch information
eabatalov committed Dec 24, 2024
1 parent d867689 commit 2f7e3d6
Show file tree
Hide file tree
Showing 20 changed files with 805 additions and 583 deletions.
10 changes: 6 additions & 4 deletions python-sdk/indexify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
import signal
import subprocess
import sys
import tempfile
import threading
import time
from importlib.metadata import version
from typing import Annotated, List, Optional

import httpx
import nanoid
import structlog
import typer
Expand All @@ -24,12 +22,14 @@
from rich.theme import Theme

from indexify.executor.executor import Executor
from indexify.executor.function_executor.server.subprocess_function_executor_server_factory import (
SubprocessFunctionExecutorServerFactory,
)
from indexify.function_executor.function_executor_service import (
FunctionExecutorService,
)
from indexify.function_executor.server import Server as FunctionExecutorServer
from indexify.functions_sdk.image import Build, GetDefaultPythonImage, Image
from indexify.http_client import IndexifyClient

logger = structlog.get_logger(module=__name__)

Expand Down Expand Up @@ -250,7 +250,9 @@ def executor(
code_path=executor_cache,
name_alias=name_alias,
image_hash=image_hash,
development_mode=dev,
function_executor_server_factory=SubprocessFunctionExecutorServerFactory(
development_mode=dev
),
)
try:
asyncio.get_event_loop().run_until_complete(executor.run())
Expand Down
22 changes: 8 additions & 14 deletions python-sdk/indexify/executor/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@
from .api_objects import Task


class DownloadedInputs:
def __init__(self, input: SerializedObject, init_value: Optional[SerializedObject]):
self.input = input
self.init_value = init_value


class Downloader:
def __init__(
self, code_path: str, base_url: str, config_path: Optional[str] = None
Expand Down Expand Up @@ -78,22 +72,22 @@ def _write_cached_graph(
# are atomic operations at filesystem level.
os.replace(tmp_path, path)

async def download_inputs(self, task: Task) -> DownloadedInputs:
async def download_input(self, task: Task) -> SerializedObject:
logger = self._task_logger(task)

input: SerializedObject
first_function_in_graph = task.invocation_id == task.input_key.split("|")[-1]
if first_function_in_graph:
# The first function in Graph gets its input from graph invocation payload.
input = await self._fetch_graph_invocation_payload(task, logger)
return await self._fetch_graph_invocation_payload(task, logger)
else:
input = await self._fetch_function_input(task, logger)
return await self._fetch_function_input(task, logger)

init_value: Optional[SerializedObject] = None
if task.reducer_output_id is not None:
init_value = await self._fetch_function_init_value(task, logger)
async def download_init_value(self, task: Task) -> Optional[SerializedObject]:
if task.reducer_output_id is None:
return None

return DownloadedInputs(input=input, init_value=init_value)
logger = self._task_logger(task)
return await self._fetch_function_init_value(task, logger)

def _task_logger(self, task: Task) -> Any:
return structlog.get_logger(
Expand Down
58 changes: 26 additions & 32 deletions python-sdk/indexify/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,22 @@
)

from .api_objects import Task
from .downloader import DownloadedInputs, Downloader
from .function_executor.process_function_executor_factory import (
ProcessFunctionExecutorFactory,
)
from .function_worker import (
FunctionWorker,
FunctionWorkerInput,
FunctionWorkerOutput,
from .downloader import Downloader
from .function_executor.server.function_executor_server_factory import (
FunctionExecutorServerFactory,
)
from .task_fetcher import TaskFetcher
from .task_reporter import TaskReporter
from .task_runner import TaskInput, TaskOutput, TaskRunner


class Executor:
def __init__(
self,
executor_id: str,
code_path: Path,
function_executor_server_factory: FunctionExecutorServerFactory,
server_addr: str = "localhost:8900",
development_mode: bool = False,
config_path: Optional[str] = None,
name_alias: Optional[str] = None,
image_hash: Optional[str] = None,
Expand All @@ -45,10 +41,8 @@ def __init__(
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._function_worker = FunctionWorker(
function_executor_factory=ProcessFunctionExecutorFactory(
development_mode=development_mode,
),
self._task_runnner = TaskRunner(
function_executor_server_factory=function_executor_server_factory,
base_url=self._base_url,
config_path=config_path,
)
Expand Down Expand Up @@ -92,39 +86,39 @@ async def _run_task(self, task: Task) -> None:
Doesn't raise any Exceptions. All errors are reported to the server."""
logger = self._task_logger(task)
output: Optional[FunctionWorkerOutput] = None
output: Optional[TaskOutput] = None

try:
graph: SerializedObject = await self._downloader.download_graph(task)
input: DownloadedInputs = await self._downloader.download_inputs(task)
output = await self._function_worker.run(
input=FunctionWorkerInput(
input: SerializedObject = await self._downloader.download_input(task)
init_value: Optional[SerializedObject] = (
await self._downloader.download_init_value(task)
)
logger.info("task_execution_started")
output: TaskOutput = await self._task_runnner.run(
TaskInput(
task=task,
graph=graph,
function_input=input,
)
input=input,
init_value=init_value,
),
logger=logger,
)
logger.info("task_execution_finished", success=output.success)
except Exception as e:
logger.error("failed running the task", exc_info=e)
output = TaskOutput.internal_error(task)
logger.error("task_execution_failed", exc_info=e)

await self._report_task_outcome(task=task, output=output, logger=logger)
await self._report_task_outcome(output=output, logger=logger)

async def _report_task_outcome(
self, task: Task, output: Optional[FunctionWorkerOutput], logger: Any
) -> None:
"""Reports the task with the given output to the server.
None output means that the task execution didn't finish due to an internal error.
Doesn't raise any exceptions."""
async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
"""Reports the task with the given output to the server."""
reporting_retries: int = 0

while True:
logger = logger.bind(retries=reporting_retries)
try:
await self._task_reporter.report(
task=task, output=output, logger=logger
)
await self._task_reporter.report(output=output, logger=logger)
break
except Exception as e:
logger.error(
Expand All @@ -137,7 +131,7 @@ async def _report_task_outcome(
async def _shutdown(self, loop):
self._logger.info("shutting_down")
self._should_run = False
await self._function_worker.shutdown()
await self._task_runnner.shutdown()
for task in asyncio.all_tasks(loop):
task.cancel()

Expand Down
139 changes: 120 additions & 19 deletions python-sdk/indexify/executor/function_executor/function_executor.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,133 @@
import asyncio
from typing import Any, Optional

import grpc

# Timeout for Function Executor startup in seconds.
# The timeout is counted from the moment when the Function Executor environment
# is fully prepared and the Function Executor gets started.
FUNCTION_EXECUTOR_READY_TIMEOUT_SEC = 5
from indexify.common_util import get_httpx_client
from indexify.function_executor.proto.function_executor_pb2 import (
InitializeRequest,
InitializeResponse,
)
from indexify.function_executor.proto.function_executor_pb2_grpc import (
FunctionExecutorStub,
)

from .invocation_state_client import InvocationStateClient
from .server.function_executor_server import (
FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC,
FunctionExecutorServer,
)
from .server.function_executor_server_factory import (
FunctionExecutorServerConfiguration,
FunctionExecutorServerFactory,
)


class FunctionExecutor:
"""Abstract interface for a FunctionExecutor.
"""Executor side class supporting a running FunctionExecutorServer.
FunctionExecutor primary responsibility is creation and initialization
of all resources associated with a particular Function Executor Server
including the Server itself. FunctionExecutor owns all these resources
and provides other Executor components with access to them.
FunctionExecutor is a class that executes tasks for a particular function.
FunctionExecutor implements the gRPC server that listens for incoming tasks.
Addition of any business logic besides resource management is discouraged.
Please add such logic to other classes managed by this class.
"""

async def channel(self) -> grpc.aio.Channel:
"""Returns a async gRPC channel to the Function Executor.
def __init__(self, server_factory: FunctionExecutorServerFactory, logger: Any):
    """Remembers the server factory and a module-bound logger.

    No server, channel or invocation state client is created here; those
    slots start empty and the instance starts out as not initialized.
    """
    self._logger = logger.bind(module=__name__)
    self._server_factory: FunctionExecutorServerFactory = server_factory
    # Resource slots, all empty until initialize() fills them in.
    self._invocation_state_client: Optional[InvocationStateClient] = None
    self._channel: Optional[grpc.aio.Channel] = None
    self._server: Optional[FunctionExecutorServer] = None
    self._initialized = False

async def initialize(
    self,
    config: FunctionExecutorServerConfiguration,
    initialize_request: InitializeRequest,
    base_url: str,
    config_path: Optional[str],
):
    """Brings up a FunctionExecutorServer together with every resource tied to it.

    Steps, in order: create the server through the factory, open a gRPC
    channel to it and wait for the channel to become ready, send the
    initialize RPC, then create and start the invocation state client.

    If any step raises, everything created so far is torn down via
    destroy() and the exception is re-raised to the caller.
    """
    try:
        server = await self._server_factory.create(
            config=config, logger=self._logger
        )
        self._server = server

        grpc_channel = await self._server.create_channel(self._logger)
        self._channel = grpc_channel
        await _channel_ready(self._channel)

        stub: FunctionExecutorStub = FunctionExecutorStub(self._channel)
        await _initialize_server(stub, initialize_request)

        http_client = get_httpx_client(config_path=config_path, make_async=True)
        # Store the client on self before starting it so that destroy()
        # can clean it up if start() fails.
        self._invocation_state_client = InvocationStateClient(
            stub=stub,
            base_url=base_url,
            http_client=http_client,
            graph=initialize_request.graph_name,
            namespace=initialize_request.namespace,
            logger=self._logger,
        )
        await self._invocation_state_client.start()

        self._initialized = True
    except Exception:
        await self.destroy()
        raise

def channel(self) -> grpc.aio.Channel:
    """Returns the gRPC channel held by this FunctionExecutor.

    Raises RuntimeError if the FunctionExecutor is not initialized.
    """
    self._check_initialized()
    grpc_channel = self._channel
    return grpc_channel

def invocation_state_client(self) -> InvocationStateClient:
    """Returns the invocation state client held by this FunctionExecutor.

    Raises RuntimeError if the FunctionExecutor is not initialized.
    """
    self._check_initialized()
    client = self._invocation_state_client
    return client

async def destroy(self):
    """Destroys all resources owned by this FunctionExecutor.

    Resources are released in this order: invocation state client, gRPC
    channel, then the server itself (through the server factory). Each step
    is attempted even if a previous one failed. A resource reference is
    cleared only when its release succeeded.

    After this call the FunctionExecutor is no longer considered
    initialized, so the accessors raise RuntimeError instead of handing
    out released (None) resources.

    Never raises any exceptions but logs them."""
    # Reset the flag first: from this point on the resources are being torn
    # down and must not be handed out by channel()/invocation_state_client().
    self._initialized = False

    try:
        if self._invocation_state_client is not None:
            await self._invocation_state_client.destroy()
            self._invocation_state_client = None
    except Exception as e:
        self._logger.error(
            "failed to destroy FunctionExecutor invocation state client", exc_info=e
        )

    try:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None
    except Exception as e:
        self._logger.error(
            "failed to close FunctionExecutorServer channel", exc_info=e
        )

    try:
        if self._server is not None:
            await self._server_factory.destroy(self._server, self._logger)
            self._server = None
    except Exception as e:
        self._logger.error("failed to destroy FunctionExecutorServer", exc_info=e)

def _check_initialized(self):
if not self._initialized:
raise RuntimeError("FunctionExecutor is not initialized")


The channel is in ready state and can be used for all gRPC communication with the Function Executor
and can be shared among coroutines running in the same event loop in the same thread. Users should
not close the channel as it's reused for all requests.
Raises Exception if an error occurred."""
raise NotImplementedError
async def _channel_ready(channel: grpc.aio.Channel):
    """Waits for the channel to become ready, bounded by the server ready timeout.

    The wait is limited to FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC seconds;
    asyncio.wait_for raises a timeout error if the limit is exceeded.
    """
    ready = channel.channel_ready()
    await asyncio.wait_for(ready, timeout=FUNCTION_EXECUTOR_SERVER_READY_TIMEOUT_SEC)

def state(self) -> Optional[Any]:
"""Returns optional state object.

The state object can be used to associate any data with the Function Executor.
"""
raise NotImplementedError
async def _initialize_server(
    stub: FunctionExecutorStub, initialize_request: InitializeRequest
):
    """Sends the initialize RPC through the stub and checks the outcome.

    Raises Exception if the response does not report success.
    """
    response: InitializeResponse = await stub.initialize(initialize_request)
    if response.success:
        return
    raise Exception("initialize RPC failed at function executor server")

This file was deleted.

Loading

0 comments on commit 2f7e3d6

Please sign in to comment.