From ecf55fb773444b4b01cd989f58411fb69940021f Mon Sep 17 00:00:00 2001 From: Anatolii Stehnii Date: Thu, 22 May 2025 23:33:27 +0300 Subject: [PATCH 1/3] Modify the way we store output --- .../python311_genai_agents/run_agent.py | 83 ++++++++----------- 1 file changed, 36 insertions(+), 47 deletions(-) diff --git a/public_dropin_environments/python311_genai_agents/run_agent.py b/public_dropin_environments/python311_genai_agents/run_agent.py index 9872246b9..6958831e5 100644 --- a/public_dropin_environments/python311_genai_agents/run_agent.py +++ b/public_dropin_environments/python311_genai_agents/run_agent.py @@ -18,6 +18,7 @@ import json import logging import os +from pathlib import Path import sys from typing import Any, cast @@ -36,9 +37,11 @@ root = logging.getLogger() +CURRENT_DIR = Path(__file__).parent +DEFAULT_OUTPUT_PATH = CURRENT_DIR / "output.log" + parser = argparse.ArgumentParser() -parser.add_argument("--user_prompt", type=str, default="", help="user_prompt for chat endpoint") -parser.add_argument("--extra_body", type=str, default="", help="extra_body for chat endpoint") +parser.add_argument("--chat_completion", type=str, default="", help="json string of chat completion") parser.add_argument( "--custom_model_dir", type=str, @@ -46,55 +49,28 @@ help="directory containing custom.py location", ) parser.add_argument("--output_path", type=str, default="", help="json output file location") -args = parser.parse_args() - -def setup_logging(logger: logging.Logger, output_path: str, log_level: int = logging.INFO) -> None: - if len(output_path) == 0: - output_path = "output.log" - else: - output_path = f"{output_path}.log" +def setup_logging(logger: logging.Logger, log_level: int = logging.INFO) -> None: logger.setLevel(log_level) handler_stream = logging.StreamHandler(sys.stdout) handler_stream.setLevel(log_level) - formatter = logging.Formatter("%(message)s") - handler_stream.setFormatter(formatter) - - if os.path.exists(output_path): - os.remove(output_path) - handler_file = logging.FileHandler(output_path) - handler_file.setLevel(log_level) - formatter_file = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - handler_file.setFormatter(formatter_file) + formatter_stream = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler_stream.setFormatter(formatter_stream) logger.addHandler(handler_stream) - logger.addHandler(handler_file) -def construct_prompt(user_prompt: str, extra_body: str) -> Any: - extra_body_params = json.loads(extra_body) if extra_body else {} +def construct_prompt(chat_completion: str) -> Any: + chat_completion = json.loads(chat_completion) if chat_completion else {} completion_create_params = CompletionCreateParamsNonStreaming( - model="datarobot-deployed-llm", - messages=[ - ChatCompletionSystemMessageParam( - content="You are a helpful assistant", - role="system", - ), - ChatCompletionUserMessageParam( - content=user_prompt, - role="user", - ), - ], - n=1, - temperature=0.01, - extra_body=extra_body_params, # type: ignore[typeddict-unknown-key] + **chat_completion ) return completion_create_params def execute_drum( - user_prompt: str, extra_body: str, custom_model_dir: str, output_path: str + chat_completion: str, custom_model_dir: str, output_path: str ) -> ChatCompletion: root.info("Executing agent as [chat] endpoint. DRUM Executor.") root.info("Starting DRUM server.") @@ -123,7 +99,7 @@ def execute_drum( api_key="not-required", max_retries=0, ) - completion_create_params = construct_prompt(user_prompt, extra_body) + completion_create_params = construct_prompt(chat_completion) root.info("Executing Agent.") completion = client.chat.completions.create(**completion_create_params) @@ -140,13 +116,26 @@ def execute_drum( return cast(ChatCompletion, completion) -# Agent execution -if len(args.custom_model_dir) == 0: - args.custom_model_dir = os.path.join(os.getcwd(), "custom_model") -setup_logging(logger=root, output_path=args.output_path, log_level=logging.INFO) -result = execute_drum( - user_prompt=args.user_prompt, - extra_body=args.extra_body, - custom_model_dir=args.custom_model_dir, - output_path=args.output_path, -) +if __name__ == "__main__": + with open(DEFAULT_OUTPUT_PATH, "a") as f: + sys.stdout = f + sys.stderr = f + print("Parsing args") + args = parser.parse_args() + + output_path = args.output_path or DEFAULT_OUTPUT_PATH + with open(output_path, "a") as f: + sys.stdout = f + sys.stderr = f + + print("Setting up logging") + setup_logging(logger=root, log_level=logging.INFO) + if len(args.custom_model_dir) == 0: + args.custom_model_dir = CURRENT_DIR / "custom_model" + # Agent execution + root.info(f"Executing agent at {args.custom_model_dir}") + result = execute_drum( + chat_completion=args.chat_completion, + custom_model_dir=args.custom_model_dir, + output_path=args.output_path, + ) From 8019513dfafc3154c5591336f0ba036a7577d2bd Mon Sep 17 00:00:00 2001 From: Anatolii Stehnii Date: Fri, 23 May 2025 01:17:48 +0300 Subject: [PATCH 2/3] Experimental --- .../python311_genai_agents/run_agent.py | 83 ++++++----- .../run_agent_experimental.py | 134 ++++++++++++++++++ 2 files changed, 181 insertions(+), 36 deletions(-) create mode 100644 public_dropin_environments/python311_genai_agents/run_agent_experimental.py diff --git a/public_dropin_environments/python311_genai_agents/run_agent.py b/public_dropin_environments/python311_genai_agents/run_agent.py index 6958831e5..9872246b9 100644 --- a/public_dropin_environments/python311_genai_agents/run_agent.py +++ b/public_dropin_environments/python311_genai_agents/run_agent.py @@ -18,7 +18,6 @@ import json import logging import os -from pathlib import Path import sys from typing import Any, cast @@ -37,11 +36,9 @@ root = logging.getLogger() -CURRENT_DIR = Path(__file__).parent -DEFAULT_OUTPUT_PATH = CURRENT_DIR / "output.log" - parser = argparse.ArgumentParser() -parser.add_argument("--chat_completion", type=str, default="", help="json string of chat completion") +parser.add_argument("--user_prompt", type=str, default="", help="user_prompt for chat endpoint") +parser.add_argument("--extra_body", type=str, default="", help="extra_body for chat endpoint") parser.add_argument( "--custom_model_dir", type=str, @@ -49,28 +46,55 @@ help="directory containing custom.py location", ) parser.add_argument("--output_path", type=str, default="", help="json output file location") +args = parser.parse_args() + +def setup_logging(logger: logging.Logger, output_path: str, log_level: int = logging.INFO) -> None: + if len(output_path) == 0: + output_path = "output.log" + else: + output_path = f"{output_path}.log" -def setup_logging(logger: logging.Logger, log_level: int = logging.INFO) -> None: logger.setLevel(log_level) handler_stream = logging.StreamHandler(sys.stdout) handler_stream.setLevel(log_level) - formatter_stream = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - handler_stream.setFormatter(formatter_stream) + formatter = logging.Formatter("%(message)s") + handler_stream.setFormatter(formatter) + + if os.path.exists(output_path): + os.remove(output_path) + handler_file = logging.FileHandler(output_path) + handler_file.setLevel(log_level) + formatter_file = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler_file.setFormatter(formatter_file) logger.addHandler(handler_stream) + logger.addHandler(handler_file) -def construct_prompt(chat_completion: str) -> Any: - chat_completion = json.loads(chat_completion) if chat_completion else {} +def construct_prompt(user_prompt: str, extra_body: str) -> Any: + extra_body_params = json.loads(extra_body) if extra_body else {} completion_create_params = CompletionCreateParamsNonStreaming( - **chat_completion + model="datarobot-deployed-llm", + messages=[ + ChatCompletionSystemMessageParam( + content="You are a helpful assistant", + role="system", + ), + ChatCompletionUserMessageParam( + content=user_prompt, + role="user", + ), + ], + n=1, + temperature=0.01, + extra_body=extra_body_params, # type: ignore[typeddict-unknown-key] ) return completion_create_params def execute_drum( - chat_completion: str, custom_model_dir: str, output_path: str + user_prompt: str, extra_body: str, custom_model_dir: str, output_path: str ) -> ChatCompletion: root.info("Executing agent as [chat] endpoint. DRUM Executor.") root.info("Starting DRUM server.") @@ -99,7 +123,7 @@ def execute_drum( api_key="not-required", max_retries=0, ) - completion_create_params = construct_prompt(chat_completion) + completion_create_params = construct_prompt(user_prompt, extra_body) root.info("Executing Agent.") completion = client.chat.completions.create(**completion_create_params) @@ -116,26 +140,13 @@ def execute_drum( return cast(ChatCompletion, completion) -if __name__ == "__main__": - with open(DEFAULT_OUTPUT_PATH, "a") as f: - sys.stdout = f - sys.stderr = f - print("Parsing args") - args = parser.parse_args() - - output_path = args.output_path or DEFAULT_OUTPUT_PATH - with open(output_path, "a") as f: - sys.stdout = f - sys.stderr = f - - print("Setting up logging") - setup_logging(logger=root, log_level=logging.INFO) - if len(args.custom_model_dir) == 0: - args.custom_model_dir = CURRENT_DIR / "custom_model" - # Agent execution - root.info(f"Executing agent at {args.custom_model_dir}") - result = execute_drum( - chat_completion=args.chat_completion, - custom_model_dir=args.custom_model_dir, - output_path=args.output_path, - ) +# Agent execution +if len(args.custom_model_dir) == 0: + args.custom_model_dir = os.path.join(os.getcwd(), "custom_model") +setup_logging(logger=root, output_path=args.output_path, log_level=logging.INFO) +result = execute_drum( + user_prompt=args.user_prompt, + extra_body=args.extra_body, + custom_model_dir=args.custom_model_dir, + output_path=args.output_path, +) diff --git a/public_dropin_environments/python311_genai_agents/run_agent_experimental.py b/public_dropin_environments/python311_genai_agents/run_agent_experimental.py new file mode 100644 index 000000000..ca3356cf1 --- /dev/null +++ b/public_dropin_environments/python311_genai_agents/run_agent_experimental.py @@ -0,0 +1,134 @@ +# Copyright 2025 DataRobot, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: skip-file + +import argparse +import json +import logging +import os +from pathlib import Path +import sys +from typing import Any, cast + +import requests +import datarobot_drum +from datarobot_drum.drum.common import setup_tracer +from datarobot_drum.runtime_parameters.runtime_parameters import RuntimeParameters +from datarobot_drum.drum.adapters.model_adapters.python_model_adapter import PythonModelAdapter +from datarobot_drum.drum.enum import TargetType +from openai import OpenAI +from openai.types.chat import ( + ChatCompletion, + ChatCompletionSystemMessageParam, + ChatCompletionUserMessageParam, +) +from openai.types.chat.completion_create_params import ( + CompletionCreateParamsNonStreaming, +) + +root = logging.getLogger() + +CURRENT_DIR = Path(__file__).parent +DEFAULT_OUTPUT_PATH = CURRENT_DIR / "output.log" + +parser = argparse.ArgumentParser() +parser.add_argument("--chat_completion", type=str, default="", help="json string of chat completion") +parser.add_argument( + "--custom_model_dir", + type=str, + default="", + help="directory containing custom.py location", +) +parser.add_argument("--output_path", type=str, default="", help="json output file location") + + +def setup_logging(logger: logging.Logger, log_level: int = logging.INFO) -> None: + logger.setLevel(log_level) + handler_stream = logging.StreamHandler(sys.stdout) + handler_stream.setLevel(log_level) + formatter_stream = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler_stream.setFormatter(formatter_stream) + + logger.addHandler(handler_stream) + + +def construct_prompt(chat_completion: str) -> Any: + chat_completion = json.loads(chat_completion) if chat_completion else {} + completion_create_params = CompletionCreateParamsNonStreaming( + **chat_completion + ) + return completion_create_params + + +def execute_drum( + chat_completion: str, custom_model_dir: str, output_path: str +) -> ChatCompletion: + #root.info("Setting up tracer") + #setup_tracer(RuntimeParameters) + root.info("Setting up model adapter") + os.environ["TARGET_NAME"] = "response" + model_adapter = PythonModelAdapter(custom_model_dir, target_type=TargetType.AGENTIC_WORKFLOW) + root.info("Model adapter set up. Loading hooks.") + model_adapter.load_custom_hooks() + root.info("Hooks loaded.") + + # Use a standard OpenAI client to call the DRUM server. This mirrors the behavior of a deployed agent. + root.info("Building prompt.") + completion_create_params = construct_prompt(chat_completion) + + root.info("Executing Agent.") + completion = model_adapter.chat(completion_create_params, model=None, association_id=None) + + # Continue outside the context manager to ensure the server is stopped and logs + # are flushed before we write the output + root.info(f"Storing result: {output_path}") + if len(output_path) == 0: + output_path = os.path.join(custom_model_dir, "output.json") + with open(output_path, "w") as fp: + fp.write(completion.to_json()) + + root.info(completion.to_json()) + return cast(ChatCompletion, completion) + + +if __name__ == "__main__": + with open(DEFAULT_OUTPUT_PATH, "a") as f: + sys.stdout = f + sys.stderr = f + print("Parsing args") + args = parser.parse_args() + + output_log_path = args.output_path + ".log" if args.output_path else DEFAULT_OUTPUT_PATH + with open(output_log_path, "a") as f: + sys.stdout = f + sys.stderr = f + + print("Setting up logging") + setup_logging(logger=root, log_level=logging.INFO) + if len(args.custom_model_dir) == 0: + args.custom_model_dir = CURRENT_DIR / "custom_model" + # Agent execution + root.info(f"Executing agent at {args.custom_model_dir}") + try: + result = execute_drum( + chat_completion=args.chat_completion, + custom_model_dir=args.custom_model_dir, + output_path=args.output_path, + ) + except Exception as e: + root.exception(f"Error executing agent: {e}") + sys.exit(1) + + sys.exit(0) From 788e298d5318ed0c82412c275562603f2ccd95ea Mon Sep 17 00:00:00 2001 From: Anatolii Stehnii Date: Fri, 23 May 2025 01:31:24 +0300 Subject: [PATCH 3/3] Original stdout stderr --- .../run_agent_experimental.py | 61 +++++++++++-------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/public_dropin_environments/python311_genai_agents/run_agent_experimental.py b/public_dropin_environments/python311_genai_agents/run_agent_experimental.py index ca3356cf1..91e0d038f 100644 --- a/public_dropin_environments/python311_genai_agents/run_agent_experimental.py +++ b/public_dropin_environments/python311_genai_agents/run_agent_experimental.py @@ -104,31 +104,38 @@ def execute_drum( if __name__ == "__main__": - with open(DEFAULT_OUTPUT_PATH, "a") as f: - sys.stdout = f - sys.stderr = f - print("Parsing args") - args = parser.parse_args() - - output_log_path = args.output_path + ".log" if args.output_path else DEFAULT_OUTPUT_PATH - with open(output_log_path, "a") as f: - sys.stdout = f - sys.stderr = f + stdout = sys.stdout + stderr = sys.stderr + try: + with open(DEFAULT_OUTPUT_PATH, "a") as f: + sys.stdout = f + sys.stderr = f + print("Parsing args") + args = parser.parse_args() - print("Setting up logging") - setup_logging(logger=root, log_level=logging.INFO) - if len(args.custom_model_dir) == 0: - args.custom_model_dir = CURRENT_DIR / "custom_model" - # Agent execution - root.info(f"Executing agent at {args.custom_model_dir}") - try: - result = execute_drum( - chat_completion=args.chat_completion, - custom_model_dir=args.custom_model_dir, - output_path=args.output_path, - ) - except Exception as e: - root.exception(f"Error executing agent: {e}") - sys.exit(1) - - sys.exit(0) + output_log_path = args.output_path + ".log" if args.output_path else DEFAULT_OUTPUT_PATH + with open(output_log_path, "a") as f: + sys.stdout = f + sys.stderr = f + + print("Setting up logging") + setup_logging(logger=root, log_level=logging.INFO) + if len(args.custom_model_dir) == 0: + args.custom_model_dir = CURRENT_DIR / "custom_model" + # Agent execution + root.info(f"Executing agent at {args.custom_model_dir}") + try: + result = execute_drum( + chat_completion=args.chat_completion, + custom_model_dir=args.custom_model_dir, + output_path=args.output_path, + ) + except Exception as e: + root.exception(f"Error executing agent: {e}") + except Exception: + pass + finally: + # Return to original stdout and stderr otherwise the kernel will fail to flush and + # hang + sys.stdout = stdout + sys.stderr = stderr