
Commit

Merge pull request #1258 from mskcc/feature/dd_trace
Feature/dd trace
allanbolipata authored Jan 10, 2024
2 parents dac2d76 + 0c2521e commit 757d986
Showing 7 changed files with 22 additions and 3 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -11,7 +11,7 @@ addons:
       - postgresql-11
       - postgresql-client-11
 python:
-  - "3.7.3"
+  - "3.8"
 before_install:
   - sudo apt-get update
   - sudo apt-get --yes remove postgresql\*
@@ -20,6 +20,7 @@ before_install:
   - sudo service postgresql restart 11
 install:
   - psql -p 5433 -c 'create database travis_ci_test;' -U postgres
+  - pip install --upgrade pip
   - pip install -r requirements-dev.txt
   - source travis_env.sh
   - python manage.py migrate
1 change: 1 addition & 0 deletions beagle/settings.py
@@ -60,6 +60,7 @@
     "drf_multiple_model",
     "drf_yasg",
     "advanced_filters",
+    "ddtrace.contrib.django",
 ]
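
Adding "ddtrace.contrib.django" to INSTALLED_APPS enables Datadog's Django integration, which instruments request handling, middleware, template rendering, and database queries. Service and environment names come from the DD_* variables set in container/beagle.def below. For orientation only, here is a minimal sketch of configuring the same integration programmatically; it is not part of the commit, and the specific config keys are assumptions based on ddtrace 1.x documentation.

# Sketch only: programmatic configuration of the Django integration,
# as an alternative to the DD_* environment variables used by this commit.
from ddtrace import config, tracer

# Assumed to match the "beagle" service name used elsewhere in this change.
config.django["service_name"] = "beagle"

# Global tags applied to every span (equivalent in spirit to DD_ENV).
tracer.set_tags({"env": "dev"})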
6 changes: 5 additions & 1 deletion beagle_etl/tasks.py
@@ -27,7 +27,7 @@
 from file_system.repository import FileRepository
 from notifier.tasks import send_notification
 from notifier.events import ETLImportEvent, ETLJobsLinksEvent, PermissionDeniedEvent, SendEmailEvent
-
+from ddtrace import tracer

 logger = logging.getLogger(__name__)

@@ -40,6 +40,7 @@ def fetch_request_nats():


 @shared_task
+@tracer.wrap(service="beagle")
 def process_smile_events():
     update_requests = set()

@@ -60,6 +61,9 @@ for message in messages:
     for message in messages:
         if message.request_id in update_requests:
             update_requests.remove(message.request_id)
+        current_span = tracer.current_span()
+        request_id = message.request_id
+        current_span.set_tag("request.id", request_id)
         logger.info(f"New request: {message.request_id}")
         new_request.delay(str(message.id))
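
The pattern introduced above is: @tracer.wrap(service="beagle") opens a span for each run of the Celery task, and tracer.current_span() fetches that active span so the request id can be attached as a tag. A self-contained sketch of the same pattern follows; handle_message is an illustrative task name, not code from this repository, and the None check is a defensive addition since current_span() returns None when no trace is active.

# Illustrative sketch of the wrap / current_span / set_tag pattern (not from this repo).
from celery import shared_task
from ddtrace import tracer


@shared_task
@tracer.wrap(service="beagle")  # opens a span for every execution of the task
def handle_message(request_id):
    span = tracer.current_span()  # the span created by @tracer.wrap
    if span is not None:  # current_span() returns None outside an active trace
        span.set_tag("request.id", request_id)
    # ... process the message ...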
2 changes: 1 addition & 1 deletion container/beagle.def
@@ -60,7 +60,7 @@ Includecmd: no

 python3 ${BEAGLE_PATH}/manage.py migrate --noinput
 python3 ${BEAGLE_PATH}/manage.py collectstatic --noinput
-gunicorn beagle.wsgi --log-file $BEAGLE_LOG_PATH --bind 0.0.0.0:$BEAGLE_PORT --threads 10 --pythonpath ${BEAGLE_PATH} > /dev/null 2>&1 < /dev/null &
+DD_SERVICE="beagle" DD_ENV="dev" DD_LOGS_INJECTION=true DD_TRACE_SAMPLE_RATE="1" DD_PROFILING_ENABLED=true ddtrace-run gunicorn beagle.wsgi --log-file $BEAGLE_LOG_PATH --bind 0.0.0.0:$BEAGLE_PORT --threads 10 --pythonpath ${BEAGLE_PATH} > /dev/null 2>&1 < /dev/null &

 ##############################
 # start celery
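
ddtrace-run wraps the gunicorn process so supported libraries are auto-instrumented at startup, while the DD_* variables name the service and environment, enable trace/log correlation, sample every trace, and turn on the continuous profiler. As a rough orientation, the sketch below shows an approximate programmatic equivalent; the equivalence is an assumption about ddtrace 1.x behavior, and the deployment itself relies on the ddtrace-run wrapper.

# Sketch: approximate programmatic equivalent of the ddtrace-run invocation above.
import os

# ddtrace reads these at import time, mirroring the variables set in beagle.def.
os.environ.setdefault("DD_SERVICE", "beagle")
os.environ.setdefault("DD_ENV", "dev")
os.environ.setdefault("DD_LOGS_INJECTION", "true")
os.environ.setdefault("DD_TRACE_SAMPLE_RATE", "1")

from ddtrace import patch_all

patch_all()  # auto-instrument Django, psycopg2, requests, etc., as ddtrace-run would
# (The profiler, enabled here via DD_PROFILING_ENABLED, is started by ddtrace-run itself.)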
1 change: 1 addition & 0 deletions requirements.txt
@@ -29,3 +29,4 @@ asyncio-nats-client==0.11.5
 nats-py==2.0.0
 django-advanced-filters==1.4.0
 sbpack==2022.3.16
+ddtrace==1.7.3
7 changes: 7 additions & 0 deletions runner/operator/operator.py
@@ -5,6 +5,7 @@
 from beagle_etl.models import Operator as OperatorModel
 from runner.operator.operator_logger import OperatorLogger
 from runner.run.objects.run_creator_object import RunCreator
+from ddtrace import tracer


 class Operator(object):
@@ -66,7 +67,10 @@ def failed_to_create_job(self, error):
         operator_error = OperatorErrorSerializer(
             data={"operator_name": self.model.slug, "request_id": self.request_id, "error": error}
         )
+        current_span = tracer.current_span()
         if operator_error.is_valid():
+            cmo_request_id = self.request_id
+            current_span.set_tag("request.id", cmo_request_id)
             operator_error.save()
             self.logger.info(
                 "Operator: %s failed to create a job for request_id: %s with error: %s"
@@ -77,6 +81,9 @@

     def ready_job(self, pipeline, tempo_inputs, job):
         self._jobs.append(RunCreator(app=pipeline, inputs=job))
+        current_span = tracer.current_span()
+        cmo_request_id = self.request_id
+        current_span.set_tag("request.id", cmo_request_id)

     def on_job_fail(self, run):
         pass
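
Unlike the Celery tasks, Operator methods are not wrapped with @tracer.wrap, so tracer.current_span() only returns a span when the operator runs inside an already-traced call path (for example, from create_jobs_from_request) and returns None otherwise, in which case set_tag would raise. A defensive helper along the following lines keeps the tagging safe anywhere; tag_current_span is a hypothetical name used for illustration, not something defined in this repository.

# Hypothetical helper (sketch): tag the active span only when one exists.
from ddtrace import tracer


def tag_current_span(key, value):
    span = tracer.current_span()
    if span is not None:  # no active trace, nothing to tag
        span.set_tag(key, value)


# Example usage inside Operator.ready_job:
#     tag_current_span("request.id", self.request_id)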
5 changes: 5 additions & 0 deletions runner/tasks.py
@@ -41,6 +41,7 @@
 from study.objects import StudyObject
 from study.models import JobGroupWatcher, JobGroupWatcherConfig
 from django.http import HttpResponse
+from ddtrace import tracer

 logger = logging.getLogger(__name__)

@@ -187,9 +188,13 @@ def create_operator_run_from_jobs(


 @shared_task
+@tracer.wrap(service="beagle")
 def create_jobs_from_request(
     request_id, operator_id, job_group_id, job_group_notifier_id=None, pipeline=None, file_group=None, notify=False
 ):
+    current_span = tracer.current_span()
+    current_span.set_tag("request.id", request_id)
+
     logger.info(format_log("Creating operator with %s" % operator_id, job_group_id=job_group_id, request_id=request_id))
     operator_model = Operator.objects.get(id=operator_id)
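
Tagging the task's span with request.id makes it possible to filter and search traces for a single request in Datadog APM. Finer-grained timing inside a traced task can be captured with child spans via tracer.trace(); the sketch below shows the general pattern, with the span and function names chosen for illustration rather than taken from this codebase.

# Sketch: opening a child span around one step of a traced task (illustrative names).
from ddtrace import tracer


def construct_jobs(request_id):
    # The with-block creates a child of the current active span and closes it on exit.
    with tracer.trace("beagle.construct_jobs", service="beagle") as span:
        span.set_tag("request.id", request_id)
        # ... build the jobs for this request ...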
