Skip to content

Commit

Permalink
Add Executor ID to Executor logs and pass it all Function Executors
Browse files Browse the repository at this point in the history
This allows us to understand easier what's going on in each Executor
machine.
  • Loading branch information
eabatalov committed Jan 31, 2025
1 parent a2ef26b commit 8ce9b42
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 10 deletions.
8 changes: 4 additions & 4 deletions indexify/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion indexify/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ grpcio = "1.70.0"
pydantic = "2.10.4"
httpx-sse = "^0.4.0"
# Adds function-executor binary and utils lib.
tensorlake = ">=0.1.14"
tensorlake = ">=0.1.16"

# CLI only
rich = "^13.9.2"
Expand Down
6 changes: 3 additions & 3 deletions indexify/src/indexify/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ def executor(
"At least one function must be specified when not running in development mode"
)

logger = structlog.get_logger(module=__name__)
id = nanoid.generate()
executor_version = version("indexify")
id = nanoid.generate()
logger = structlog.get_logger(module=__name__, executor_id=id)

logger.info(
"starting executor",
server_addr=server_addr,
config_path=config_path,
executor_id=id,
executor_version=executor_version,
executor_cache=executor_cache,
ports=ports,
Expand Down
1 change: 1 addition & 0 deletions indexify/src/indexify/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._task_runner = TaskRunner(
executor_id=id,
function_executor_server_factory=function_executor_server_factory,
base_url=self._base_url,
config_path=config_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class FunctionExecutorServerConfiguration:
configuration parameters or raise an exception if it can't implement
them."""

def __init__(self, image_uri: Optional[str]):
def __init__(self, executor_id: str, image_uri: Optional[str]):
self.executor_id: str = executor_id
# Container image URI of the Function Executor Server.
self.image_uri: Optional[str] = image_uri

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ async def create(
try:
port = self._allocate_port()
args = [
"--executor-id",
config.executor_id,
"--address",
_server_address(port),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
class SingleTaskRunner:
def __init__(
self,
executor_id: str,
function_executor_state: FunctionExecutorState,
task_input: TaskInput,
function_executor_server_factory: FunctionExecutorServerFactory,
base_url: str,
config_path: Optional[str],
logger: Any,
):
self._executor_id: str = executor_id
self._state: FunctionExecutorState = function_executor_state
self._task_input: TaskInput = task_input
self._factory: FunctionExecutorServerFactory = function_executor_server_factory
Expand Down Expand Up @@ -76,6 +78,7 @@ async def _create_function_executor(self) -> FunctionExecutor:
)
config: FunctionExecutorServerConfiguration = (
FunctionExecutorServerConfiguration(
executor_id=self._executor_id,
image_uri=self._task_input.task.image_uri,
)
)
Expand Down
3 changes: 3 additions & 0 deletions indexify/src/indexify/executor/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ class TaskRunner:

def __init__(
self,
executor_id: str,
function_executor_server_factory: FunctionExecutorServerFactory,
base_url: str,
config_path: Optional[str],
disable_automatic_function_executor_management: bool,
):
self._executor_id: str = executor_id
self._factory: FunctionExecutorServerFactory = function_executor_server_factory
self._base_url: str = base_url
self._config_path: Optional[str] = config_path
Expand Down Expand Up @@ -88,6 +90,7 @@ async def _run_task(
self, state: FunctionExecutorState, task_input: TaskInput, logger: Any
) -> TaskOutput:
runner: SingleTaskRunner = SingleTaskRunner(
executor_id=self._executor_id,
function_executor_state=state,
task_input=task_input,
function_executor_server_factory=self._factory,
Expand Down

0 comments on commit 8ce9b42

Please sign in to comment.