
feat: Add comprehensive analytics API with OpenTelemetry observability #1639

Open · wants to merge 4 commits into master
14 changes: 14 additions & 0 deletions README.md
@@ -24,6 +24,20 @@
- 🚢 Deployment instructions using Docker Compose, including how to set up a frontend Traefik proxy to handle automatic HTTPS certificates.
- 🏭 CI (continuous integration) and CD (continuous deployment) based on GitHub Actions.

## Analytics and Observability

This project now includes analytics endpoints and distributed tracing via OpenTelemetry:

* **New Analytics Endpoints** (see the usage sketch below):
  * `/api/v1/analytics/user-summary`: a summary of user statistics, including total users, active/inactive counts, and signup trends (populated only when user creation timestamps are available; the default `User` model has none).
  * `/api/v1/analytics/item-trends`: a summary of item statistics, including total items and creation trends (likewise dependent on item creation timestamps).
  Both endpoints use Polars for in-memory data aggregation.

* **OpenTelemetry Integration**:
* The backend is instrumented with OpenTelemetry for distributed tracing. This provides insights into request flows and database interactions.
* To export traces, configure the OTLP exporter endpoint via the environment variable: `OTEL_EXPORTER_OTLP_ENDPOINT="<your_otlp_collector_url:port>"` (e.g., `http://localhost:4317`).
* You can also customize the service name reported to your observability platform using the `OTEL_SERVICE_NAME` environment variable.
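For a quick smoke test of the new endpoints, something like the following works (a minimal sketch: it assumes the backend is running locally on the default port, and that the analytics routes remain unauthenticated, as in this PR):

```python
import httpx

BASE_URL = "http://localhost:8000"  # adjust to your deployment

# As implemented here, the analytics routes apply no current-user dependency,
# so no Authorization header is needed.
for path in ("/api/v1/analytics/user-summary", "/api/v1/analytics/item-trends"):
    resp = httpx.get(f"{BASE_URL}{path}")
    resp.raise_for_status()
    print(path, resp.json())
```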

### Dashboard Login

[![API docs](img/login.png)](https://github.com/fastapi/full-stack-fastapi-template)
3 changes: 2 additions & 1 deletion backend/app/api/main.py
@@ -1,13 +1,14 @@
from fastapi import APIRouter

-from app.api.routes import items, login, private, users, utils
+from app.api.routes import analytics, items, login, private, users, utils
from app.core.config import settings

api_router = APIRouter()
api_router.include_router(login.router)
api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(items.router)
api_router.include_router(analytics.router)


if settings.ENVIRONMENT == "local":
Expand Down
201 changes: 201 additions & 0 deletions backend/app/api/routes/analytics.py
@@ -0,0 +1,201 @@
from datetime import date

import polars as pl
import pydantic
from fastapi import APIRouter
from opentelemetry import trace
from sqlmodel import select

from app.api.deps import SessionDep
from app.models import Item, User


# Pydantic response models for user analytics
class UserSignupTrend(pydantic.BaseModel):
signup_date: date
count: int


class UserActivity(pydantic.BaseModel):
active_users: int
inactive_users: int


class UserAnalyticsSummary(pydantic.BaseModel):
total_users: int
signup_trends: list[UserSignupTrend]
activity_summary: UserActivity
# Add more fields as desired, e.g., average_items_per_user: float


# Pydantic models for Item analytics
class ItemCreationTrend(pydantic.BaseModel):
creation_date: date
count: int


# class ItemOwnerDistribution(pydantic.BaseModel): # Optional for now
# owner_id: str
# item_count: int


class ItemAnalyticsTrends(pydantic.BaseModel):
total_items: int
creation_trends: list[ItemCreationTrend]
# owner_distribution: List[ItemOwnerDistribution] # Optional


router = APIRouter(prefix="/analytics", tags=["analytics"])
tracer = trace.get_tracer(__name__)


@router.get("/user-summary", response_model=UserAnalyticsSummary)
def get_user_summary(session: SessionDep) -> UserAnalyticsSummary:
with tracer.start_as_current_span("user_summary_endpoint"):
with tracer.start_as_current_span("fetch_all_users_sql"):
statement = select(User)
users_list = list(session.exec(statement).all())

if not users_list:
return UserAnalyticsSummary(
total_users=0,
signup_trends=[],
activity_summary=UserActivity(active_users=0, inactive_users=0),
)

with tracer.start_as_current_span("convert_users_to_polars"):
users_data = []
for user in users_list:
user_dict = {
"id": user.id, # No explicit str() casting for now, per instructions
"email": user.email,
"is_active": user.is_active,
"is_superuser": user.is_superuser,
"full_name": user.full_name,
}
# Attempt to get 'created_at' if it exists (it doesn't in the standard model)
if hasattr(user, "created_at") and user.created_at:
user_dict["created_at"] = user.created_at
users_data.append(user_dict)

        if not users_data:  # unreachable when users_list is non-empty, kept as a safeguard
return UserAnalyticsSummary(
total_users=0,
signup_trends=[],
activity_summary=UserActivity(active_users=0, inactive_users=0),
)

        df_users = pl.DataFrame(users_data)

        total_users = df_users.height  # Polars row count

with tracer.start_as_current_span("calculate_user_activity_polars"):
active_users = df_users.filter(pl.col("is_active")).height
inactive_users = total_users - active_users

signup_trends_data = []
        if "created_at" in df_users.columns:  # false with the default User model, which has no created_at
with tracer.start_as_current_span("calculate_signup_trends_polars"):
# Ensure 'created_at' is a datetime type. If string, parse it.
# Assuming it's already a datetime.date or datetime.datetime from SQLModel
# If it's datetime, cast to date for daily trends
df_users_with_date = df_users.with_columns(
pl.col("created_at").cast(pl.Date).alias("signup_day")
)

signup_counts_df = (
df_users_with_date.group_by("signup_day")
                    .agg(pl.len().alias("count"))
.sort("signup_day")
)

signup_trends_data = [
UserSignupTrend(signup_date=row["signup_day"], count=row["count"])
for row in signup_counts_df.to_dicts()
]

return UserAnalyticsSummary(
total_users=total_users,
signup_trends=signup_trends_data, # Will be empty as 'created_at' is not in User model
activity_summary=UserActivity(
active_users=active_users, inactive_users=inactive_users
),
)


@router.get("/item-trends", response_model=ItemAnalyticsTrends)
def get_item_trends(session: SessionDep) -> ItemAnalyticsTrends:
with tracer.start_as_current_span("item_trends_endpoint"):
with tracer.start_as_current_span("fetch_all_items_sql"):
statement = select(Item)
items_list = list(session.exec(statement).all())

if not items_list:
return ItemAnalyticsTrends(
total_items=0,
creation_trends=[],
# owner_distribution=[] # Optional
)

with tracer.start_as_current_span("convert_items_to_polars"):
items_data = []
for item in items_list:
item_dict = {
"id": str(item.id), # Cast UUID to string
"title": item.title,
"description": item.description,
"owner_id": str(item.owner_id), # Cast UUID to string
}
# IMPORTANT: Item model does not have 'created_at'.
# This will result in empty creation_trends.
if hasattr(item, "created_at") and item.created_at:
item_dict["created_at"] = item.created_at
items_data.append(item_dict)

if not items_data: # Safety check
return ItemAnalyticsTrends(total_items=0, creation_trends=[])

df_items = pl.DataFrame(items_data)

total_items = df_items.height

creation_trends_data = []
if "created_at" in df_items.columns:
with tracer.start_as_current_span("calculate_item_creation_trends_polars"):
# Ensure 'created_at' is datetime, then cast to date for daily trends
df_items_with_date = df_items.with_columns(
pl.col("created_at").cast(pl.Date).alias("creation_day")
)

creation_counts_df = (
df_items_with_date.group_by("creation_day")
                    .agg(pl.len().alias("count"))
.sort("creation_day")
)

creation_trends_data = [
ItemCreationTrend(
creation_date=row["creation_day"], count=row["count"]
)
for row in creation_counts_df.to_dicts()
]

# Placeholder for owner distribution if implemented later
# owner_distribution_data = []

return ItemAnalyticsTrends(
total_items=total_items,
creation_trends=creation_trends_data,
# owner_distribution=owner_distribution_data # Optional
)
47 changes: 47 additions & 0 deletions backend/app/core/telemetry.py
@@ -0,0 +1,47 @@
import os

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from app.core.config import settings
from app.core.db import engine


def init_telemetry(app: FastAPI) -> None:
    # Service name: prefer the OTEL_SERVICE_NAME env var (as documented in the
    # README), then settings, then a default.
    service_name = os.getenv("OTEL_SERVICE_NAME") or getattr(
        settings, "OTEL_SERVICE_NAME", "fastapi-application"
    )

resource = Resource(attributes={"service.name": service_name})

# Configure OTLP exporter only if endpoint is explicitly set
# This prevents connection attempts in CI/test environments
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")

provider = TracerProvider(resource=resource)

if otlp_endpoint:
span_exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
# insecure=True # Set to True if your collector is not using TLS. Adjust as needed.
)
processor = BatchSpanProcessor(span_exporter)
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)

# Instrument SQLAlchemy
# Ensure the engine is already configured/available when this is called.
SQLAlchemyInstrumentor().instrument(engine=engine)

    # For custom instrumentation, get a tracer and open spans manually, e.g.:
    # tracer = trace.get_tracer(__name__)
    # with tracer.start_as_current_span("my_operation"):
    #     ...
4 changes: 4 additions & 0 deletions backend/app/main.py
@@ -5,6 +5,7 @@

from app.api.main import api_router
from app.core.config import settings
from app.core.telemetry import init_telemetry


def custom_generate_unique_id(route: APIRoute) -> str:
@@ -20,6 +21,9 @@ def custom_generate_unique_id(route: APIRoute) -> str:
generate_unique_id_function=custom_generate_unique_id,
)

# Initialize OpenTelemetry
init_telemetry(app)

# Set all CORS enabled origins
if settings.all_cors_origins:
app.add_middleware(
56 changes: 56 additions & 0 deletions backend/app/tests/api/test_analytics.py
@@ -0,0 +1,56 @@
from fastapi.testclient import TestClient

from app.core.config import settings

# These tests use the shared `client` fixture from the test suite's conftest.py.


# Test for User Analytics Summary
def test_get_user_summary(client: TestClient) -> None:
response = client.get(f"{settings.API_V1_STR}/analytics/user-summary")
assert response.status_code == 200
data = response.json()

assert "total_users" in data
assert isinstance(data["total_users"], int)

assert "signup_trends" in data
assert isinstance(data["signup_trends"], list)

assert "activity_summary" in data
assert "active_users" in data["activity_summary"]
assert "inactive_users" in data["activity_summary"]
assert isinstance(data["activity_summary"]["active_users"], int)
assert isinstance(data["activity_summary"]["inactive_users"], int)

# Check if signup_trends items have the correct structure if not empty
if data["signup_trends"]:
trend_item = data["signup_trends"][0]
assert "signup_date" in trend_item
assert "count" in trend_item
assert isinstance(trend_item["count"], int)


# Test for Item Analytics Trends
def test_get_item_trends(client: TestClient) -> None:
response = client.get(f"{settings.API_V1_STR}/analytics/item-trends")
assert response.status_code == 200
data = response.json()

assert "total_items" in data
assert isinstance(data["total_items"], int)

assert "creation_trends" in data
assert isinstance(data["creation_trends"], list)

# Check if creation_trends items have the correct structure if not empty
if data["creation_trends"]:
trend_item = data["creation_trends"][0]
assert "creation_date" in trend_item
assert "count" in trend_item
assert isinstance(trend_item["count"], int)
9 changes: 8 additions & 1 deletion backend/pyproject.toml
@@ -21,14 +21,21 @@ dependencies = [
"pydantic-settings<3.0.0,>=2.2.1",
"sentry-sdk[fastapi]<2.0.0,>=1.40.6",
"pyjwt<3.0.0,>=2.8.0",
"duckdb",
"polars",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp",
"opentelemetry-instrumentation-fastapi",
"opentelemetry-instrumentation-sqlalchemy",
]

[tool.uv]
dev-dependencies = [
"pytest<8.0.0,>=7.4.3",
"mypy<2.0.0,>=1.8.0",
"ruff<1.0.0,>=0.2.2",
"pre-commit<4.0.0,>=3.6.2",
"pre-commit>=3.6.2,<4.0.0",
"types-passlib<2.0.0.0,>=1.7.7.20240106",
"coverage<8.0.0,>=7.4.3",
]