diff --git a/indexify/tests/src/tests/test_function_allowlist.py b/indexify/tests/src/tests/test_function_allowlist.py new file mode 100644 index 000000000..dca9f8d82 --- /dev/null +++ b/indexify/tests/src/tests/test_function_allowlist.py @@ -0,0 +1,127 @@ +import os +import subprocess +import time +import unittest +from typing import List, Optional + +from indexify import Graph, indexify_function +from indexify.remote_graph import RemoteGraph +from tests.utils import ( + ExecutorProcessContextManager, + fetch_dev_mode_executor_pid, + function_uri, +) + +# There's a dev mode executor already running in the testing environment. +# It's used for all other tests that don't check the function allowlist. +# This existing Executor can run any function. +dev_mode_executor_pid: Optional[int] = None +# This Executor can only run function_a. +function_a_executor_pid: Optional[int] = None +# This Executor can only run function_b. +function_b_executor_pid: Optional[int] = None + + +@indexify_function() +def function_a() -> str: + global dev_mode_executor_pid + global function_a_executor_pid + global function_b_executor_pid + + # Assuming Subprocess Function Executors are used in Open Source. + current_executor_pid: int = os.getppid() + allowed_executor_pids: List[int] = [function_a_executor_pid, dev_mode_executor_pid] + if current_executor_pid not in allowed_executor_pids: + raise Exception( + f"Function A Executor PID {current_executor_pid} is not in the allowlist: {allowed_executor_pids}" + ) + return "success" + + +@indexify_function() +def function_b(_: str) -> str: + global dev_mode_executor_pid + global function_a_executor_pid + global function_b_executor_pid + + # Assuming Subprocess Function Executors are used in Open Source. + current_executor_pid: int = os.getppid() + allowed_executor_pids: List[int] = [function_b_executor_pid, dev_mode_executor_pid] + if current_executor_pid not in allowed_executor_pids: + raise Exception( + f"Function B Executor PID {current_executor_pid} is not in the allowlist: {allowed_executor_pids}" + ) + return "success" + + +class TestFunctionAllowlist(unittest.TestCase): + def test_different_executors_run_different_functions(self): + global dev_mode_executor_pid + dev_mode_executor_pid = fetch_dev_mode_executor_pid() + print(f"Found dev mode Executor PID: {dev_mode_executor_pid}") + + with ExecutorProcessContextManager( + [ + "--function", + function_uri( + "default", + "test_different_executors_run_different_functions", + "function_a", + "1.0", + ), + "--ports", + "60000", + "60001", + ] + ) as executor_a: + executor_a: subprocess.Popen + global function_a_executor_pid + function_a_executor_pid = executor_a.pid + print(f"Started Executor A with PID: {function_a_executor_pid}") + + with ExecutorProcessContextManager( + [ + "--function", + function_uri( + "default", + "test_different_executors_run_different_functions", + "function_b", + "1.0", + ), + "--ports", + "60001", + "60002", + ] + ) as executor_b: + executor_b: subprocess.Popen + global function_b_executor_pid + function_b_executor_pid = executor_b.pid + print(f"Started Executor B with PID: {function_b_executor_pid}") + + # Wait to let Executor A and B start and register at the Server. + time.sleep(5) + + graph = Graph( + name="test_different_executors_run_different_functions", + description="test", + start_node=function_a, + version="1.0", + ) + graph.add_edge(function_a, function_b) + graph = RemoteGraph.deploy(graph) + + # As invocations might land on dev Executor, we need to run the graph multiple times + # to ensure that we catch wrong routing to Executor A or B if it ever happens. + for _ in range(10): + invocation_id = graph.run(block_until_done=True) + output = graph.output(invocation_id, "function_a") + self.assertEqual(len(output), 1) + self.assertEqual(output[0], "success") + + output = graph.output(invocation_id, "function_b") + self.assertEqual(len(output), 1) + self.assertEqual(output[0], "success") + + +if __name__ == "__main__": + unittest.main() diff --git a/indexify/tests/src/tests/utils.py b/indexify/tests/src/tests/utils.py new file mode 100644 index 000000000..a03e88d61 --- /dev/null +++ b/indexify/tests/src/tests/utils.py @@ -0,0 +1,32 @@ +import subprocess +from typing import List, Optional + + +def fetch_dev_mode_executor_pid() -> int: + result = subprocess.run( + ["pgrep", "-f", "indexify-cli.+executor"], + check=True, + stdout=subprocess.PIPE, + text=True, + ) + return int(result.stdout.strip()) + + +def function_uri(namespace: str, graph: str, function: str, version: str) -> str: + return ":".join([namespace, graph, function, version]) + + +class ExecutorProcessContextManager: + def __init__(self, args: List[str]): + self._args = ["indexify-cli", "executor"] + self._args.extend(args) + self._process: Optional[subprocess.Popen] = None + + def __enter__(self) -> subprocess.Popen: + self._process = subprocess.Popen(self._args) + return self._process + + def __exit__(self, exc_type, exc_value, traceback): + if self._process: + self._process.terminate() + self._process.wait()