Skip to content

Commit

Permalink
feat(rust): integrate the new span exporter with the rest of the appl…
Browse files Browse the repository at this point in the history
…ication
  • Loading branch information
etorreborre committed Feb 6, 2025
1 parent 6fb11e6 commit b205142
Show file tree
Hide file tree
Showing 51 changed files with 831 additions and 787 deletions.
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main(ctx: Context) -> Result<()> {
// as a member of the production cluster so it returns a signed credential
// attesting to that knowledge.
let authority_node = NodeManager::authority_node_client(
&tcp,
tcp.clone(),
node.secure_channels().clone(),
&issuer,
&MultiAddr::try_from("/dnsaddr/localhost/tcp/5000/secure/api")?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn main(ctx: Context) -> Result<()> {
// as a member of the production cluster so it returns a signed credential
// attesting to that knowledge.
let authority_node = NodeManager::authority_node_client(
&tcp,
tcp.clone(),
node.secure_channels().clone(),
&issuer,
&MultiAddr::try_from("/dnsaddr/localhost/tcp/5000/secure/api").unwrap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// create a secure channel to the authority
// when creating the channel we check that the opposite side is indeed presenting the authority identity
let authority_node = NodeManager::authority_node_client(
&tcp,
tcp.clone(),
node.secure_channels().clone(),
&control_plane,
&MultiAddr::try_from("/dnsaddr/localhost/tcp/5000")?,
Expand All @@ -82,7 +82,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// Create a credential retriever that will be used to obtain credentials
let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new(
node.context().try_clone()?,
Arc::new(tcp.clone()),
tcp.clone(),
node.secure_channels(),
RemoteCredentialRetrieverInfo::create_for_project_member(
project.authority_identifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// create a secure channel to the authority
// when creating the channel we check that the opposite side is indeed presenting the authority identity
let authority_node = NodeManager::authority_node_client(
&tcp,
tcp.clone(),
node.secure_channels().clone(),
&edge_plane,
&MultiAddr::try_from("/dnsaddr/localhost/tcp/5000")?,
Expand All @@ -80,7 +80,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// Create a credential retriever that will be used to obtain credentials
let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new(
node.context().try_clone()?,
Arc::new(tcp.clone()),
tcp.clone(),
node.secure_channels(),
RemoteCredentialRetrieverInfo::create_for_project_member(
project.authority_identifier(),
Expand Down
18 changes: 9 additions & 9 deletions implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ nix = { version = "0.29", features = ["signal"] }
nu-ansi-term = "0.50"
once_cell = { version = "1", default-features = false }
open = "5.3.0"
opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] }
opentelemetry-appender-tracing = { version = "0.26.0" }
opentelemetry-otlp = { version = "0.26.0", features = ["logs", "metrics", "trace", "grpc-tonic", "tls", "tls-roots"], default-features = false }
opentelemetry-proto = { version = "0.26.1", features = ["full"] }
opentelemetry-semantic-conventions = { version = "0.26.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.26.0", features = ["logs", "metrics", "trace", "rt-tokio", "rt-tokio-current-thread", "testing", "logs_level_enabled"], default-features = false }
opentelemetry = { version = "0.27", features = ["logs", "metrics", "trace"] }
opentelemetry-appender-tracing = { version = "0.27" }
opentelemetry-otlp = { version = "0.27", features = ["logs", "metrics", "trace", "grpc-tonic", "gzip-tonic", "tls", "tls-roots"], default-features = false }
opentelemetry-proto = { version = "0.27", features = ["full"] }
opentelemetry-semantic-conventions = { version = "0.27", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.27", features = ["logs", "metrics", "trace", "rt-tokio", "rt-tokio-current-thread", "testing"], default-features = false }
petname = { version = "2.0.2", default-features = false, features = ["default-rng", "default-words"] }
r3bl_rs_utils_core = "0.9"
r3bl_tui = "0.5"
Expand All @@ -112,14 +112,14 @@ tracing = { version = "0.1", default-features = false }
tracing-appender = "0.2.2"
tracing-core = { version = "0.1.32", default-features = false }
tracing-error = "0.2.0"
tracing-opentelemetry = "0.27.0"
tracing-opentelemetry = "0.28"
tracing-subscriber = { version = "0.3", features = ["json"] }
url = "2.5.2"

ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.69.0", features = ["cbor", "serde"] }
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.101.0" }
ockam_transport_tcp = { path = "../ockam_transport_tcp", version = "^0.135.0", default-features = false, features = ["std"] }
tonic = "0.12"
tonic = { version = "0.12", features = ["default", "gzip"] }
futures-core = "0.3.31"

[dependencies.ockam_core]
Expand Down Expand Up @@ -166,7 +166,7 @@ multimap = "0.10.0"
ockam_macros = { path = "../ockam_macros", features = ["std"], version = "^0.37.0" }
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.101.0" }
ockam_transport_tcp = { path = "../ockam_transport_tcp", default-features = false, version = "^0.135.0" }
opentelemetry_sdk = { version = "0.26.0", features = ["logs", "metrics", "trace", "rt-tokio", "testing"], default-features = false }
opentelemetry_sdk = { version = "0.27", features = ["logs", "metrics", "trace", "rt-tokio", "testing"], default-features = false }
quickcheck = "1.0.1"
quickcheck_macros = "1.0.0"
serial_test = "3.0.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use hyper::Uri;
use std::collections::BTreeMap;
use tracing::info;

Expand All @@ -10,6 +11,11 @@ use crate::authenticator::{
AuthorityEnrollmentTokenRepository, AuthorityEnrollmentTokenSqlxDatabase, AuthorityMember,
AuthorityMembersRepository, AuthorityMembersSqlxDatabase,
};
use crate::authority_node::Configuration;
use crate::echoer::Echoer;
use crate::logs::HttpForwarder;
use crate::nodes::service::default_address::DefaultAddress;
use crate::ApiError;
use ockam::identity::utils::now;
use ockam::identity::{
Identifier, Identities, SecureChannelListenerOptions, SecureChannelSqlxDatabase,
Expand All @@ -18,15 +24,12 @@ use ockam::identity::{
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam_core::compat::sync::Arc;
use ockam_core::env::get_env;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::flow_control::FlowControlId;
use ockam_core::Result;
use ockam_core::{Error, Result};
use ockam_node::database::SqlxDatabase;
use ockam_node::Context;

use crate::authority_node::Configuration;
use crate::echoer::Echoer;
use crate::nodes::service::default_address::DefaultAddress;

/// This struct represents an Authority, which is an
/// Identity which other identities trust to authenticate attributes
///
Expand Down Expand Up @@ -308,6 +311,31 @@ impl Authority {
ctx.start_worker(address, Echoer)
}

/// Start an http forwarder service
pub async fn start_http_forwarder(
&self,
ctx: &Context,
secure_channel_flow_control_id: &FlowControlId,
configuration: &Configuration,
) -> Result<()> {
if let Some(telemetry_endpoint_url) = &configuration.telemetry_endpoint_url {
let address = DefaultAddress::HTTP_FORWARDER;

ctx.flow_controls()
.add_consumer(&address.into(), secure_channel_flow_control_id);

let url = telemetry_endpoint_url.to_string();
let uri = url
.parse::<Uri>()
.map_err(|e| Error::new(Origin::Ockam, Kind::Invalid, e))?;
ctx.start_worker(
address,
HttpForwarder::new(uri).await.map_err(ApiError::core)?,
)?
};
Ok(())
}

/// Add a member directly to storage, without additional validation
/// This is used during the authority start-up to add an identity for exporting traces
pub async fn add_member(
Expand Down Expand Up @@ -354,7 +382,7 @@ impl Authority {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::authenticator::direct::{
Members, OCKAM_ROLE_ATTRIBUTE_ENROLLER_VALUE, OCKAM_ROLE_ATTRIBUTE_KEY,
Expand All @@ -375,6 +403,7 @@ mod tests {
use std::future::Future;
use std::str::FromStr;
use std::time::Duration;
use url::Url;

/// This test gets a reference to the postgres database and starts 2 authority nodes
/// to make sure that they can work even when using the same database.
Expand Down Expand Up @@ -402,6 +431,7 @@ mod tests {

let authority1 = start_authority_node(
db.clone(),
None,
&ctx1,
port1,
"authority-node-1",
Expand All @@ -410,6 +440,7 @@ mod tests {
.await?;
let authority2 = start_authority_node(
db,
None,
&ctx2,
port2,
"authority-node-2",
Expand Down Expand Up @@ -484,6 +515,7 @@ mod tests {
/// - An identity that should be trusted as an enroller
fn create_configuration(
database_configuration: DatabaseConfiguration,
telemetry_endpoint_url: Option<Url>,
authority: &Identifier,
port: u16,
trusted: &[Identifier],
Expand Down Expand Up @@ -516,6 +548,7 @@ mod tests {
account_authority: None,
enforce_admin_checks: false,
disable_trust_context_id: false,
telemetry_endpoint_url,
})
}

Expand All @@ -528,7 +561,7 @@ mod tests {
caller: &Identifier,
) -> Result<AuthorityNodeClient> {
let client = NodeManager::authority_node_client(
&TcpTransport::create(ctx)?,
TcpTransport::create(ctx)?,
secure_channels,
authority_identifier,
authority_route,
Expand All @@ -547,6 +580,7 @@ mod tests {
/// - An identifier for an enroller
async fn start_authority_node(
db: SqlxDatabase,
telemetry_endpoint_url: Option<Url>,
ctx: &Context,
port: u16,
node_name: &str,
Expand All @@ -555,8 +589,13 @@ mod tests {
let identities = identities::create(db.clone(), node_name);
let authority = identities.identities_creation().create_identity().await?;

let configuration =
create_configuration(db.configuration.clone(), &authority, port, trusted)?;
let configuration = create_configuration(
db.configuration.clone(),
telemetry_endpoint_url,
&authority,
port,
trusted,
)?;
let authority = Authority::create(&configuration, Some(db.clone())).await?;
authority_node::start_node(ctx, &configuration, authority.clone()).await?;
Ok(authority)
Expand Down Expand Up @@ -690,11 +729,11 @@ mod tests {
result
})?
}
}

#[cfg(test)]
pub fn random_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to address");
let address = listener.local_addr().expect("Failed to get local address");
address.port()
pub fn random_port() -> u16 {
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to address");
let address = listener.local_addr().expect("Failed to get local address");
address.port()
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use ockam::identity::models::ChangeHistory;
use serde::{Deserialize, Serialize};

use ockam::identity::Identifier;
use ockam_core::compat::collections::HashMap;
use ockam_core::compat::fmt;
use ockam_core::compat::fmt::{Display, Formatter};
use ockam_node::database::DatabaseConfiguration;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::authenticator::PreTrustedIdentities;
use crate::config::lookup::InternetAddress;
Expand Down Expand Up @@ -55,6 +55,9 @@ pub struct Configuration {
/// Will not include trust_context_id and project id into credential
/// Set to true after old clients are updated
pub disable_trust_context_id: bool,

/// Url of the OpenTelemetry collector endpoint
pub telemetry_endpoint_url: Option<Url>,
}

/// Local and private functions for the authority configuration
Expand Down
Loading

0 comments on commit b205142

Please sign in to comment.