Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/clients/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use axum::http::HeaderMap;
use futures::{Future, StreamExt, TryStreamExt};
use ginepro::LoadBalancedChannel;
use tonic::{Code, Request, Response, Status, Streaming};
use tracing::Span;

use super::{
BoxStream, Client, Error, create_grpc_client, errors::grpc_to_http_code,
Expand All @@ -39,7 +38,6 @@ use crate::{
caikit_data_model::nlp::{ChunkerTokenizationStreamResult, TokenizationResults},
grpc::health::v1::{HealthCheckRequest, health_client::HealthClient},
},
utils::trace::trace_context_from_grpc_response,
};

const DEFAULT_PORT: u16 = 8085;
Expand Down Expand Up @@ -74,8 +72,6 @@ impl ChunkerClient {
let mut client = self.client.clone();
let request = request_with_headers(request, model_id);
let response = client.chunker_tokenization_task_predict(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

Expand All @@ -91,8 +87,6 @@ impl ChunkerClient {
let response_stream_fut: Pin<Box<dyn Future<Output = StreamingTokenizationResult> + Send>> =
Box::pin(client.bidi_streaming_chunker_tokenization_task_predict(request));
let response_stream = response_stream_fut.await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response_stream);
Ok(response_stream.into_inner().map_err(Into::into).boxed())
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/clients/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ impl HttpClient {
message: format!("client request timeout: {e}"),
}),
}?;
let span = Span::current();
trace::trace_context_from_http_response(&span, &response);
Ok(response.into())
}
None => Err(builder.body(body).err().map_or_else(
Expand Down
11 changes: 1 addition & 10 deletions src/clients/nlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use axum::http::HeaderMap;
use futures::{StreamExt, TryStreamExt};
use ginepro::LoadBalancedChannel;
use tonic::{Code, Request};
use tracing::{Span, debug, instrument};
use tracing::{debug, instrument};

use super::{
BoxStream, Client, Error, create_grpc_client, errors::grpc_to_http_code,
Expand All @@ -41,7 +41,6 @@ use crate::{
},
grpc::health::v1::{HealthCheckRequest, health_client::HealthClient},
},
utils::trace::trace_context_from_grpc_response,
};

const DEFAULT_PORT: u16 = 8085;
Expand Down Expand Up @@ -74,8 +73,6 @@ impl NlpClient {
let request = request_with_headers(request, model_id, headers);
debug!(?request, "sending request to NLP gRPC service");
let response = client.tokenization_task_predict(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

Expand All @@ -86,12 +83,10 @@ impl NlpClient {
request: TokenClassificationTaskRequest,
headers: HeaderMap,
) -> Result<TokenClassificationResults, Error> {
let span = Span::current();
let mut client = self.client.clone();
let request = request_with_headers(request, model_id, headers);
debug!(?request, "sending request to NLP gRPC service");
let response = client.token_classification_task_predict(request).await?;
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

Expand All @@ -106,8 +101,6 @@ impl NlpClient {
let request = request_with_headers(request, model_id, headers);
debug!(?request, "sending request to NLP gRPC service");
let response = client.text_generation_task_predict(request).await?;
let span: Span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

Expand All @@ -124,8 +117,6 @@ impl NlpClient {
let response = client
.server_streaming_text_generation_task_predict(request)
.await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
let response_stream = response.into_inner().map_err(Into::into).boxed();
Ok(response_stream)
}
Expand Down
10 changes: 0 additions & 10 deletions src/clients/tgis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use axum::http::HeaderMap;
use futures::{StreamExt, TryStreamExt};
use ginepro::LoadBalancedChannel;
use tonic::Code;
use tracing::Span;

use super::{
BoxStream, Client, Error, create_grpc_client, errors::grpc_to_http_code,
Expand All @@ -34,7 +33,6 @@ use crate::{
BatchedTokenizeResponse, GenerationResponse, ModelInfoRequest, ModelInfoResponse,
SingleGenerationRequest, generation_service_client::GenerationServiceClient,
},
utils::trace::trace_context_from_grpc_response,
};

const DEFAULT_PORT: u16 = 8033;
Expand All @@ -58,8 +56,6 @@ impl TgisClient {
let request = grpc_request_with_headers(request, headers);
let mut client = self.client.clone();
let response = client.generate(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

Expand All @@ -71,8 +67,6 @@ impl TgisClient {
let request = grpc_request_with_headers(request, headers);
let mut client = self.client.clone();
let response = client.generate_stream(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner().map_err(Into::into).boxed())
}

Expand All @@ -84,17 +78,13 @@ impl TgisClient {
let mut client = self.client.clone();
let request = grpc_request_with_headers(request, headers);
let response = client.tokenize(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}

pub async fn model_info(&self, request: ModelInfoRequest) -> Result<ModelInfoResponse, Error> {
let request = grpc_request_with_headers(request, HeaderMap::new());
let mut client = self.client.clone();
let response = client.model_info(request).await?;
let span = Span::current();
trace_context_from_grpc_response(&span, &response);
Ok(response.into_inner())
}
}
Expand Down
42 changes: 2 additions & 40 deletions src/utils/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use opentelemetry::{
global,
trace::{TraceContextExt, TraceId, TracerProvider},
};
use opentelemetry_http::{HeaderExtractor, HeaderInjector};
use opentelemetry_http::HeaderInjector;
use opentelemetry_otlp::{MetricExporter, SpanExporter, WithExportConfig};
use opentelemetry_sdk::{
Resource,
Expand All @@ -35,10 +35,7 @@ use tracing::{Span, error, info, info_span};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetrySpanExt};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt};

use crate::{
args::{LogFormat, OtlpProtocol, TracingConfig},
clients::http::TracedResponse,
};
use crate::args::{LogFormat, OtlpProtocol, TracingConfig};

pub const DEFAULT_GRPC_OTLP_ENDPOINT: &str = "http://localhost:4317";
pub const DEFAULT_HTTP_OTLP_ENDPOINT: &str = "http://localhost:4318";
Expand Down Expand Up @@ -309,41 +306,6 @@ pub fn with_traceparent_header(ctx: &opentelemetry::Context, headers: HeaderMap)
})
}

/// Extracts the `traceparent` header from an HTTP response's headers and uses it to set the current
/// tracing span context (i.e. use `traceparent` as parent to the current span).
/// Defaults to using the current context when no `traceparent` is found.
/// See https://www.w3.org/TR/trace-context/#trace-context-http-headers-format.
pub fn trace_context_from_http_response(span: &Span, response: &TracedResponse) {
let curr_trace = span.context().span().span_context().trace_id();
let ctx = global::get_text_map_propagator(|propagator| {
// Returns the current context if no `traceparent` is found
propagator.extract(&HeaderExtractor(response.headers()))
});
if ctx.span().span_context().trace_id() == curr_trace
&& let Err(error) = span.set_parent(ctx)
{
error!(%error, "Error setting trace parent for HTTP response");
}
}

/// Extracts the `traceparent` header from a gRPC response's metadata and uses it to set the current
/// tracing span context (i.e. use `traceparent` as parent to the current span).
/// Defaults to using the current context when no `traceparent` is found.
/// See https://www.w3.org/TR/trace-context/#trace-context-http-headers-format.
pub fn trace_context_from_grpc_response<T>(span: &Span, response: &tonic::Response<T>) {
let curr_trace = span.context().span().span_context().trace_id();
let ctx = global::get_text_map_propagator(|propagator| {
let metadata = response.metadata().clone();
// Returns the current context if no `traceparent` is found
propagator.extract(&HeaderExtractor(&metadata.into_headers()))
});
if ctx.span().span_context().trace_id() == curr_trace
&& let Err(error) = span.set_parent(ctx)
{
error!(%error, "Error setting trace parent for gRPC response");
}
}

/// Returns the `trace_id` of the current span according to the global tracing subscriber.
pub fn current_trace_id() -> TraceId {
Span::current().context().span().span_context().trace_id()
Expand Down