Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upgrade to axum 0.7 #315

Merged
merged 5 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,775 changes: 1,256 additions & 519 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build = "build.rs"
resolver = "2"

[features]
full= ["functional_tests", "multitenant", "analytics", "geoblock", "cloud"]
full = ["functional_tests", "multitenant", "analytics", "geoblock", "cloud"]
# Used to enable functional tests
functional_tests = []
# Multi-tenancy mode
Expand All @@ -22,14 +22,14 @@ geoblock = []
cloud = []

[dependencies]
wc = { git = "https://github.com/WalletConnect/utils-rs.git", tag = "v0.7.0", features = ["analytics", "geoip", "geoblock"] }
wc = { git = "https://github.com/WalletConnect/utils-rs.git", branch = "feat/axum-0.7", features = ["full"] }

tokio = { version = "1", features = ["full"] }
axum = { version = "0.6", features = ["json", "multipart", "tokio"] }
axum-client-ip = "0.4"
tower = "0.4"
tower-http = { version = "0.4", features = ["trace", "cors", "request-id", "propagate-header", "catch-panic", "util"] }
hyper = "0.14"
axum = { version = "0.7.5", features = ["json", "multipart", "tokio"] }
axum-client-ip = "0.5.1"
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["trace", "cors", "request-id", "propagate-header", "catch-panic", "util"] }
hyper = "1.2.0"

# Database
sqlx = { version = "0.6", features = ["runtime-tokio-native-tls", "postgres", "json", "chrono", "macros"] }
Expand Down Expand Up @@ -63,21 +63,21 @@ a2 = { git = "https://github.com/WalletConnect/a2", rev = "d0236c3", features =
fcm = "0.9"

# Signature validation
ed25519-dalek = "2.0.0-rc.2"
ed25519-dalek = "2.1.1"

# JWT Authentication
relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", rev = "v0.23.0"}
relay_rpc = { git = "https://github.com/WalletConnect/WalletConnectRust.git", branch = "fix/bump-http-deps" }
jsonwebtoken = "8.1"
data-encoding = "2.3"

# Analytics
aws-config = "0.56"
aws-sdk-s3 = "0.31"
aws-config = "1.1.9"
aws-sdk-s3 = "1.21.0"
parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "99a1cc3", default-features = false, features = ["flate2"] }
parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "99a1cc3" }

# Misc
reqwest = { version = "0.11", features = ["multipart"] }
reqwest = { version = "0.12.2", features = ["multipart", "json"] }
async-trait = "0.1"
thiserror = "1.0"
hex = "0.4"
Expand All @@ -91,15 +91,15 @@ ipnet = "2.5"
cerberus = { git = "https://github.com/WalletConnect/cerberus.git", tag = "v0.5.0" }
async-recursion = "1.0.4"
tap = "1.0.1"
wiremock = "0.5.21"
wiremock = "0.6.0"

[dev-dependencies]
serial_test = "1.0"
test-context = "0.1"
futures-util = "0.3"
random-string = "1.0"
rand = "0.8"
ed25519-dalek = { version = "2.0.0-rc.2", features = ["rand_core"] }
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }

[build-dependencies]
build-info-build = "0.0"
5 changes: 4 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lint: clippy fmt

unit: lint test test-all lint-tf
unit: lint test test-all test-single-tenant lint-tf

devloop: unit fmt-imports

Expand All @@ -12,6 +12,9 @@ test:
test-all:
RUST_BACKTRACE=1 cargo test --all-features --lib --bins -- {{test}}

test-single-tenant:
Copy link
Member Author

@chris13524 chris13524 Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a configuration case in the CI tests that isn't covered by the local tests. CI failed, so I had to set this up in order to reproduce the failing tests locally.

RUST_BACKTRACE=1 cargo test --features=functional_tests -- {{test}}

