From c3594ae99e8e600fd23145ab977de463aade2c45 Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Wed, 22 Oct 2025 10:23:02 +0200 Subject: [PATCH 1/4] fix(runtime): ensure stop flag is set for policy violations in parallel rails --- nemoguardrails/colang/v1_0/runtime/runtime.py | 21 ++- tests/test_parallel_rails.py | 129 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index 0046d4878..19afd7e0c 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -311,7 +311,13 @@ async def _run_flows_in_parallel( # Wrapper function to help reverse map the task result to the flow ID async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): result = await func(*args, **kwargs) - if post_event: + + has_stop = any( + event["type"] == "BotIntent" and event["intent"] == "stop" + for event in result + ) + + if post_event and not has_stop: result.append(post_event) args[1].append( {"type": "event", "timestamp": time(), "data": post_event} @@ -361,6 +367,7 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): unique_flow_ids[flow_uid] = task stopped_task_results: List[dict] = [] + stopped_task_processing_log: List[dict] = [] # Process tasks as they complete using as_completed try: @@ -377,6 +384,9 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): # If this flow had a stop event if has_stop: stopped_task_results = task_results[flow_id] + result + stopped_task_processing_log = task_processing_logs[ + flow_id + ].copy() # Cancel all remaining tasks for pending_task in tasks: @@ -433,6 +443,15 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): finished_task_processing_logs.extend(task_processing_logs[flow_id]) if processing_log: + for plog in stopped_task_processing_log: + # Filter out "Listen" and "start_flow" events from task processing log + if plog["type"] == "event" and ( + plog["data"]["type"] == "Listen" + or plog["data"]["type"] == "start_flow" + ): + continue + processing_log.append(plog) + for plog in finished_task_processing_logs: # Filter out "Listen" and "start_flow" events from task processing log if plog["type"] == "event" and ( diff --git a/tests/test_parallel_rails.py b/tests/test_parallel_rails.py index 8c4d017c9..58b90a22b 100644 --- a/tests/test_parallel_rails.py +++ b/tests/test_parallel_rails.py @@ -152,3 +152,132 @@ async def test_parallel_rails_output_fail_2(): and result.response[0]["content"] == "I cannot express a term in the bot answer." ) + + +@pytest.mark.asyncio +async def test_parallel_rails_input_stop_flag(): + config = RailsConfig.from_path(os.path.join(CONFIGS_FOLDER, "parallel_rails")) + chat = TestChat( + config, + llm_completions=[ + "No", + "Hi there! How can I assist you with questions about the ABC Company today?", + "No", + ], + ) + + chat >> "hi, this is a blocked term." + result = await chat.app.generate_async(messages=chat.history, options=OPTIONS) + + stopped_rails = [rail for rail in result.log.activated_rails if rail.stop] + assert len(stopped_rails) == 1, "Expected exactly one stopped rail" + assert ( + "check blocked input terms" in stopped_rails[0].name + ), f"Expected 'check blocked input terms' rail to be stopped, got {stopped_rails[0].name}" + + +@pytest.mark.asyncio +async def test_parallel_rails_output_stop_flag(): + config = RailsConfig.from_path(os.path.join(CONFIGS_FOLDER, "parallel_rails")) + chat = TestChat( + config, + llm_completions=[ + "No", + "Hi there! This is a blocked term!", + "No", + ], + ) + + chat >> "hi!" + result = await chat.app.generate_async(messages=chat.history, options=OPTIONS) + + stopped_rails = [rail for rail in result.log.activated_rails if rail.stop] + assert len(stopped_rails) == 1, "Expected exactly one stopped rail" + assert ( + "check blocked output terms" in stopped_rails[0].name + ), f"Expected 'check blocked output terms' rail to be stopped, got {stopped_rails[0].name}" + + +@pytest.mark.asyncio +async def test_parallel_rails_client_code_pattern(): + config = RailsConfig.from_path(os.path.join(CONFIGS_FOLDER, "parallel_rails")) + chat = TestChat( + config, + llm_completions=[ + "No", + "Hi there! This is a blocked term!", + "No", + ], + ) + + chat >> "hi!" + result = await chat.app.generate_async(messages=chat.history, options=OPTIONS) + + activated_rails = result.log.activated_rails if result.log else None + assert activated_rails is not None, "Expected activated_rails to be present" + + rails_to_check = [ + "self check output", + "check blocked output terms $duration=1.0", + ] + rails_set = set(rails_to_check) + + stopping_rails = [rail for rail in activated_rails if rail.stop] + + assert len(stopping_rails) > 0, "Expected at least one stopping rail" + + blocked_rails = [] + for rail in stopping_rails: + if rail.name in rails_set: + blocked_rails.append(rail.name) + + assert ( + len(blocked_rails) == 1 + ), f"Expected exactly one blocked rail from our check list, got {len(blocked_rails)}: {blocked_rails}" + assert ( + "check blocked output terms $duration=1.0" in blocked_rails + ), f"Expected 'check blocked output terms $duration=1.0' to be blocked, got {blocked_rails}" + + for rail in activated_rails: + if ( + rail.name in rails_set + and rail.name != "check blocked output terms $duration=1.0" + ): + assert ( + not rail.stop + ), f"Non-blocked rail {rail.name} should not have stop=True" + + +@pytest.mark.asyncio +async def test_parallel_rails_multiple_activated_rails(): + config = RailsConfig.from_path(os.path.join(CONFIGS_FOLDER, "parallel_rails")) + chat = TestChat( + config, + llm_completions=[ + "No", + "Hi there! This is a blocked term!", + "No", + ], + ) + + chat >> "hi!" + result = await chat.app.generate_async(messages=chat.history, options=OPTIONS) + + activated_rails = result.log.activated_rails if result.log else None + assert activated_rails is not None, "Expected activated_rails to be present" + assert len(activated_rails) > 1, ( + f"Expected multiple activated_rails, got {len(activated_rails)}: " + f"{[rail.name for rail in activated_rails]}" + ) + + stopped_rails = [rail for rail in activated_rails if rail.stop] + assert len(stopped_rails) == 1, ( + f"Expected exactly one stopped rail, got {len(stopped_rails)}: " + f"{[rail.name for rail in stopped_rails]}" + ) + + rails_with_stop_true = [rail for rail in activated_rails if rail.stop is True] + assert len(rails_with_stop_true) == 1, ( + f"Expected exactly one rail with stop=True, got {len(rails_with_stop_true)}: " + f"{[rail.name for rail in rails_with_stop_true]}" + ) From 117dd93e4b083269a2316bc6ce75f1ddae8fa389 Mon Sep 17 00:00:00 2001 From: Pouyan <13303554+Pouyanpi@users.noreply.github.com> Date: Wed, 22 Oct 2025 10:57:13 +0200 Subject: [PATCH 2/4] Update nemoguardrails/colang/v1_0/runtime/runtime.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Pouyan <13303554+Pouyanpi@users.noreply.github.com> --- nemoguardrails/colang/v1_0/runtime/runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index 19afd7e0c..8877d6b30 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -384,7 +384,7 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): # If this flow had a stop event if has_stop: stopped_task_results = task_results[flow_id] + result - stopped_task_processing_log = task_processing_logs[ + stopped_task_processing_logs = task_processing_logs[ flow_id ].copy() From d18a19a9d314e123986b08d75621c4be312d0aa3 Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Fri, 24 Oct 2025 14:09:41 +0200 Subject: [PATCH 3/4] refactor(runtime): deduplicate log filtering logic --- nemoguardrails/colang/v1_0/runtime/runtime.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index 8877d6b30..dee67ae1d 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -443,23 +443,19 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): finished_task_processing_logs.extend(task_processing_logs[flow_id]) if processing_log: - for plog in stopped_task_processing_log: - # Filter out "Listen" and "start_flow" events from task processing log - if plog["type"] == "event" and ( - plog["data"]["type"] == "Listen" - or plog["data"]["type"] == "start_flow" - ): - continue - processing_log.append(plog) - for plog in finished_task_processing_logs: - # Filter out "Listen" and "start_flow" events from task processing log - if plog["type"] == "event" and ( - plog["data"]["type"] == "Listen" - or plog["data"]["type"] == "start_flow" - ): - continue - processing_log.append(plog) + def filter_and_append(logs, target_log): + for plog in logs: + # Filter out "Listen" and "start_flow" events from task processing log + if plog["type"] == "event" and ( + plog["data"]["type"] == "Listen" + or plog["data"]["type"] == "start_flow" + ): + continue + target_log.append(plog) + + filter_and_append(stopped_task_processing_log, processing_log) + filter_and_append(finished_task_processing_logs, processing_log) # We pack all events into a single event to add it to the event history. history_events = new_event_dict( From a9965490642e74436b107339d6897f1464a45e2d Mon Sep 17 00:00:00 2001 From: Pouyanpi <13303554+Pouyanpi@users.noreply.github.com> Date: Fri, 24 Oct 2025 14:36:55 +0200 Subject: [PATCH 4/4] rename stopped_task_processing_log variable --- nemoguardrails/colang/v1_0/runtime/runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index dee67ae1d..172342f50 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -367,7 +367,7 @@ async def task_call_helper(flow_uid, post_event, func, *args, **kwargs): unique_flow_ids[flow_uid] = task stopped_task_results: List[dict] = [] - stopped_task_processing_log: List[dict] = [] + stopped_task_processing_logs: List[dict] = [] # Process tasks as they complete using as_completed try: @@ -454,7 +454,7 @@ def filter_and_append(logs, target_log): continue target_log.append(plog) - filter_and_append(stopped_task_processing_log, processing_log) + filter_and_append(stopped_task_processing_logs, processing_log) filter_and_append(finished_task_processing_logs, processing_log) # We pack all events into a single event to add it to the event history.