90 changes: 90 additions & 0 deletions src/pipecat/metrics/metrics.py
@@ -87,6 +87,96 @@ class TTSUsageMetricsData(MetricsData):
value: int


class STTUsage(BaseModel):
"""Audio usage statistics for STT operations.

Parameters:
audio_duration_seconds: Duration of audio processed in seconds.
requests: Number of STT requests made.

# Content metrics (similar to TTS character counting)
word_count: Number of words transcribed.
character_count: Number of characters transcribed.

# Performance metrics
processing_time_seconds: Total processing time in seconds.
real_time_factor: Processing time / audio duration (< 1.0 is faster than real-time).
words_per_second: Words transcribed per second (throughput).
time_to_first_transcript: Time from audio start to first transcription (like TTFT in LLMs).
time_to_final_transcript: Time from audio start to final transcription.

# Quality metrics
average_confidence: Average confidence score (0.0 to 1.0).
word_error_rate: Word Error Rate percentage (if ground truth available).
proper_noun_accuracy: Proper noun transcription accuracy percentage.

# Audio metadata
sample_rate: Audio sample rate in Hz (e.g., 16000).
channels: Number of audio channels (1 for mono, 2 for stereo).
encoding: Audio encoding format (e.g., "LINEAR16", "OPUS").

# Cost tracking
cost_per_word: Cost per word transcribed.
estimated_cost: Estimated total cost for this transcription.

Calculation Examples:
# Words Per Second (WPS)
words_per_second = word_count / processing_time_seconds

# Real-Time Factor (RTF)
real_time_factor = processing_time_seconds / audio_duration_seconds
# RTF < 1.0 means faster than real-time (good!)
# RTF = 0.5 means processing took half the audio duration

# Word Error Rate (WER) - requires ground truth
wer = (substitutions + insertions + deletions) / total_reference_words * 100

# Cost Per Word
cost_per_word = estimated_cost / word_count

# Time to First Transcript (TTFT)
ttft = timestamp_first_transcript - audio_start_time
"""

audio_duration_seconds: float
requests: int = 1

# Content metrics
word_count: Optional[int] = None
character_count: Optional[int] = None

# Performance metrics
processing_time_seconds: Optional[float] = None
real_time_factor: Optional[float] = None # processing_time / audio_duration
words_per_second: Optional[float] = None # word_count / processing_time
time_to_first_transcript: Optional[float] = None # TTFT in seconds
time_to_final_transcript: Optional[float] = None # Total latency

# Quality metrics
average_confidence: Optional[float] = None # 0.0 to 1.0
word_error_rate: Optional[float] = None # WER percentage
proper_noun_accuracy: Optional[float] = None # Proper noun accuracy percentage

# Audio metadata
sample_rate: Optional[int] = None
channels: Optional[int] = None
encoding: Optional[str] = None

# Cost tracking
cost_per_word: Optional[float] = None
estimated_cost: Optional[float] = None


class STTUsageMetricsData(MetricsData):
"""Speech-to-Text usage metrics data.

Parameters:
        value: Comprehensive STT usage statistics for the operation, including content, performance, quality, and cost metrics.
"""

value: STTUsage


class SmartTurnMetricsData(MetricsData):
"""Metrics data for smart turn predictions.

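A minimal sketch of how the new model's fields fit together (illustrative only, not taken from this changeset); the processor name and numbers are made up, and the derived values follow the formulas in the docstring above:

from pipecat.metrics.metrics import STTUsage, STTUsageMetricsData

# Illustrative values: 5.5 s of audio transcribed to six words in 2.3 s.
transcript = "Hello world this is a test"
audio_duration = 5.5
processing_time = 2.3

word_count = len(transcript.split())                       # 6
usage = STTUsage(
    audio_duration_seconds=audio_duration,
    word_count=word_count,
    character_count=len(transcript),                        # 26
    processing_time_seconds=processing_time,
    real_time_factor=processing_time / audio_duration,      # ~0.42, faster than real-time
    words_per_second=word_count / processing_time,          # ~2.6 WPS
)

data = STTUsageMetricsData(processor="ExampleSTTService", value=usage)
print(data.model_dump(exclude_none=True))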
43 changes: 43 additions & 0 deletions src/pipecat/processors/frame_processor.py
@@ -434,6 +434,49 @@ async def start_tts_usage_metrics(self, text: str):
if frame:
await self.push_frame(frame)

async def start_stt_usage_metrics(
self,
audio_duration: float,
transcript: Optional[str] = None,
processing_time: Optional[float] = None,
confidence: Optional[float] = None,
sample_rate: Optional[int] = None,
channels: Optional[int] = None,
encoding: Optional[str] = None,
cost_per_minute: Optional[float] = None,
ttft: Optional[float] = None,
ground_truth: Optional[str] = None,
):
"""Start enhanced STT usage metrics collection with automatic calculations.

Args:
audio_duration: Duration of audio processed in seconds (required).
transcript: The transcribed text (used to calculate word/character counts).
processing_time: Time taken to process the audio in seconds.
confidence: Average confidence score from 0.0 to 1.0.
sample_rate: Audio sample rate in Hz (e.g., 16000).
channels: Number of audio channels (1 for mono, 2 for stereo).
encoding: Audio encoding format (e.g., "LINEAR16", "OPUS").
cost_per_minute: Cost per minute of audio (for cost estimation).
ttft: Time to first transcript in seconds.
ground_truth: Reference transcript for WER calculation (optional, for testing).
"""
if self.can_generate_metrics() and self.usage_metrics_enabled:
frame = await self._metrics.start_stt_usage_metrics(
audio_duration=audio_duration,
transcript=transcript,
processing_time=processing_time,
confidence=confidence,
sample_rate=sample_rate,
channels=channels,
encoding=encoding,
cost_per_minute=cost_per_minute,
ttft=ttft,
ground_truth=ground_truth,
)
if frame:
await self.push_frame(frame)

async def stop_all_metrics(self):
"""Stop all active metrics collection."""
await self.stop_ttfb_metrics()
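A sketch of how a speech-to-text service built on FrameProcessor might call this method once a final transcript arrives; the _on_final_transcript helper and the pricing figure are assumptions for illustration, only start_stt_usage_metrics() comes from this changeset:

import time

async def _on_final_transcript(self, transcript: str, audio_duration: float, started_at: float):
    # Measure wall-clock processing time and report usage in one call;
    # the frame produced by start_stt_usage_metrics() is pushed downstream.
    processing_time = time.time() - started_at
    await self.start_stt_usage_metrics(
        audio_duration=audio_duration,
        transcript=transcript,
        processing_time=processing_time,
        sample_rate=16000,
        cost_per_minute=0.006,  # example pricing, not a real quote
    )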
5 changes: 5 additions & 0 deletions src/pipecat/processors/frameworks/rtvi.py
@@ -65,6 +65,7 @@
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
STTUsageMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
@@ -1197,6 +1198,10 @@ async def _handle_metrics(self, frame: MetricsFrame):
if "characters" not in metrics:
metrics["characters"] = []
metrics["characters"].append(d.model_dump(exclude_none=True))
elif isinstance(d, STTUsageMetricsData):
if "stt" not in metrics:
metrics["stt"] = []
metrics["stt"].append(d.model_dump(exclude_none=True))

message = RTVIMetricsMessage(data=metrics)
await self.send_rtvi_message(message)
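Given the serialization in the branch above, the RTVI metrics payload would gain an "stt" key shaped roughly like this (a sketch with illustrative values, not output captured from the library):

metrics = {
    "stt": [
        {
            "processor": "ExampleSTTService",
            "value": {
                "audio_duration_seconds": 5.5,
                "requests": 1,
                "word_count": 6,
                "real_time_factor": 0.42,
            },
        }
    ]
}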
184 changes: 184 additions & 0 deletions src/pipecat/processors/metrics/frame_processor_metrics.py
@@ -17,6 +17,8 @@
LLMUsageMetricsData,
MetricsData,
ProcessingMetricsData,
STTUsage,
STTUsageMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
@@ -44,6 +46,7 @@ def __init__(self):
self._start_ttfb_time = 0
self._start_processing_time = 0
self._last_ttfb_time = 0
self._last_processing_time = 0
self._should_report_ttfb = True

async def setup(self, task_manager: BaseTaskManager):
@@ -83,6 +86,22 @@ def ttfb(self) -> Optional[float]:

return None

@property
def processing_time(self) -> Optional[float]:
"""Get the current processing time value in seconds.

Returns:
The processing time value in seconds, or None if not measured.
"""
if self._last_processing_time > 0:
return self._last_processing_time

# If processing is in progress, calculate current value
if self._start_processing_time > 0:
return time.time() - self._start_processing_time

return None

def _processor_name(self):
"""Get the processor name from core metrics data."""
return self._core_metrics_data.processor
@@ -149,6 +168,7 @@ async def stop_processing_metrics(self):
return None

value = time.time() - self._start_processing_time
self._last_processing_time = value
logger.debug(f"{self._processor_name()} processing time: {value}")
processing = ProcessingMetricsData(
processor=self._processor_name(), value=value, model=self._model_name()
@@ -190,3 +210,167 @@ async def start_tts_usage_metrics(self, text: str):
)
logger.debug(f"{self._processor_name()} usage characters: {characters.value}")
return MetricsFrame(data=[characters])

async def start_stt_usage_metrics(
self,
audio_duration: float,
transcript: Optional[str] = None,
processing_time: Optional[float] = None,
confidence: Optional[float] = None,
sample_rate: Optional[int] = None,
channels: Optional[int] = None,
encoding: Optional[str] = None,
cost_per_minute: Optional[float] = None,
ttft: Optional[float] = None,
ground_truth: Optional[str] = None,
):
"""Record enhanced STT usage metrics with automatic calculations.

Args:
audio_duration: Duration of audio processed in seconds (required).
transcript: The transcribed text (used to calculate word/character counts).
processing_time: Time taken to process the audio in seconds.
confidence: Average confidence score from 0.0 to 1.0.
sample_rate: Audio sample rate in Hz (e.g., 16000).
channels: Number of audio channels (1 for mono, 2 for stereo).
encoding: Audio encoding format (e.g., "LINEAR16", "OPUS").
cost_per_minute: Cost per minute of audio (for cost estimation).
ttft: Time to first transcript in seconds.
ground_truth: Reference transcript for WER calculation (optional, for testing).

Returns:
MetricsFrame containing comprehensive STT usage data.

Example:
# Basic usage (backward compatible)
await self.start_stt_usage_metrics(audio_duration=5.5)

# Enhanced usage with all metrics
await self.start_stt_usage_metrics(
audio_duration=5.5,
transcript="Hello world this is a test",
processing_time=2.3,
confidence=0.95,
sample_rate=16000,
cost_per_minute=0.006,
ttft=0.5
)
"""
# Calculate content metrics from transcript
word_count = None
character_count = None
if transcript:
word_count = len(transcript.split())
character_count = len(transcript)

# Calculate performance metrics
real_time_factor = None
words_per_second = None
if processing_time and audio_duration > 0:
# RTF = processing_time / audio_duration
# RTF < 1.0 means faster than real-time (good!)
real_time_factor = processing_time / audio_duration

if word_count and processing_time and processing_time > 0:
# WPS = total_words / processing_time
words_per_second = word_count / processing_time

# Calculate cost metrics
estimated_cost = None
cost_per_word = None
if cost_per_minute and audio_duration > 0:
# Convert audio duration to minutes and calculate cost
audio_minutes = audio_duration / 60.0
estimated_cost = audio_minutes * cost_per_minute

if word_count and word_count > 0:
cost_per_word = estimated_cost / word_count

# Calculate WER if ground truth is provided
word_error_rate = None
if ground_truth and transcript:
word_error_rate = self._calculate_wer(transcript, ground_truth)

# Build usage metrics
usage = STTUsage(
audio_duration_seconds=audio_duration,
requests=1,
word_count=word_count,
character_count=character_count,
processing_time_seconds=processing_time,
real_time_factor=real_time_factor,
words_per_second=words_per_second,
time_to_first_transcript=ttft,
time_to_final_transcript=processing_time,
average_confidence=confidence,
word_error_rate=word_error_rate,
sample_rate=sample_rate,
channels=channels,
encoding=encoding,
cost_per_word=cost_per_word,
estimated_cost=estimated_cost,
)

value = STTUsageMetricsData(
processor=self._processor_name(), model=self._model_name(), value=usage
)

# Build comprehensive log message
log_parts = [f"{self._processor_name()} STT usage:"]
log_parts.append(f"{audio_duration:.3f}s audio")
if word_count:
log_parts.append(f"{word_count} words")
if words_per_second:
log_parts.append(f"{words_per_second:.1f} WPS")
if real_time_factor:
log_parts.append(f"RTF={real_time_factor:.2f}")
if estimated_cost:
log_parts.append(f"${estimated_cost:.4f}")

logger.debug(", ".join(log_parts))

return MetricsFrame(data=[value])

def _calculate_wer(self, hypothesis: str, reference: str) -> float:
"""Calculate Word Error Rate (WER) between hypothesis and reference.

Args:
hypothesis: The transcribed text.
reference: The ground truth text.

Returns:
WER as a percentage (0-100).

Formula:
WER = (Substitutions + Insertions + Deletions) / Total_Reference_Words * 100
"""
# Split into words
hyp_words = hypothesis.lower().split()
ref_words = reference.lower().split()

# Create matrix for dynamic programming
d = [[0] * (len(ref_words) + 1) for _ in range(len(hyp_words) + 1)]

# Initialize first row and column
for i in range(len(hyp_words) + 1):
d[i][0] = i
for j in range(len(ref_words) + 1):
d[0][j] = j

# Calculate edit distance
for i in range(1, len(hyp_words) + 1):
for j in range(1, len(ref_words) + 1):
if hyp_words[i - 1] == ref_words[j - 1]:
d[i][j] = d[i - 1][j - 1]
else:
substitution = d[i - 1][j - 1] + 1
insertion = d[i][j - 1] + 1
deletion = d[i - 1][j] + 1
d[i][j] = min(substitution, insertion, deletion)

# Calculate WER percentage
if len(ref_words) == 0:
return 0.0 if len(hyp_words) == 0 else 100.0

wer = (d[len(hyp_words)][len(ref_words)] / len(ref_words)) * 100
return round(wer, 2)
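A quick sanity check of the WER helper above, assuming the enclosing class is pipecat's FrameProcessorMetrics; the strings are made up:

from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics

# The hypothesis drops one word ("brown") from a four-word reference,
# so WER = 1 deletion / 4 reference words = 25%.
metrics = FrameProcessorMetrics()
wer = metrics._calculate_wer(hypothesis="the quick fox", reference="the quick brown fox")
assert wer == 25.0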