Skip to content

Commit

Permalink
feat(rust): add a span exporter using a secure channel
Browse files Browse the repository at this point in the history
  • Loading branch information
etorreborre committed Feb 6, 2025
1 parent 5d6ef60 commit 6fb11e6
Show file tree
Hide file tree
Showing 9 changed files with 738 additions and 8 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ open = "5.3.0"
opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] }
opentelemetry-appender-tracing = { version = "0.26.0" }
opentelemetry-otlp = { version = "0.26.0", features = ["logs", "metrics", "trace", "grpc-tonic", "tls", "tls-roots"], default-features = false }
opentelemetry-proto = { version = "0.26.1", features = ["full"] }
opentelemetry-semantic-conventions = { version = "0.26.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.26.0", features = ["logs", "metrics", "trace", "rt-tokio", "rt-tokio-current-thread", "testing", "logs_level_enabled"], default-features = false }
petname = { version = "2.0.2", default-features = false, features = ["default-rng", "default-words"] }
Expand Down Expand Up @@ -119,6 +120,7 @@ ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.69.0", features =
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.101.0" }
ockam_transport_tcp = { path = "../ockam_transport_tcp", version = "^0.135.0", default-features = false, features = ["std"] }
tonic = "0.12"
futures-core = "0.3.31"

[dependencies.ockam_core]
version = "0.124.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ mod tests {
use ockam_node::database::{with_postgres, DatabaseConfiguration};
use ockam_node::NodeBuilder;
use std::future::Future;
use std::net::TcpListener;
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -691,10 +690,11 @@ mod tests {
result
})?
}
}

fn random_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to address");
let address = listener.local_addr().expect("Failed to get local address");
address.port()
}
#[cfg(test)]
pub fn random_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to address");
let address = listener.local_addr().expect("Failed to get local address");
address.port()
}
80 changes: 80 additions & 0 deletions implementations/rust/ockam/ockam_api/src/logs/http_forwarder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::logs::secure_client_service::OckamRequest;
use crate::{ApiError, Result};
use hyper::{http, Uri};
use ockam_core::api::{Method, Request};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{async_trait, Routed, Worker};
use ockam_node::Context;
use std::future;
use tonic::body::BoxBody;
use tonic::client::GrpcService;
use tonic::transport::Channel;

pub const HTTP_FORWARDER: &str = "http_forwarder";

/// The HttpForwarder worker accepts http requests serialized as Ockam messages
/// and forwards them to an HTTP endpoint.
///
/// Note that we don't wait for a response from the endpoint.
pub struct HttpForwarder {
channel: Channel,
}

impl HttpForwarder {
/// Create a Channel for the given URI
pub async fn new(uri: Uri) -> Result<Self> {
let channel = Channel::builder(uri)
.connect()
.await
.map_err(ApiError::message)?;

Ok(Self { channel })
}

/// Forward an http Request.
/// We don't wait for a response here.
async fn forward_http_request(&mut self, request: http::Request<BoxBody>) -> Result<()> {
self.ready().await.map_err(ApiError::core)?;
let _ = self
.channel
.call(request)
.await
.map_err(ApiError::message)?;

Ok(())
}

/// Check if the channel is ready before making a call
async fn ready(&mut self) -> Result<()> {
future::poll_fn(|cx| self.channel.poll_ready(cx))
.await
.map_err(ApiError::message)
}
}

#[async_trait]
impl Worker for HttpForwarder {
type Message = Request<OckamRequest>;
type Context = Context;

async fn handle_message(
&mut self,
_ctx: &mut Context,
message: Routed<Request<OckamRequest>>,
) -> ockam_core::Result<()> {
let (header, body) = message.into_body()?.into_parts();
match (header.method(), header.path(), body) {
// Every posted message must be forwarded
(Some(Method::Post), "/", Some(ockam_request)) => {
let http_request = ockam_request.to_http_request().map_err(|e| {
ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}"))
})?;
self.forward_http_request(http_request)
.await
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Io, format!("{e:?}")))?;
}
_ => (),
};
Ok(())
}
}
3 changes: 3 additions & 0 deletions implementations/rust/ockam/ockam_api/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ mod current_span;
mod default_values;
pub mod env_variables;
pub mod exporting_configuration;
mod http_forwarder;
mod log_exporters;
pub mod logging_configuration;
mod logging_options;
mod ockam_tonic_traces_client;
mod secure_client_service;
pub mod setup;
mod span_exporters;
mod tracing_guard;
Expand Down
Loading

0 comments on commit 6fb11e6

Please sign in to comment.