Introduce DD traces & metrics
roekatz committed Feb 14, 2024
1 parent bcfb9e5 commit 85ddf8b
Showing 8 changed files with 222 additions and 105 deletions.
6 changes: 5 additions & 1 deletion packages/opal-common/opal_common/git/bundle_maker.py
@@ -2,6 +2,7 @@
 from pathlib import Path
 from typing import List, Optional, Set
 
+from ddtrace import tracer
 from git import Repo
 from git.objects import Commit
 from opal_common.engine import get_rego_package, is_data_module, is_policy_module
@@ -249,7 +250,10 @@ def make_bundle(self, commit: Commit) -> PolicyBundle:
         logger.debug(f"Explicit manifest to be used: {explicit_manifest}")
 
         for source_file in viewer.files(filter):
-            contents = source_file.read()
+            with tracer.trace(
+                "bundle_maker.git_file_read", resource=str(source_file.path)
+            ):
+                contents = source_file.read()
             path = source_file.path
 
             if is_data_module(path):
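Note on the pattern above: ddtrace's tracer.trace() returns a span that doubles as a context manager, so each git file read is timed and tagged with the file's path as the span resource. A minimal standalone sketch of the same pattern (the helper function below is illustrative, not part of the commit):

from ddtrace import tracer

def read_file(path: str) -> bytes:
    # A span named "bundle_maker.git_file_read" opens here and closes
    # (recording its duration) when the with-block exits.
    with tracer.trace("bundle_maker.git_file_read", resource=path):
        with open(path, "rb") as f:
            return f.read()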
44 changes: 44 additions & 0 deletions packages/opal-common/opal_common/monitoring/apm.py
@@ -0,0 +1,44 @@
from typing import Optional
from urllib.parse import urlparse
import os

from ddtrace import Span, patch, tracer, config
from ddtrace.filters import TraceFilter
from loguru import logger


def configure_apm(enable_apm: bool, service_name: str):
"""optionally enable datadog APM / profiler."""
if enable_apm:
logger.info("Enabling DataDog APM")

class FilterRootPathTraces(TraceFilter):
def process_trace(self, trace: list[Span]) -> Optional[list[Span]]:
for span in trace:
if span.parent_id is not None:
return trace

if url := span.get_tag("http.url"):
parsed_url = urlparse(url)

if parsed_url.path == "/":
return None

return trace

patch(fastapi=True, redis=True, asyncpg=True, aiohttp=True, celery=True)
tracer.configure(
settings={
"FILTERS": [
FilterRootPathTraces(),
]
}
)

# Override service name
config.fastapi["service_name"] = service_name
config.fastapi["request_span_name"] = f"{service_name}.request"

else:
logger.info("DataDog APM disabled")
tracer.configure(enabled=False)
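configure_apm either enables instrumentation (patching fastapi, redis, asyncpg, aiohttp, and celery) or disables the tracer outright. The FilterRootPathTraces filter drops traces whose root span is a request to "/", which typically filters out load-balancer health checks. A usage sketch, assuming a single call at service startup (the env flag and service name below are illustrative, not part of the commit):

import os

from opal_common.monitoring.apm import configure_apm

# Assumed convention: gate APM on an environment flag.
enable_apm = os.environ.get("DD_TRACE_ENABLED", "false").lower() == "true"
configure_apm(enable_apm, service_name="opal-server")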
52 changes: 52 additions & 0 deletions packages/opal-common/opal_common/monitoring/metrics.py
@@ -0,0 +1,52 @@
import os
from typing import Optional

import datadog
from loguru import logger


def configure_metrics(
enable_metrics: bool, statsd_host: str, statsd_port: int, namespace: str = ""
):
if not enable_metrics:
logger.info("DogStatsD metrics disabled")
return
else:
logger.info(
"DogStatsD metrics enabled; statsd: {host}:{port}",
host=statsd_host,
port=statsd_port,
)

if not namespace:
namespace = os.environ.get("DD_SERVICE", "")

namespace = namespace.lower().replace("-", "_")
datadog.initialize(
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_namespace=f"permit.{namespace}",
)


def _format_tags(tags: Optional[dict[str, str]]) -> Optional[list[str]]:
if not tags:
return None

return [f"{k}:{v}" for k, v in tags.items()]


def increment(metric: str, tags: Optional[dict[str, str]] = None):
datadog.statsd.increment(metric, tags=_format_tags(tags))


def decrement(metric: str, tags: Optional[dict[str, str]] = None):
datadog.statsd.decrement(metric, tags=_format_tags(tags))


def gauge(metric: str, value: float, tags: Optional[dict[str, str]] = None):
datadog.statsd.gauge(metric, value, tags=_format_tags(tags))


def event(title: str, message: str, tags: Optional[dict[str, str]] = None):
datadog.statsd.event(title=title, message=message, tags=_format_tags(tags))
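configure_metrics initializes datadog's module-level DogStatsD client; every metric is then emitted under a "permit.<namespace>." prefix via statsd_namespace (the namespace defaults to DD_SERVICE, lowercased with dashes replaced by underscores). A usage sketch for these helpers (host, port, metric names, and tags below are illustrative):

from opal_common.monitoring import metrics

metrics.configure_metrics(
    enable_metrics=True,
    statsd_host="localhost",
    statsd_port=8125,
    namespace="opal_server",
)

# Sent as "permit.opal_server.scope_sync" with tags ["status:ok"].
metrics.increment("scope_sync", tags={"status": "ok"})
metrics.gauge("scope_repo_size_mb", 42.0, tags={"scope_id": "default"})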
9 changes: 6 additions & 3 deletions packages/opal-common/opal_common/topics/publisher.py
@@ -1,6 +1,7 @@
 import asyncio
 from typing import Any, Optional, Set
 
+from ddtrace import tracer
 from fastapi_websocket_pubsub import PubSubClient, PubSubEndpoint, Topic, TopicList
 from opal_common.logger import logger
 
@@ -131,10 +132,12 @@ def __init__(self, endpoint: PubSubEndpoint):
         self._endpoint = endpoint
         super().__init__()
 
+    async def _publish_impl(self, topics: TopicList, data: Any = None):
+        with tracer.trace("topic_publisher.publish", resource=str(topics)):
+            await self._endpoint.publish(topics=topics, data=data)
+
     async def publish(self, topics: TopicList, data: Any = None):
-        await self._add_task(
-            asyncio.create_task(self._endpoint.publish(topics=topics, data=data))
-        )
+        await self._add_task(asyncio.create_task(self._publish_impl(topics, data)))
 
 
 class ClientSideTopicPublisher(TopicPublisher):
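The indirection through _publish_impl matters: the span is opened inside the spawned task, so it measures the publish itself rather than just the task's creation. The same pattern in isolation (all names below are illustrative):

import asyncio

from ddtrace import tracer

async def _do_work():
    # The span starts only when the scheduled task actually runs,
    # not when asyncio.create_task() is called.
    with tracer.trace("worker.do_work"):
        await asyncio.sleep(0.1)

async def main():
    await asyncio.create_task(_do_work())

asyncio.run(main())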
4 changes: 3 additions & 1 deletion packages/opal-server/opal_server/git_fetcher.py
@@ -8,6 +8,7 @@
 
 import aiofiles.os
 import pygit2
+from ddtrace import tracer
 from git import Repo
 from opal_common.async_utils import run_sync
 from opal_common.git.bundle_maker import BundleMaker
@@ -177,7 +178,7 @@ async def fetch_and_notify_on_changes(
         repo_lock = await self._get_repo_lock()
         async with repo_lock:
             with tracer.trace(
-                "scopes_service.fetch_and_notify_on_changes",
+                "git_policy_fetcher.fetch_and_notify_on_changes",
                 resource=self._scope_id,
             ):
                 if self._discover_repository(self._repo_path):
@@ -335,6 +336,7 @@ def _get_current_branch_head(self) -> str:
             raise ValueError("Could not find current branch head")
         return head_commit_hash
 
+    @tracer.wrap("git_policy_fetcher.make_bundle")
     def make_bundle(self, base_hash: Optional[str] = None) -> PolicyBundle:
        repo = Repo(str(self._repo_path))
        bundle_maker = BundleMaker(
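tracer.wrap() is the decorator counterpart of the tracer.trace() context manager: the whole decorated call runs inside a span. A minimal sketch (the function below is illustrative, not part of the commit):

from ddtrace import tracer

@tracer.wrap("git_policy_fetcher.make_bundle")
def make_bundle_example() -> None:
    # Everything in this body is recorded under a span named
    # "git_policy_fetcher.make_bundle".
    ...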
7 changes: 7 additions & 0 deletions packages/opal-server/opal_server/scopes/api.py
@@ -15,6 +15,7 @@
 from fastapi.responses import RedirectResponse
 from fastapi_websocket_pubsub import PubSubEndpoint
 from git import InvalidGitRepositoryError
+from opal_common.monitoring import metrics
 from opal_common.async_utils import run_sync
 from opal_common.authentication.authz import (
     require_peer_type,
@@ -277,6 +278,12 @@ async def get_scope_policy(
         return await _generate_default_scope_bundle(scope_id)
 
     async def _generate_default_scope_bundle(scope_id: str) -> PolicyBundle:
+        metrics.event(
+            "ScopeNotFound",
+            message=f"Scope {scope_id} not found. Serving default scope instead",
+            tags={"scope_id": scope_id},
+        )
+
         try:
             scope = await scopes.get("default")
             fetcher = GitPolicyFetcher(
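metrics.event forwards to datadog.statsd.event, which emits a DogStatsD event (shown in the Datadog event stream) rather than a timeseries metric, which suits a rare, noteworthy condition like a missing scope. The call above is roughly equivalent to the following (a sketch; "my_scope" is an illustrative scope id, and the tag list is what _format_tags produces):

import datadog

datadog.statsd.event(
    title="ScopeNotFound",
    message="Scope my_scope not found. Serving default scope instead",
    tags=["scope_id:my_scope"],
)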
178 changes: 91 additions & 87 deletions packages/opal-server/opal_server/scopes/service.py
@@ -5,6 +5,7 @@
 from typing import List, Optional, Set, cast
 
 import git
+from ddtrace import tracer
 from fastapi_websocket_pubsub import PubSubEndpoint
 from opal_common.git.commit_viewer import VersionedFile
 from opal_common.logger import logger
@@ -122,102 +123,105 @@ async def sync_scope(
             assert scope_id, ValueError("scope_id not set for sync_scope")
             scope = await self._scopes.get(scope_id)
 
-        if not isinstance(scope.policy, GitPolicyScopeSource):
-            logger.warning("Non-git scopes are currently not supported!")
-            return
-        source = cast(GitPolicyScopeSource, scope.policy)
+        with tracer.trace("scopes_service.sync_scope", resource=scope.scope_id):
+            if not isinstance(scope.policy, GitPolicyScopeSource):
+                logger.warning("Non-git scopes are currently not supported!")
+                return
+            source = cast(GitPolicyScopeSource, scope.policy)
 
-        logger.debug(
-            f"Sync scope: {scope.scope_id} (remote: {source.url}, branch: {source.branch}, req_time: {req_time})"
-        )
+            logger.debug(
+                f"Sync scope: {scope.scope_id} (remote: {source.url}, branch: {source.branch}, req_time: {req_time})"
+            )
 
-        callbacks = PolicyFetcherCallbacks()
-        if notify_on_changes:
-            callbacks = NewCommitsCallbacks(
-                base_dir=self._base_dir,
-                scope_id=scope.scope_id,
-                source=source,
-                pubsub_endpoint=self._pubsub_endpoint,
-            )
+            callbacks = PolicyFetcherCallbacks()
+            if notify_on_changes:
+                callbacks = NewCommitsCallbacks(
+                    base_dir=self._base_dir,
+                    scope_id=scope.scope_id,
+                    source=source,
+                    pubsub_endpoint=self._pubsub_endpoint,
+                )
 
-        fetcher = GitPolicyFetcher(
-            self._base_dir,
-            scope.scope_id,
-            source,
-            callbacks=callbacks,
-        )
+            fetcher = GitPolicyFetcher(
+                self._base_dir,
+                scope.scope_id,
+                source,
+                callbacks=callbacks,
+            )
 
-        try:
-            await fetcher.fetch_and_notify_on_changes(
-                hinted_hash=hinted_hash, force_fetch=force_fetch, req_time=req_time
-            )
-        except Exception as e:
-            logger.exception(
-                f"Could not fetch policy for scope {scope.scope_id}, got error: {e}"
-            )
+            try:
+                await fetcher.fetch_and_notify_on_changes(
+                    hinted_hash=hinted_hash, force_fetch=force_fetch, req_time=req_time
+                )
+            except Exception as e:
+                logger.exception(
+                    f"Could not fetch policy for scope {scope.scope_id}, got error: {e}"
+                )
 
     async def delete_scope(self, scope_id: str):
-        logger.info(f"Delete scope: {scope_id}")
-        scope = await self._scopes.get(scope_id)
-        url = scope.policy.url
+        with tracer.trace("scopes_service.delete_scope", resource=scope_id):
+            logger.info(f"Delete scope: {scope_id}")
+            scope = await self._scopes.get(scope_id)
+            url = scope.policy.url
 
-        scopes = await self._scopes.all()
-        remove_repo_clone = True
+            scopes = await self._scopes.all()
+            remove_repo_clone = True
 
-        for scope in scopes:
-            if scope.scope_id != scope_id and scope.policy.url == url:
-                logger.info(
-                    f"found another scope with same remote url ({scope.scope_id}), skipping clone deletion"
-                )
-                remove_repo_clone = False
-                break
+            for scope in scopes:
+                if scope.scope_id != scope_id and scope.policy.url == url:
+                    logger.info(
+                        f"found another scope with same remote url ({scope.scope_id}), skipping clone deletion"
+                    )
+                    remove_repo_clone = False
+                    break
 
-        if remove_repo_clone:
-            scope_dir = GitPolicyFetcher.repo_clone_path(
-                self._base_dir, cast(GitPolicyScopeSource, scope.policy)
-            )
-            shutil.rmtree(scope_dir, ignore_errors=True)
+            if remove_repo_clone:
+                scope_dir = GitPolicyFetcher.repo_clone_path(
+                    self._base_dir, cast(GitPolicyScopeSource, scope.policy)
+                )
+                shutil.rmtree(scope_dir, ignore_errors=True)
 
-        await self._scopes.delete(scope_id)
+            await self._scopes.delete(scope_id)
 
     async def sync_scopes(self, only_poll_updates=False, notify_on_changes=True):
-        scopes = await self._scopes.all()
-        if only_poll_updates:
-            # Only sync scopes that have polling enabled (in a periodic check)
-            scopes = [scope for scope in scopes if scope.policy.poll_updates]
+        with tracer.trace("scopes_service.sync_scopes"):
+            scopes = await self._scopes.all()
+            if only_poll_updates:
+                # Only sync scopes that have polling enabled (in a periodic check)
+                scopes = [scope for scope in scopes if scope.policy.poll_updates]
 
-        logger.info(
-            f"OPAL Scopes: syncing {len(scopes)} scopes in the background (polling updates: {only_poll_updates})"
-        )
+            logger.info(
+                f"OPAL Scopes: syncing {len(scopes)} scopes in the background (polling updates: {only_poll_updates})"
            )
 
-        fetched_source_ids = set()
-        skipped_scopes = []
-        for scope in scopes:
-            src_id = GitPolicyFetcher.source_id(scope.policy)
+            fetched_source_ids = set()
+            skipped_scopes = []
+            for scope in scopes:
+                src_id = GitPolicyFetcher.source_id(scope.policy)
 
-            # Give priority to scopes that have a unique url per shard (so we'll clone all repos asap)
-            if src_id in fetched_source_ids:
-                skipped_scopes.append(scope)
-                continue
+                # Give priority to scopes that have a unique url per shard (so we'll clone all repos asap)
+                if src_id in fetched_source_ids:
+                    skipped_scopes.append(scope)
+                    continue
 
-            try:
-                await self.sync_scope(
-                    scope=scope,
-                    force_fetch=True,
-                    notify_on_changes=notify_on_changes,
-                )
-            except Exception as e:
-                logger.exception(f"sync_scope failed for {scope.scope_id}")
+                try:
+                    await self.sync_scope(
+                        scope=scope,
+                        force_fetch=True,
+                        notify_on_changes=notify_on_changes,
+                    )
+                except Exception as e:
+                    logger.exception(f"sync_scope failed for {scope.scope_id}")
 
-            fetched_source_ids.add(src_id)
+                fetched_source_ids.add(src_id)
 
-        for scope in skipped_scopes:
-            # No need to refetch the same repo, just check for changes
-            try:
-                await self.sync_scope(
-                    scope=scope,
-                    force_fetch=False,
-                    notify_on_changes=notify_on_changes,
-                )
-            except Exception as e:
-                logger.exception(f"sync_scope failed for {scope.scope_id}")
+            for scope in skipped_scopes:
+                # No need to refetch the same repo, just check for changes
+                try:
+                    await self.sync_scope(
+                        scope=scope,
+                        force_fetch=False,
+                        notify_on_changes=notify_on_changes,
+                    )
+                except Exception as e:
+                    logger.exception(f"sync_scope failed for {scope.scope_id}")
