test: Parameterize stress test
topher-lo committed Jun 25, 2024
1 parent 1331e32 commit 685f26c
Showing 2 changed files with 53 additions and 46 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/benchmark.yml
@@ -14,7 +14,7 @@ jobs:
     timeout-minutes: 30
     strategy:
       matrix:
-        n_workers: ["1", "2", "4", "8"]
+        num_workers: ["1", "2", "4", "8"]
     steps:
       - uses: actions/checkout@v4
 
@@ -33,8 +33,8 @@ jobs:
 
       - name: Start Docker services
         env:
-          N_WORKERS: ${{ matrix.n_workers }}
-        run: docker compose up --scale worker=$N_WORKERS -d
+          NUM_WORKERS: ${{ matrix.num_workers }}
+        run: docker compose up --scale worker=$NUM_WORKERS -d
 
       - name: Verify Tracecat API is running
         run: curl -s http://localhost:8000/health | jq -e '.status == "ok"'
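
The rename keeps three names in sync: the matrix key num_workers, the NUM_WORKERS environment variable, and the --scale worker=$NUM_WORKERS flag handed to Docker Compose. A rough local equivalent of the "Start Docker services" step, sketched in Python on the assumption that the repo's docker-compose.yml defines a worker service (which the --scale flag implies):

    import os
    import subprocess

    # Assumption: run from the repo root, next to docker-compose.yml.
    num_workers = os.environ.get("NUM_WORKERS", "1")
    subprocess.run(
        ["docker", "compose", "up", "--scale", f"worker={num_workers}", "-d"],
        check=True,  # raise CalledProcessError if the services fail to start
    )
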
93 changes: 50 additions & 43 deletions tests/unit/test_workflows.py
@@ -224,27 +224,24 @@ def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput:
     return dsl, expected
 
 
-correctness_test_cases = [
-    "unit_conditional_adder_tree_skips",
-    "unit_conditional_adder_tree_continues",
-    "unit_conditional_adder_tree_skip_propagates",
-    "unit_conditional_adder_diamond_skip_with_join_weak_dep",
-    "unit_transform_forwarder_loop",
-    "unit_transform_forwarder_loop_chained",
-    "unit_transform_forwarder_arrange",
-    "unit_transform_forwarder_arrange_loop",
-    "unit_transform_forwarder_zip",
-    "unit_transform_forwarder_map_loop",
-    "unit_runtime_test_adder_tree",
-    "unit_runtime_test_chain",
-]
-
-
 @pytest.mark.parametrize(
     "dsl_with_expected",
-    correctness_test_cases,
+    [
+        "unit_conditional_adder_tree_skips",
+        "unit_conditional_adder_tree_continues",
+        "unit_conditional_adder_tree_skip_propagates",
+        "unit_conditional_adder_diamond_skip_with_join_weak_dep",
+        "unit_transform_forwarder_loop",
+        "unit_transform_forwarder_loop_chained",
+        "unit_transform_forwarder_arrange",
+        "unit_transform_forwarder_arrange_loop",
+        "unit_transform_forwarder_zip",
+        "unit_transform_forwarder_map_loop",
+        "unit_runtime_test_adder_tree",
+        "unit_runtime_test_chain",
+    ],
     indirect=True,
-    ids=correctness_test_cases,
+    ids=lambda x: x,
 )
 @pytest.mark.asyncio
 async def test_workflow_completes_and_correct(
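
For context on the change above: with indirect=True, pytest routes each listed value through the dsl_with_expected fixture as request.param, and ids=lambda x: x keeps the raw case names as test IDs. The earlier ids=correctness_test_cases form needed the list twice (once for values, once for IDs), which is why the module-level variable existed; once the IDs are derived from the values, the list can be inlined. A minimal self-contained sketch of the mechanism (fixture and test names here are illustrative, not from the repo):

    import pytest

    @pytest.fixture
    def loaded_case(request: pytest.FixtureRequest) -> str:
        # With indirect=True, each parametrized value arrives as request.param;
        # the real dsl_with_expected fixture uses it to load a DSL file and
        # its expected results.
        return request.param.upper()

    @pytest.mark.parametrize(
        "loaded_case",
        ["unit_case_a", "unit_case_b"],
        indirect=True,  # route values through the loaded_case fixture
        ids=lambda x: x,  # test IDs are the raw case names
    )
    def test_loaded_case(loaded_case: str):
        assert loaded_case.startswith("UNIT_CASE")
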
@@ -276,40 +273,50 @@ async def test_workflow_completes_and_correct(
 @pytest.mark.parametrize(
     "dsl",
     [DATA_PATH / "stress_adder_tree.yml"],
+    ids=lambda x: x.split("/")[-1].split(".")[0],
     indirect=True,
 )
+@pytest.mark.parametrize(
+    "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}"
+)
 @pytest.mark.slow
 @pytest.mark.asyncio
-async def test_stress_workflow(dsl, temporal_cluster, mock_registry, auth_sandbox):
-    """Test that we can have multiple executions of the same workflow running at the same time."""
+async def test_stress_workflow(
+    dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark
+):
+    """Multiple executions of the same workflow run at the same time."""
     test_name = f"test_stress_workflow-{dsl.title}"
     client = await get_temporal_client()
 
-    tasks: list[asyncio.Task] = []
-    async with (
-        Worker(
-            client,
-            task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"],
-            activities=DSLActivities.load(),
-            workflows=[DSLWorkflow],
-            workflow_runner=new_sandbox_runner(),
-        ),
-    ):
-        async with asyncio.TaskGroup() as tg:
-            # We can have multiple executions of the same workflow running at the same time
-            for i in range(100):
-                wf_exec_id = generate_test_exec_id(test_name + f"-{i}")
-                task = tg.create_task(
-                    client.execute_workflow(
-                        DSLWorkflow.run,
-                        DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID),
-                        id=wf_exec_id,
-                        task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"],
-                        retry_policy=RetryPolicy(maximum_attempts=1),
-                    )
-                )
-                tasks.append(task)
+    async def run_worklows():
+        tasks: list[asyncio.Task] = []
+        async with (
+            Worker(
+                client,
+                task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"],
+                activities=DSLActivities.load(),
+                workflows=[DSLWorkflow],
+                workflow_runner=new_sandbox_runner(),
+            ),
+        ):
+            async with asyncio.TaskGroup() as tg:
+                # We can have multiple executions of the same workflow running at the same time
+                for i in range(num_workflows):
+                    wf_exec_id = generate_test_exec_id(test_name + f"-{i}")
+                    task = tg.create_task(
+                        client.execute_workflow(
+                            DSLWorkflow.run,
+                            DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID),
+                            id=wf_exec_id,
+                            task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"],
+                            retry_policy=RetryPolicy(maximum_attempts=1),
+                        )
+                    )
+                    tasks.append(task)
+        return tasks
+
+    tasks = benchmark.pedantic(run_worklows, iterations=3, rounds=1)
+    assert all(task.done() for task in tasks)


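The benchmark fixture comes from the pytest-benchmark plugin; benchmark.pedantic(target, iterations=..., rounds=...) runs target an exact number of times and passes its return value through, which is how the test recovers the task list. A minimal sketch of the pattern, using a synchronous callable for clarity (the commit passes an async function instead):

    # Requires the pytest-benchmark plugin, which provides the `benchmark` fixture.
    def test_build_tasks_benchmark(benchmark):
        def build_tasks():
            # Illustrative stand-in for run_worklows(): return the collected work.
            return [i * i for i in range(1_000)]

        # Exactly 3 iterations in 1 round; pedantic returns build_tasks()'s
        # result, mirroring `tasks = benchmark.pedantic(run_worklows, ...)`.
        result = benchmark.pedantic(build_tasks, iterations=3, rounds=1)
        assert len(result) == 1_000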