clippy:
#!/bin/bash
set -euo pipefail
Expand Down
227 changes: 183 additions & 44 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,141 @@ use {
crate::{
analytics::{client_info::ClientInfo, message_info::MessageInfo},
config::Config,
error::Result,
log::prelude::*,
},
aws_sdk_s3::Client as S3Client,
std::{net::IpAddr, sync::Arc},
std::{net::IpAddr, sync::Arc, time::Duration},
wc::{
analytics::{
collectors::{batch::BatchOpts, noop::NoopCollector},
exporters::aws::{AwsExporter, AwsOpts},
writers::parquet::ParquetWriter,
Analytics,
self, AnalyticsExt, ArcCollector, AwsConfig, AwsExporter, BatchCollector,
BatchObserver, CollectionObserver, Collector, CollectorConfig, ExportObserver,
ParquetBatchFactory,
},
geoip::{self, MaxMindResolver, Resolver},
metrics::otel,
},
};

pub mod client_info;
pub mod message_info;

const ANALYTICS_EXPORT_TIMEOUT: Duration = Duration::from_secs(30);
const DATA_QUEUE_CAPACITY: usize = 8192;

#[derive(Clone, Copy)]
enum DataKind {
Messages,
Clients,
}

impl DataKind {
#[inline]
fn as_str(&self) -> &'static str {
match self {
Self::Messages => "messages",
Self::Clients => "clients",
}
}

#[inline]
fn as_kv(&self) -> otel::KeyValue {
otel::KeyValue::new("data_kind", self.as_str())
}
}

fn success_kv(success: bool) -> otel::KeyValue {
otel::KeyValue::new("success", success)
}

#[derive(Clone, Copy)]
struct Observer(DataKind);

impl<T, E> BatchObserver<T, E> for Observer
where
E: std::error::Error,
{
fn observe_batch_serialization(&self, elapsed: Duration, res: &Result<Vec<u8>, E>) {
let size = res.as_deref().map(|data| data.len()).unwrap_or(0);
let elapsed = elapsed.as_millis() as u64;

wc::metrics::counter!(
"analytics_batches_finished",
1,
&[self.0.as_kv(), success_kv(res.is_ok())]
);

if let Err(err) = res {
tracing::warn!(
?err,
data_kind = self.0.as_str(),
"failed to serialize analytics batch"
);
} else {
tracing::info!(
size,
elapsed,
data_kind = self.0.as_str(),
"analytics data batch serialized"
);
}
}
}

impl<T, E> CollectionObserver<T, E> for Observer
where
E: std::error::Error,
{
fn observe_collection(&self, res: &Result<(), E>) {
wc::metrics::counter!(
"analytics_records_collected",
1,
&[self.0.as_kv(), success_kv(res.is_ok())]
);

if let Err(err) = res {
tracing::warn!(
?err,
data_kind = self.0.as_str(),
"failed to collect analytics data"
);
}
}
}

impl<E> ExportObserver<E> for Observer
where
E: std::error::Error,
{
fn observe_export(&self, elapsed: Duration, res: &Result<(), E>) {
wc::metrics::counter!(
"analytics_batches_exported",
1,
&[self.0.as_kv(), success_kv(res.is_ok())]
);

let elapsed = elapsed.as_millis() as u64;

if let Err(err) = res {
tracing::warn!(
?err,
elapsed,
data_kind = self.0.as_str(),
"analytics export failed"
);
} else {
tracing::info!(
elapsed,
data_kind = self.0.as_str(),
"analytics export failed"
);
}
}
}

#[derive(Clone)]
pub struct PushAnalytics {
pub messages: Analytics<MessageInfo>,
pub clients: Analytics<ClientInfo>,
pub messages: ArcCollector<MessageInfo>,
pub clients: ArcCollector<ClientInfo>,
pub geoip_resolver: Option<Arc<MaxMindResolver>>,
}

Expand All @@ -33,64 +145,91 @@ impl PushAnalytics {
info!("initializing analytics with noop export");

Self {
messages: Analytics::new(NoopCollector),
clients: Analytics::new(NoopCollector),
messages: analytics::noop_collector().boxed_shared(),
clients: analytics::noop_collector().boxed_shared(),
geoip_resolver: None,
}
}

