Host Executor uses Functions Executor processes to run customer functions (#1102)

* Host Executor uses Functions Executor processes to run customer functions

All tasks for a function version are routed into the same Function Executor
process because these processes support concurrent execution of tasks.
The Host Executor implements a "one Function Executor for all function versions"
policy. Indexify customers rely on always having a single Function Executor
per function, shared by all of its versions. This ensures that a function
that uses a GPU can run on the same machine for any of its versions. This
policy is intended as a temporary solution until the server starts telling
the Executor explicitly which Function Executors to start and stop.
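The routing policy above can be sketched as follows. This is a hypothetical
illustration, not the actual Indexify implementation: the `FunctionExecutor`,
`HostExecutor`, and `route` names are made up for the example.

```python
class FunctionExecutor:
    """Stands in for a Function Executor process; runs tasks concurrently."""

    def __init__(self, function_name: str):
        self.function_name = function_name
        self.tasks_run = 0

    def run(self, task: str) -> None:
        self.tasks_run += 1


class HostExecutor:
    def __init__(self):
        # One Function Executor per function, shared by all of its versions,
        # so e.g. a GPU claimed by the function stays in the same process.
        self._executors: dict[str, FunctionExecutor] = {}

    def route(self, function_name: str, version: int, task: str) -> FunctionExecutor:
        # The version is deliberately ignored when picking the executor.
        if function_name not in self._executors:
            self._executors[function_name] = FunctionExecutor(function_name)
        executor = self._executors[function_name]
        executor.run(task)
        return executor


host = HostExecutor()
a = host.route("embed", version=1, task="t1")
b = host.route("embed", version=2, task="t2")
assert a is b  # tasks for both versions land on the same Function Executor
```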

Also removed the redundant num_workers configuration and some other
dead code. This configuration was unused and is now completely
obsolete with Function Executors running in separate processes.

Function Executor processes take about 1.2 sec (p50) to start and set up
a gRPC channel, with rare durations jumping up to 3 sec (p100). I removed
all locks from the code and it didn't result in any speedup. I measured
that these delays are coming from the library calls
`await asyncio.create_subprocess_exec` and
`asyncio.wait_for(channel.channel_ready())`. There's no plan
to fix them for now, as it'd take a significant effort to
figure out what's going on inside the libraries. Instead, the cold start
duration limit in the test is bumped from 100 ms to 5 sec.
This limit is still okay for the product as we don't have a
hard requirement for cold start durations yet.
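The subprocess half of that measurement can be reproduced with a small
sketch like the one below. It times only `asyncio.create_subprocess_exec`;
the gRPC `channel.channel_ready()` wait would be timed the same way, and
absolute numbers will of course vary by machine.

```python
import asyncio
import sys
import time


async def measure_spawn() -> float:
    """Time how long it takes to spawn and reap a trivial child process,
    one of the two library calls the startup delays were traced to."""
    start = time.monotonic()
    proc = await asyncio.create_subprocess_exec(sys.executable, "-c", "pass")
    await proc.wait()
    return time.monotonic() - start


duration = asyncio.run(measure_spawn())
print(f"subprocess spawn took {duration:.3f}s")
```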

Looks like gRPC also added about 10 ms of latency to warm starts.
This is a minor regression and doesn't require a test limit change.

Before:

```
cold_start_duration: 0.009701967239379883 seconds
warm_start_duration: 0.006789207458496094 seconds
```

After:

```
cold_start_duration: 1.2409627437591553 seconds
warm_start_duration: 0.016773223876953125 seconds
```

The Host Executor uses abstract FunctionExecutor objects, so they and
their Factory can be replaced in the future if, for example, we want
to run Function Executors as threads or containers instead of
processes.
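The abstraction could look roughly like the sketch below. Class and method
names (`FunctionExecutor`, `FunctionExecutorFactory`, `run_task`) are
illustrative assumptions, not the actual Indexify API; the point is that
a process-backed implementation is just one concrete Factory product.

```python
import asyncio
from abc import ABC, abstractmethod


class FunctionExecutor(ABC):
    """Abstract runner for customer function tasks."""

    @abstractmethod
    async def run_task(self, task: str) -> str: ...

    @abstractmethod
    async def shutdown(self) -> None: ...


class FunctionExecutorFactory(ABC):
    @abstractmethod
    async def create(self, function_name: str) -> FunctionExecutor: ...


class ProcessFunctionExecutor(FunctionExecutor):
    """Process-backed implementation; a thread- or container-backed one
    would implement the same interface."""

    async def run_task(self, task: str) -> str:
        return f"ran {task} in a subprocess"

    async def shutdown(self) -> None:
        pass


class ProcessFactory(FunctionExecutorFactory):
    async def create(self, function_name: str) -> FunctionExecutor:
        return ProcessFunctionExecutor()


async def main() -> str:
    factory = ProcessFactory()
    executor = await factory.create("embed")
    result = await executor.run_task("t1")
    await executor.shutdown()
    return result


result = asyncio.run(main())
print(result)  # ran t1 in a subprocess
```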

Testing:

make test
make fmt
make check

* Add a test for file descriptor caching

It can take minutes to load a model into a GPU. Indexify customers
rely on being able to cache file descriptors of loaded models
between tasks of the same function version. Added a test that
verifies this.
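The caching contract being tested can be illustrated as below: because all
tasks of a function run in the same Function Executor process, module-level
state (here a cached open file standing in for loaded model weights) survives
between tasks. The `get_model_file` and `task` names are hypothetical.

```python
import tempfile

# Lives for the lifetime of the Function Executor process.
_cached_file = None
load_count = 0


def get_model_file():
    """Open the 'model' once and reuse the descriptor on later calls."""
    global _cached_file, load_count
    if _cached_file is None:
        load_count += 1  # the expensive load happens only once
        _cached_file = tempfile.TemporaryFile()
        _cached_file.write(b"model weights")
    return _cached_file


def task() -> int:
    # Each task reuses the cached descriptor instead of reloading.
    return get_model_file().fileno()


fd1 = task()
fd2 = task()
assert fd1 == fd2 and load_count == 1  # same descriptor reused across tasks
```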

Testing:

make fmt
make check
make test
eabatalov authored Dec 12, 2024
1 parent d867d7c commit c2ffb80
Showing 22 changed files with 949 additions and 669 deletions.
37 changes: 19 additions & 18 deletions python-sdk/indexify/cli.py
@@ -1,3 +1,8 @@
from .logging import configure_logging_early, configure_production_logging

configure_logging_early()


import asyncio
import os
import shutil
@@ -40,13 +45,9 @@
}
)

logging = structlog.get_logger(module=__name__)
console = Console(theme=custom_theme)

app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True)
executor_cache_option: Optional[str] = typer.Option(
"~/.indexify/executor_cache", help="Path to the executor cache directory"
)
config_path_option: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
)
@@ -185,38 +186,32 @@ def executor(
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
workers: Annotated[
int, typer.Option(help="number of worker processes for extraction")
] = 1,
config_path: Optional[str] = config_path_option,
executor_cache: Optional[str] = executor_cache_option,
executor_cache: Optional[str] = typer.Option(
"~/.indexify/executor_cache", help="Path to the executor cache directory"
),
name_alias: Optional[str] = typer.Option(
None, help="Name alias for the executor if it's spun up with the base image"
),
image_version: Optional[int] = typer.Option(
"1", help="Requested Image Version for this executor"
),
):
# configure structured logging
if not dev:
processors = [
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
structlog.configure(processors=processors)
configure_production_logging()

id = nanoid.generate()
executor_version = version("indexify")
logging.info(
logger.info(
"executor started",
workers=workers,
server_addr=server_addr,
config_path=config_path,
executor_id=id,
executor_version=executor_version,
executor_cache=executor_cache,
name_alias=name_alias,
image_version=image_version,
dev_mode=dev,
)

from pathlib import Path
@@ -228,18 +223,18 @@ def executor(

agent = ExtractorAgent(
id,
num_workers=workers,
server_addr=server_addr,
config_path=config_path,
code_path=executor_cache,
name_alias=name_alias,
image_version=image_version,
development_mode=dev,
)

try:
asyncio.get_event_loop().run_until_complete(agent.run())
except asyncio.CancelledError:
logging.info("graceful shutdown")
logger.info("graceful shutdown")


@app.command(help="Runs a Function Executor server")
@@ -248,8 +243,14 @@ def function_executor(
help="Function Executor server address"
),
indexify_server_address: str = typer.Option(help="Indexify server address"),
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
config_path: Optional[str] = config_path_option,
):
if not dev:
configure_production_logging()

logger.info(
"starting function executor server",
function_executor_server_address=function_executor_server_address,