Skip to content

Commit

Permalink
Tracing propogation'
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 2, 2024
1 parent 6e980cb commit c256ade
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 21 deletions.
79 changes: 79 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions processed_data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ async-graphql-axum = { version = "7.0.2" }
aws-credential-types = { version = "0.56.0" }
aws-sdk-s3 = { version = "0.29.0" }
axum = { version = "0.7.4", features = ["ws"] }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum-tracing-opentelemetry = { version = "0.18.0" }
chrono = { version = "0.4.35" }
clap = { version = "4.5.2", features = ["derive", "env"] }
derive_more = { version = "0.99.17" }
Expand Down
42 changes: 21 additions & 21 deletions processed_data/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
mod built_info;
/// GraphQL resolvers
mod graphql;
/// An [`axum::handler::Handler`] for GraphQL
mod route_handlers;

use async_graphql::{extensions::Tracing, http::GraphiQLSource, SDLExportOptions};
use async_graphql_axum::{GraphQL, GraphQLSubscription};
use async_graphql::{http::GraphiQLSource, SDLExportOptions};
use aws_credential_types::{provider::SharedCredentialsProvider, Credentials};
use aws_sdk_s3::{config::Region, Client};
use axum::{response::Html, routing::get, Router};
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use clap::{ArgAction::SetTrue, Parser};
use derive_more::{Deref, FromStr, Into};
use graphql::{root_schema_builder, RootSchema};
Expand All @@ -26,10 +28,12 @@ use std::{
time::Duration,
};
use tokio::net::TcpListener;
use tracing::instrument;
use tracing::{info, instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

use crate::route_handlers::GraphQLHandler;

/// A service providing Beamline ISPyB data collected during sessions
#[derive(Debug, Parser)]
#[command(author, version, about, long_about=None)]
Expand Down Expand Up @@ -127,41 +131,40 @@ struct SchemaArgs {
/// Creates a connection pool to access the database
#[instrument(skip(database_url))]
async fn setup_database(database_url: Url) -> Result<DatabaseConnection, TransactionError<DbErr>> {
let connection_options = ConnectOptions::new(database_url.to_string());
info!("Connecting to database at {database_url}");
let connection_options = ConnectOptions::new(database_url.to_string())
.sqlx_logging_level(tracing::log::LevelFilter::Debug)
.to_owned();
let connection = Database::connect(connection_options).await?;
info!("Database connection established: {connection:?}");
Ok(connection)
}

/// Creates an [`axum::Router`] serving GraphiQL, synchronous GraphQL and GraphQL subscriptions
fn setup_router(schema: RootSchema) -> Router {
#[allow(clippy::missing_docs_in_private_items)]
const GRAPHQL_ENDPOINT: &str = "/";
#[allow(clippy::missing_docs_in_private_items)]
const SUBSCRIPTION_ENDPOINT: &str = "/ws";

Router::new()
.route(
GRAPHQL_ENDPOINT,
get(Html(
GraphiQLSource::build()
.endpoint(GRAPHQL_ENDPOINT)
.subscription_endpoint(SUBSCRIPTION_ENDPOINT)
.finish(),
GraphiQLSource::build().endpoint(GRAPHQL_ENDPOINT).finish(),
))
.post_service(GraphQL::new(schema.clone())),
.post(GraphQLHandler::new(schema)),
)
.route_service(SUBSCRIPTION_ENDPOINT, GraphQLSubscription::new(schema))
.layer(OtelInResponseLayer)
.layer(OtelAxumLayer::default())
}

/// Serves the endpoints on the specified port forever
async fn serve(router: Router, port: u16) -> Result<(), std::io::Error> {
let socket_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port));
let listener = TcpListener::bind(socket_addr).await?;
println!("GraphiQL IDE: {}", socket_addr);
println!("Serving API & GraphQL UI at {}", socket_addr);
axum::serve(listener, router.into_make_service()).await?;
Ok(())
}

/// Sets up Logging & Tracing using opentelemetry if available
fn setup_telemetry(
log_level: tracing::Level,
Expand All @@ -180,6 +183,9 @@ fn setup_telemetry(
),
]);
let (metrics_layer, tracing_layer) = if let Some(otel_collector_url) = otel_collector_url {
opentelemetry::global::set_text_map_propagator(
opentelemetry_sdk::propagation::TraceContextPropagator::default(),
);
(
Some(tracing_opentelemetry::MetricsLayer::new(
opentelemetry_otlp::new_pipeline()
Expand Down Expand Up @@ -232,13 +238,7 @@ async fn main() {
Cli::Serve(args) => {
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();
let database = setup_database(args.database_url).await.unwrap();
let s3_client = aws_sdk_s3::Client::from_s3_client_args(args.s3_client);
let schema = root_schema_builder()
.extension(Tracing)
.data(database)
.data(s3_client)
.data(args.s3_bucket)
.finish();
let schema = root_schema_builder().data(database).finish();
let router = setup_router(schema);
serve(router, args.port).await.unwrap();
}
Expand Down
55 changes: 55 additions & 0 deletions processed_data/src/route_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_graphql::Executor;
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{
extract::Request,
handler::Handler,
http::StatusCode,
response::{IntoResponse, Response},
RequestExt,
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
};
use std::{future::Future, pin::Pin};

/// An [`Handler`] which executes an [`Executor`] including the [`Authorization<Bearer>`] in the [`async_graphql::Context`]
#[derive(Debug, Clone)]
pub struct GraphQLHandler<E: Executor> {
/// The GraphQL executor used to process the request
executor: E,
}

impl<E: Executor> GraphQLHandler<E> {
/// Constructs an instance of the handler with the provided schema.
pub fn new(executor: E) -> Self {
Self { executor }
}
}

impl<S, E> Handler<((),), S> for GraphQLHandler<E>
where
E: Executor,
{
type Future = Pin<Box<dyn Future<Output = Response> + Send + 'static>>;

fn call(self, mut req: Request, _state: S) -> Self::Future {
Box::pin(async move {
let token = req
.extract_parts::<TypedHeader<Authorization<Bearer>>>()
.await
.ok()
.map(|token| token.0);
let request = req.extract::<GraphQLRequest, _>().await;
match request {
Ok(request) => GraphQLResponse::from(
self.executor
.execute(request.into_inner().data(token))
.await,
)
.into_response(),
Err(err) => (StatusCode::BAD_REQUEST, err.0.to_string()).into_response(),
}
})
}
}

0 comments on commit c256ade

Please sign in to comment.