Skip to content

Commit

Permalink
Test for Function routing to Executors with function allowlists
Browse files Browse the repository at this point in the history
The test might not catch a routing bug on a single run but will catch
it eventually if it happens.
  • Loading branch information
eabatalov committed Jan 13, 2025
1 parent e5f2878 commit f2cf803
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
127 changes: 127 additions & 0 deletions indexify/tests/src/tests/test_function_allowlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import subprocess
import time
import unittest
from typing import List, Optional

from indexify import Graph, indexify_function
from indexify.remote_graph import RemoteGraph
from tests.utils import (
ExecutorProcessContextManager,
fetch_dev_mode_executor_pid,
function_uri,
)

# There's a dev mode executor already running in the testing environment.
# It's used for all other tests that don't check the function allowlist.
# This existing Executor can run any function.
dev_mode_executor_pid: Optional[int] = None
# This Executor can only run function_a.
function_a_executor_pid: Optional[int] = None
# This Executor can only run function_b.
function_b_executor_pid: Optional[int] = None


@indexify_function()
def function_a() -> str:
global dev_mode_executor_pid
global function_a_executor_pid
global function_b_executor_pid

# Assuming Subprocess Function Executors are used in Open Source.
current_executor_pid: int = os.getppid()
allowed_executor_pids: List[int] = [function_a_executor_pid, dev_mode_executor_pid]
if current_executor_pid not in allowed_executor_pids:
raise Exception(
f"Function A Executor PID {current_executor_pid} is not in the allowlist: {allowed_executor_pids}"
)
return "success"


@indexify_function()
def function_b(_: str) -> str:
global dev_mode_executor_pid
global function_a_executor_pid
global function_b_executor_pid

# Assuming Subprocess Function Executors are used in Open Source.
current_executor_pid: int = os.getppid()
allowed_executor_pids: List[int] = [function_b_executor_pid, dev_mode_executor_pid]
if current_executor_pid not in allowed_executor_pids:
raise Exception(
f"Function B Executor PID {current_executor_pid} is not in the allowlist: {allowed_executor_pids}"
)
return "success"


class TestFunctionAllowlist(unittest.TestCase):
def test_different_executors_run_different_functions(self):
global dev_mode_executor_pid
dev_mode_executor_pid = fetch_dev_mode_executor_pid()
print(f"Found dev mode Executor PID: {dev_mode_executor_pid}")

with ExecutorProcessContextManager(
[
"--function",
function_uri(
"default",
"test_different_executors_run_different_functions",
"function_a",
"1.0",
),
"--ports",
"60000",
"60001",
]
) as executor_a:
executor_a: subprocess.Popen
global function_a_executor_pid
function_a_executor_pid = executor_a.pid
print(f"Started Executor A with PID: {function_a_executor_pid}")

with ExecutorProcessContextManager(
[
"--function",
function_uri(
"default",
"test_different_executors_run_different_functions",
"function_b",
"1.0",
),
"--ports",
"60001",
"60002",
]
) as executor_b:
executor_b: subprocess.Popen
global function_b_executor_pid
function_b_executor_pid = executor_b.pid
print(f"Started Executor B with PID: {function_b_executor_pid}")

# Wait to let Executor A and B start and register at the Server.
time.sleep(5)

graph = Graph(
name="test_different_executors_run_different_functions",
description="test",
start_node=function_a,
version="1.0",
)
graph.add_edge(function_a, function_b)
graph = RemoteGraph.deploy(graph)

# As invocations might land on dev Executor, we need to run the graph multiple times
# to ensure that we catch wrong routing to Executor A or B if it ever happens.
for _ in range(10):
invocation_id = graph.run(block_until_done=True)
output = graph.output(invocation_id, "function_a")
self.assertEqual(len(output), 1)
self.assertEqual(output[0], "success")

output = graph.output(invocation_id, "function_b")
self.assertEqual(len(output), 1)
self.assertEqual(output[0], "success")


if __name__ == "__main__":
unittest.main()
32 changes: 32 additions & 0 deletions indexify/tests/src/tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import subprocess
from typing import List, Optional


def fetch_dev_mode_executor_pid() -> int:
result = subprocess.run(
["pgrep", "-f", "indexify-cli.+executor"],
check=True,
stdout=subprocess.PIPE,
text=True,
)
return int(result.stdout.strip())


def function_uri(namespace: str, graph: str, function: str, version: str) -> str:
return ":".join([namespace, graph, function, version])


class ExecutorProcessContextManager:
def __init__(self, args: List[str]):
self._args = ["indexify-cli", "executor"]
self._args.extend(args)
self._process: Optional[subprocess.Popen] = None

def __enter__(self) -> subprocess.Popen:
self._process = subprocess.Popen(self._args)
return self._process

def __exit__(self, exc_type, exc_value, traceback):
if self._process:
self._process.terminate()
self._process.wait()

0 comments on commit f2cf803

Please sign in to comment.