Draft
22 commits
f2f6029
add watchdog
s-gavrenkov Aug 15, 2025
d03b904
5 attempts
s-gavrenkov Aug 15, 2025
7f18f48
watchdog_additional
s-gavrenkov Aug 19, 2025
c1f6631
fixed version
s-gavrenkov Aug 19, 2025
a7820bf
add env USE_NIM_WATCHDOG
s-gavrenkov Aug 19, 2025
ca59c04
add env USE_NIM_WATCHDOG
s-gavrenkov Aug 20, 2025
9dc5f8a
fixed lint
s-gavrenkov Aug 20, 2025
3324fee
Reconcile dependencies, updated IDs, tags
svc-harness-git2 Aug 20, 2025
fac5559
[BUZZOK-27100] Fix Drum Inline Runner and streamline DRUM options gen…
mjnitz02 Aug 12, 2025
fdf706b
[-] (Auto) Bump env_info versions (#1622)
svc-harness-git2 Aug 13, 2025
8cb12f7
Bump keras in /public_dropin_environments/python3_keras (#1624)
dependabot[bot] Aug 13, 2025
6b3f28e
Configure OTel metrics by default in drum. (#1620)
nickolai-dr Aug 13, 2025
06797e7
[RAPTOR-14453] Regen requirements.txt to fix CVE-2025-8747 (#1623)
nullspoon Aug 13, 2025
f5a415b
[BUZZOK-27241] Update DRUM version to 1.16.22 to add support for `def…
mjnitz02 Aug 13, 2025
051d708
[BUZZOK-27241] [BUZZOK-27421] Bump requirements in GenAI Agents envir…
mjnitz02 Aug 14, 2025
aa6a984
[CFX-3334] Update to latest drgithelper and properly set permissions …
c-h-russell-walker Aug 16, 2025
3670e4e
Add OTEL logging configuration, refactor traces and metrics. (#1626)
nickolai-dr Aug 18, 2025
21ffc47
Bump DRUM version. (#1631)
nickolai-dr Aug 19, 2025
607df3e
updated requirements
s-gavrenkov Aug 20, 2025
4d5bce2
updated requirements
s-gavrenkov Aug 20, 2025
2d3f924
Merge remote-tracking branch 'origin/gavrenkov/poc_drum_watchdog' int…
s-gavrenkov Aug 20, 2025
970fc6c
Reconcile dependencies, updated IDs, tags
svc-harness-git2 Aug 20, 2025
9 changes: 9 additions & 0 deletions custom_model_runner/CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### [1.16.23] - 2025-08-18
##### Changed
- Add OTEL metrics and logs configuration.

#### [1.16.22] - 2025-08-12
##### Changed
- Add support for kwargs and headers to generative ai chat models
- Fix support for drum inline execution

#### [1.16.21] - 2025-07-16
##### Removed
- Removed PMML support
94 changes: 59 additions & 35 deletions custom_model_runner/datarobot_drum/drum/common.py
@@ -22,17 +22,24 @@
PayloadFormat,
)
from datarobot_drum.drum.exceptions import DrumCommonException
from datarobot_drum.drum.lazy_loading.lazy_loading_handler import LazyLoadingHandler
from datarobot_drum.runtime_parameters.runtime_parameters import RuntimeParametersLoader
from opentelemetry import trace, context
from opentelemetry import trace, context, metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


ctx_request_id = ContextVar("request_id")


@@ -139,7 +146,39 @@ def make_otel_endpoint(datarobot_endpoint):
return result


def setup_tracer(runtime_parameters, options):
def _setup_otel_logging(resource, multiprocessing=False):
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider)
exporter = OTLPLogExporter()
if multiprocessing:
logger_provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))
else:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider)
logging.getLogger().addHandler(handler)
return logger_provider


def _setup_otel_metrics(resource):
metric_exporter = OTLPMetricExporter()
metric_reader = PeriodicExportingMetricReader(metric_exporter)
metric_provider = MeterProvider(metric_readers=[metric_reader], resource=resource)
metrics.set_meter_provider(metric_provider)
return metric_provider


def _setup_otel_tracing(resource, multiprocessing=False):
otlp_exporter = OTLPSpanExporter()
trace_provider = TracerProvider(resource=resource)
if multiprocessing:
trace_provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
else:
trace_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(trace_provider)
return trace_provider


def setup_otel(runtime_parameters, options):
"""Setups OTEL tracer.

OTEL is configured by OTEL_EXPORTER_OTLP_ENDPOINT and
@@ -153,25 +192,20 @@ def setup_tracer(runtime_parameters, options):
command arguments
Returns
-------
TracerProvider
(TracerProvider, MeterProvider, LoggerProvider)
"""
log = get_drum_logger("setup_tracer")
log = get_drum_logger("setup_otel")

# Can be used to disable OTEL reporting from env var parameters
# https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/
if runtime_parameters.has("OTEL_SDK_DISABLED") and os.environ.get("OTEL_SDK_DISABLED"):
log.info("Tracing explicitly disabled")
return

main_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
trace_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
if not main_endpoint and not trace_endpoint:
log.info("Tracing is not configured")
return
log.info("OTEL explicitly disabled")
return (None, None, None)

resource = Resource.create()
otlp_exporter = OTLPSpanExporter()
provider = TracerProvider(resource=resource)
endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
if not endpoint:
log.info("OTEL is not configured")
return (None, None, None)

# In case of NIM flask server is configured to run in multiprocessing
# mode that uses fork. Since BatchSpanProcessor start background thread
@@ -180,16 +214,15 @@
# missing due to process exits before all data offloaded. In forking
# case we use SimpleSpanProcessor (mostly NIMs) otherwise BatchSpanProcessor
# (most frequent case)
if options.max_workers > 1:
provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
else:
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
multiprocessing = options.max_workers > 1

trace.set_tracer_provider(provider)
resource = Resource.create()
trace_provider = _setup_otel_tracing(resource=resource, multiprocessing=multiprocessing)
logger_provider = _setup_otel_logging(resource=resource, multiprocessing=multiprocessing)
metric_provider = _setup_otel_metrics(resource=resource)

endpoint = main_endpoint or trace_endpoint
log.info(f"Tracing is configured with endpoint: {endpoint}")
return provider
log.info(f"OTEL is configured with endpoint: {endpoint}")
return trace_provider, metric_provider, logger_provider


@contextmanager
@@ -232,12 +265,3 @@ def extract_chat_response_attributes(response):
# last completion wins
attributes["gen_ai.completion"] = m.get("content")
return attributes


def setup_required_environment_variables(options):
if "runtime_params_file" in options and options.runtime_params_file:
loader = RuntimeParametersLoader(options.runtime_params_file, options.code_dir)
loader.setup_environment_variables()

if "lazy_loading_file" in options and options.lazy_loading_file:
LazyLoadingHandler.setup_environment_variables_from_values_file(options.lazy_loading_file)
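Taken together, the common.py changes replace setup_tracer() with a single setup_otel() entry point that wires tracing, metrics, and logging off one shared Resource. The snippet below is a minimal sketch, not part of the diff, of how a caller is expected to drive it; the stub options object and the endpoint value are assumptions made for illustration.

# Illustrative sketch only -- not part of this PR. Assumes the setup_otel()
# signature shown above and a stub options object in place of parsed DRUM args.
import os
from types import SimpleNamespace

from datarobot_drum.drum.common import setup_otel
from datarobot_drum.runtime_parameters.runtime_parameters import RuntimeParameters

# The OTLP exporters read the standard endpoint variable; when it is unset,
# setup_otel() logs "OTEL is not configured" and returns (None, None, None).
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"  # assumed endpoint

# max_workers > 1 (the forking NIM case) switches the Batch* processors to Simple*.
options = SimpleNamespace(max_workers=1)

trace_provider, metric_provider, log_provider = setup_otel(RuntimeParameters, options)
try:
    pass  # run the server or predictor here
finally:
    # Mirror the shutdown order used by the signal handler in main.py below.
    for provider in (trace_provider, metric_provider, log_provider):
        if provider is not None:
            provider.shutdown()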
2 changes: 1 addition & 1 deletion custom_model_runner/datarobot_drum/drum/description.py
@@ -4,6 +4,6 @@
This is proprietary source code of DataRobot, Inc. and its affiliates.
Released under the terms of DataRobot Tool and Utility Agreement.
"""
version = "1.16.20"
version = "1.16.23"
__version__ = version
project_name = "datarobot-drum"
44 changes: 12 additions & 32 deletions custom_model_runner/datarobot_drum/drum/main.py
@@ -43,18 +43,13 @@
import signal
import sys

from datarobot_drum.drum.args_parser import CMRunnerArgsRegistry
from datarobot_drum.drum.common import (
config_logging,
setup_tracer,
setup_required_environment_variables,
)
from datarobot_drum.drum.common import config_logging, setup_otel
from datarobot_drum.drum.utils.setup import setup_options
from datarobot_drum.drum.enum import RunMode
from datarobot_drum.drum.enum import ExitCodes
from datarobot_drum.drum.exceptions import DrumSchemaValidationException
from datarobot_drum.drum.runtime import DrumRuntime
from datarobot_drum.runtime_parameters.runtime_parameters import (
RuntimeParametersLoader,
RuntimeParameters,
)

@@ -75,39 +70,24 @@ def signal_handler(sig, frame):
# Let the tracer offload accumulated spans before shutdown.
if runtime.trace_provider is not None:
runtime.trace_provider.shutdown()
if runtime.metric_provider is not None:
runtime.metric_provider.shutdown()
if runtime.log_provider is not None:
runtime.log_provider.shutdown()

os._exit(130)

arg_parser = CMRunnerArgsRegistry.get_arg_parser()

try:
import argcomplete
except ImportError:
print(
"WARNING: autocompletion of arguments is not supported "
"as 'argcomplete' package is not found",
file=sys.stderr,
)
else:
# argcomplete call should be as close to the beginning as possible
argcomplete.autocomplete(arg_parser)

CMRunnerArgsRegistry.extend_sys_argv_with_env_vars()

options = arg_parser.parse_args()
CMRunnerArgsRegistry.verify_options(options)

try:
setup_required_environment_variables(options)
options = setup_options()
runtime.options = options
except Exception as exc:
print(str(exc))
exit(255)

if RuntimeParameters.has("CUSTOM_MODEL_WORKERS"):
options.max_workers = RuntimeParameters.get("CUSTOM_MODEL_WORKERS")
runtime.options = options

runtime.trace_provider = setup_tracer(RuntimeParameters, options)
trace_provider, metric_provider, log_provider = setup_otel(RuntimeParameters, options)
runtime.trace_provider = trace_provider
runtime.metric_provider = metric_provider
runtime.log_provider = log_provider

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
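main.py now delegates argument parsing and environment setup to setup_options() from datarobot_drum.drum.utils.setup, a module not shown in this diff. The reconstruction below is an assumption, pieced together from the inline code it replaces (argcomplete, env-var handling, runtime-params and lazy-loading setup, and the CUSTOM_MODEL_WORKERS override); the actual implementation may differ.

# Assumed reconstruction of datarobot_drum/drum/utils/setup.py -- not shown in the diff.
import sys

from datarobot_drum.drum.args_parser import CMRunnerArgsRegistry
from datarobot_drum.drum.lazy_loading.lazy_loading_handler import LazyLoadingHandler
from datarobot_drum.runtime_parameters.runtime_parameters import (
    RuntimeParameters,
    RuntimeParametersLoader,
)


def setup_options(args=None):
    arg_parser = CMRunnerArgsRegistry.get_arg_parser()
    try:
        import argcomplete
    except ImportError:
        print(
            "WARNING: autocompletion of arguments is not supported "
            "as 'argcomplete' package is not found",
            file=sys.stderr,
        )
    else:
        # argcomplete call should be as close to the beginning as possible
        argcomplete.autocomplete(arg_parser)

    CMRunnerArgsRegistry.extend_sys_argv_with_env_vars()
    options = arg_parser.parse_args(args)
    CMRunnerArgsRegistry.verify_options(options)

    # Formerly setup_required_environment_variables(options) in common.py
    if "runtime_params_file" in options and options.runtime_params_file:
        loader = RuntimeParametersLoader(options.runtime_params_file, options.code_dir)
        loader.setup_environment_variables()
    if "lazy_loading_file" in options and options.lazy_loading_file:
        LazyLoadingHandler.setup_environment_variables_from_values_file(options.lazy_loading_file)

    if RuntimeParameters.has("CUSTOM_MODEL_WORKERS"):
        options.max_workers = RuntimeParameters.get("CUSTOM_MODEL_WORKERS")
    return options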
@@ -26,7 +26,8 @@
from typing import Generator, List

from datarobot_drum.drum.args_parser import CMRunnerArgsRegistry
from datarobot_drum.drum.common import setup_required_environment_variables, setup_tracer
from datarobot_drum.drum.common import setup_otel
from datarobot_drum.drum.utils.setup import setup_options
from datarobot_drum.drum.drum import CMRunner
from datarobot_drum.drum.language_predictors.base_language_predictor import BaseLanguagePredictor
from datarobot_drum.drum.runtime import DrumRuntime
@@ -68,14 +69,23 @@ def drum_inline_predictor(
target_type,
*cmd_args,
]
options = arg_parser.parse_args(args)
CMRunnerArgsRegistry.verify_options(options)
setup_required_environment_variables(options)

runtime.options = options
setup_tracer(RuntimeParameters, options)
try:
options = setup_options(args)
runtime.options = options
except Exception as exc:
print(str(exc))
exit(255)

trace_provider, metric_provider, log_provider = setup_otel(RuntimeParameters, options)
runtime.cm_runner = CMRunner(runtime)
params = runtime.cm_runner.get_predictor_params()
predictor = GenericPredictorComponent(params)

yield predictor.predictor
if trace_provider is not None:
trace_provider.shutdown()
if metric_provider is not None:
metric_provider.shutdown()
if log_provider is not None:
log_provider.shutdown()
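For context, drum_inline_predictor is the generator-based context manager used by the inline runner; with this change it also shuts down the OTEL providers when the caller leaves the with block. The usage sketch below is illustrative only: the argument names and the module path are hypothetical, since neither the full signature nor the file name is visible in this diff.

# Hypothetical usage sketch -- argument names and import path are assumptions.
from datarobot_drum.drum.root_predictors.drum_inline_utils import drum_inline_predictor  # assumed path

with drum_inline_predictor(
    target_type="regression",            # hypothetical target type
    custom_model_dir="/path/to/model",   # hypothetical model directory
    target_name="y",                     # hypothetical target column
) as predictor:
    # predictor is the BaseLanguagePredictor yielded by the context manager;
    # the OTEL trace/metric/log providers are shut down when the block exits.
    result = predictor.predict("/path/to/input.csv")  # hypothetical call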
@@ -5,8 +5,11 @@
Released under the terms of DataRobot Tool and Utility Agreement.
"""
import logging
import os
import sys
import time
from pathlib import Path
from threading import Thread

import requests
from flask import Response, jsonify, request
@@ -22,6 +25,7 @@
ModelInfoKeys,
RunLanguage,
TargetType,
URL_PREFIX_ENV_VAR_NAME,
)
from datarobot_drum.drum.exceptions import DrumCommonException
from datarobot_drum.drum.model_metadata import read_model_metadata_yaml
@@ -71,6 +75,7 @@ def __init__(self, params: dict):
"run_predictor_total", "finish", StatsOperation.SUB, "start"
)
self._predictor = self._setup_predictor()
self._server_watchdog = None

def _setup_predictor(self):
if self._run_language == RunLanguage.PYTHON:
@@ -301,10 +306,95 @@ def _run_flask_app(self, app):
processes = self._params.get("processes")
logger.info("Number of webserver processes: %s", processes)
try:
if str(os.environ.get("USE_NIM_WATCHDOG", "false")).lower() in ["true", "1", "yes"]:
# Start the watchdog thread before running the app
self._server_watchdog = Thread(
target=self.watchdog,
args=(port,),  # Pass the port as the only argument
daemon=True,
name="OpenAI Watchdog",
)
self._server_watchdog.start()

app.run(host, port, threaded=False, processes=processes)
except OSError as e:
raise DrumCommonException("{}: host: {}; port: {}".format(e, host, port))

def watchdog(self, port):
"""
Watchdog thread that periodically checks whether the server is alive by making
GET requests to the /info/ endpoint. Makes up to 5 attempts with quadratic backoff
before terminating the Flask app.
"""

logger.info("Starting watchdog to monitor server health...")

url_host = os.environ.get("TEST_URL_HOST", "localhost")
url_prefix = os.environ.get(URL_PREFIX_ENV_VAR_NAME, "")
health_url = f"http://{url_host}:{port}/{url_prefix}/info/"

request_timeout = 120
check_interval = 10 # seconds
max_attempts = 5

attempt = 0
base_sleep_time = 2

while True:
try:
# Check if server is responding to health checks
logger.debug("Server health check")
response = requests.get(health_url, timeout=request_timeout)
logger.debug(f"Server health check status: {response.status_code}")
# Connection succeeded, reset attempts and wait for next check
attempt = 0
time.sleep(check_interval) # Regular check interval
continue

except Exception as e:
attempt += 1
logger.error(f"health_url {health_url}")
logger.error(
f"Server health check failed (attempt {attempt}/{max_attempts}): {str(e)}"
)

if attempt >= max_attempts:
logger.error(
"All health check attempts failed. Forcefully killing all processes."
)

# First try clean termination
try:
self._terminate()
except Exception as e:
logger.error(f"Error during clean termination: {str(e)}")

# Force kill all processes
import subprocess

# Use more direct system commands to kill processes
try:
# Kill package jobs first (more aggressive approach)
logger.info("Killing Python package jobs")
# Run `busybox ps` and capture output
result = subprocess.run(["busybox", "ps"], capture_output=True, text=True)
# Parse lines, skip the header
lines = result.stdout.strip().split("\n")[1:]
# Extract the PID (first column)
pids = [int(line.split()[0]) for line in lines]
for pid in pids:
print("Killing pid:", pid)
subprocess.run(f"kill {pid}", shell=True)

Semgrep identified an issue in your code:
Found 'subprocess' function 'run' with 'shell=True'. This is dangerous because this call will spawn the command using a shell process. Doing so propagates current shell settings and variables, which makes it much easier for a malicious actor to execute commands. Use 'shell=False' instead.

To resolve this comment:

✨ Commit Assistant fix suggestion

Suggested change:
- subprocess.run(f"kill {pid}", shell=True)
+ subprocess.run(f"kill {pid}", shell=False)

Step-by-step instructions:
  1. Change the subprocess.run(f"kill {pid}", shell=True) call to avoid using the shell.
  2. Update the line to pass the command and arguments as a list, like this: subprocess.run(["kill", str(pid)]).
    This prevents shell injection vulnerabilities and makes the command execution safer, especially when working with dynamic input.
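For illustration, a shell-free version of the kill call from step 2 could look like the sketch below; os.kill is an alternative that avoids spawning a process at all.

# Sketch of the list-form call suggested above; no shell is involved, so the
# pid value is never interpreted by a shell.
import os
import signal
import subprocess

def kill_pid(pid: int) -> None:
    subprocess.run(["kill", str(pid)], check=False)
    # or, entirely in-process:
    # os.kill(pid, signal.SIGTERM)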

except Exception as kill_error:
logger.error(f"Error during process killing: {str(kill_error)}")

# Quadratic backoff
sleep_time = base_sleep_time * (attempt**2)
logger.info(f"Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)

def terminate(self):
terminate_op = getattr(self._predictor, "terminate", None)
if callable(terminate_op):
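Closing note on the watchdog added in this PR: it is opt-in via the USE_NIM_WATCHDOG environment variable ("true", "1", or "yes"), and on consecutive health-check failures it sleeps for base_sleep_time * attempt**2 seconds between retries. A small sketch of that schedule, using the constants from the code above:

# Backoff schedule used by the watchdog (constants taken from the diff).
base_sleep_time = 2
max_attempts = 5

for attempt in range(1, max_attempts):
    sleep_time = base_sleep_time * (attempt ** 2)
    print(f"attempt {attempt}: retry in {sleep_time}s")  # 2, 8, 18, 32 seconds
# On the 5th consecutive failure the watchdog terminates the Flask app and
# kills the remaining processes instead of retrying again.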