pub fn with_aws_export(
s3_client: S3Client,
export_bucket: &str,
node_ip: IpAddr,
node_addr: IpAddr,
geoip_resolver: Option<Arc<MaxMindResolver>>,
) -> Result<Self> {
info!(%export_bucket, "initializing analytics with aws export");

let opts = BatchOpts::default();
let bucket_name: Arc<str> = export_bucket.into();
let node_ip: Arc<str> = node_ip.to_string().into();

) -> Self {
let messages = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "echo/messages",
export_name: "push_messages",
file_extension: "parquet",
bucket_name: bucket_name.clone(),
s3_client: s3_client.clone(),
node_ip: node_ip.clone(),
});

let collector = ParquetWriter::<MessageInfo>::new(opts.clone(), exporter)?;
Analytics::new(collector)
let data_kind = DataKind::Messages;
let observer = Observer(data_kind);
BatchCollector::new(
CollectorConfig {
data_queue_capacity: DATA_QUEUE_CAPACITY,
..Default::default()
},
ParquetBatchFactory::new(Default::default()).with_observer(observer),
AwsExporter::new(AwsConfig {
export_prefix: "echo/messages".to_string(),
export_name: "push_messages".to_string(),
node_addr,
file_extension: "parquet".to_owned(),
bucket_name: export_bucket.to_owned(),
s3_client: s3_client.clone(),
upload_timeout: ANALYTICS_EXPORT_TIMEOUT,
})
.with_observer(observer),
)
.with_observer(observer)
.boxed_shared()
};

let clients = {
let exporter = AwsExporter::new(AwsOpts {
export_prefix: "echo/clients",
export_name: "push_clients",
file_extension: "parquet",
bucket_name,
s3_client,
node_ip,
});

Analytics::new(ParquetWriter::new(opts, exporter)?)
let data_kind = DataKind::Clients;
let observer = Observer(data_kind);
BatchCollector::new(
CollectorConfig {
data_queue_capacity: DATA_QUEUE_CAPACITY,
..Default::default()
},
ParquetBatchFactory::new(Default::default()).with_observer(observer),
AwsExporter::new(AwsConfig {
export_prefix: "echo/clients".to_string(),
export_name: "push_clients".to_string(),
node_addr,
file_extension: "parquet".to_owned(),
bucket_name: export_bucket.to_owned(),
s3_client: s3_client.clone(),
upload_timeout: ANALYTICS_EXPORT_TIMEOUT,
})
.with_observer(observer),
)
.with_observer(observer)
.boxed_shared()
};

Ok(Self {
Self {
messages,
clients,
geoip_resolver,
})
}
}

pub fn message(&self, data: MessageInfo) {
self.messages.collect(data);
if let Err(err) = self.messages.collect(data) {
tracing::warn!(
?err,
data_kind = DataKind::Messages.as_str(),
"failed to collect analytics"
);
}
}

pub fn client(&self, data: ClientInfo) {
self.clients.collect(data);
if let Err(err) = self.clients.collect(data) {
tracing::warn!(
?err,
data_kind = DataKind::Clients.as_str(),
"failed to collect analytics"
);
}
}

pub fn lookup_geo_data(&self, addr: IpAddr) -> Option<geoip::Data> {
Expand All @@ -110,7 +249,7 @@ pub async fn initialize(
s3_client: S3Client,
echo_ip: IpAddr,
geoip_resolver: Option<Arc<MaxMindResolver>>,
) -> Result<PushAnalytics> {
) -> PushAnalytics {
PushAnalytics::with_aws_export(
s3_client,
&config.analytics_export_bucket,
Expand Down
3 changes: 0 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ pub enum Error {
#[error(transparent)]
JWT(#[from] jsonwebtoken::errors::Error),

#[error(transparent)]
Parquet(#[from] wc::analytics::collectors::batch::BatchError<parquet::errors::ParquetError>),

#[error("failed to load geoip database from s3")]
GeoIpS3Failed,

Expand Down
Loading
Loading