diff --git a/Cargo.lock b/Cargo.lock index fe2e48659..a36f2e034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3898,9 +3898,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", "log", @@ -3911,9 +3911,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -3922,9 +3922,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", "valuable", diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 1c4f972b6..0278c2533 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -38,7 +38,7 @@ serde = { version = "1", optional = true } serde_json = { version = "1", optional = true } testcontainers = "0.14.0" tokio = { version = "1", features = ["full", "tracing"], optional = true } -tracing = "0.1.36" +tracing = "0.1.37" url = { version = "2.3.1", features = ["serde"] } [dev-dependencies] diff --git a/interop_binaries/Cargo.toml b/interop_binaries/Cargo.toml index e87df34f9..e86787aba 100644 --- a/interop_binaries/Cargo.toml +++ b/interop_binaries/Cargo.toml @@ -43,7 +43,7 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.86" testcontainers = { version = "0.14", optional = true } tokio = { version = "1.21", features = ["full", "tracing"] } -tracing = "0.1.36" +tracing = "0.1.37" tracing-log = "0.1.3" tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"] } url = { version = "2.3.1", features = ["serde"] } diff --git a/janus_client/Cargo.toml b/janus_client/Cargo.toml index f2eeed205..1c666f583 100644 --- a/janus_client/Cargo.toml +++ b/janus_client/Cargo.toml @@ -19,7 +19,7 @@ prio = { version = "0.8.2", features = ["multithreaded"] } reqwest = { version = "0.11.12", default-features = false, features = ["rustls-tls"] } thiserror = "1.0" tokio = { version = "1.21", features = ["full"] } -tracing = "0.1.36" +tracing = "0.1.37" url = "2.3.1" [dev-dependencies] diff --git a/janus_collector/Cargo.toml b/janus_collector/Cargo.toml index 3b9d127ae..f83de575e 100644 --- a/janus_collector/Cargo.toml +++ b/janus_collector/Cargo.toml @@ -22,7 +22,7 @@ reqwest = { version = "0.11.12", default-features = false, features = ["rustls-t retry-after = "0.3.1" thiserror = "1.0" tokio = { version = "1.21", features = ["full"] } -tracing = "0.1.36" +tracing = "0.1.37" url = "2.3.1" [dev-dependencies] diff --git a/janus_core/Cargo.toml b/janus_core/Cargo.toml index 3581e5c52..ea9f59b95 100644 --- a/janus_core/Cargo.toml +++ b/janus_core/Cargo.toml @@ -41,7 +41,7 @@ ring = "0.16.20" serde = { version = "1.0.145", features = ["derive"] } thiserror = "1.0" tokio = { version = "1.21", features = ["macros", "net", "rt"] } -tracing = "0.1.36" +tracing = "0.1.37" # Dependencies required only if feature "test-util" is enabled assert_matches = { version = "1", optional = true } diff --git a/janus_server/Cargo.toml b/janus_server/Cargo.toml index e5eaa2db0..f3176a75d 100644 --- a/janus_server/Cargo.toml +++ b/janus_server/Cargo.toml @@ -69,7 +69,7 @@ thiserror = "1.0" tokio = { version = "1.21", features = ["full", "tracing"] } tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1", "array-impls"] } tonic = { version = "0.8", optional = true, features = ["tls", "tls-webpki-roots"] } # keep this version in sync with what opentelemetry-otlp uses -tracing = "0.1.36" +tracing = "0.1.37" tracing-log = "0.1.3" tracing-opentelemetry = { version = "0.18", optional = true } tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt", "json"] } diff --git a/janus_server/src/aggregator.rs b/janus_server/src/aggregator.rs index 0f8c48e6c..08397726a 100644 --- a/janus_server/src/aggregator.rs +++ b/janus_server/src/aggregator.rs @@ -2857,15 +2857,20 @@ mod tests { async fn upload_wrong_hpke_config_id() { install_test_trace_subscriber(); - let (aggregator, _, mut report, _, _db_handle) = setup_upload_test().await; + let (aggregator, task, report, _, _db_handle) = setup_upload_test().await; - report = Report::new( + let unused_hpke_config_id = (0..) + .map(HpkeConfigId::from) + .find(|id| !task.hpke_keys.contains_key(id)) + .unwrap(); + + let report = Report::new( report.task_id(), report.nonce(), report.extensions().to_vec(), vec![ HpkeCiphertext::new( - HpkeConfigId::from(101), + unused_hpke_config_id, report.encrypted_input_shares()[0] .encapsulated_context() .to_vec(), @@ -2877,7 +2882,7 @@ mod tests { assert_matches!(aggregator.handle_upload(&report.get_encoded()).await, Err(Error::OutdatedHpkeConfig(config_id, task_id)) => { assert_eq!(task_id, report.task_id()); - assert_eq!(config_id, HpkeConfigId::from(101)); + assert_eq!(config_id, unused_hpke_config_id); }); } diff --git a/janus_server/src/aggregator/aggregation_job_creator.rs b/janus_server/src/aggregator/aggregation_job_creator.rs index 04c13ca4e..d7d3f78df 100644 --- a/janus_server/src/aggregator/aggregation_job_creator.rs +++ b/janus_server/src/aggregator/aggregation_job_creator.rs @@ -14,17 +14,18 @@ use opentelemetry::{ metrics::{Histogram, Unit}, Context, KeyValue, }; -use prio::vdaf; -use prio::vdaf::prio3::{Prio3Aes128Count, Prio3Aes128Histogram, Prio3Aes128Sum}; -use prio::{codec::Encode, vdaf::prio3::Prio3Aes128CountVecMultithreaded}; +use prio::{ + codec::Encode, + vdaf::prio3::Prio3Aes128CountVecMultithreaded, + vdaf::{ + self, + prio3::{Prio3Aes128Count, Prio3Aes128Histogram, Prio3Aes128Sum}, + }, +}; use rand::{thread_rng, Rng}; -use std::collections::HashMap; -#[cfg(test)] -use std::hash::Hash; -use std::sync::Arc; -use std::time::Duration; -use tokio::select; +use std::{collections::HashMap, convert::Infallible, sync::Arc, time::Duration}; use tokio::{ + select, sync::oneshot::{self, Receiver, Sender}, time::{self, Instant, MissedTickBehavior}, }; @@ -85,7 +86,7 @@ impl AggregationJobCreator { } #[tracing::instrument(skip(self))] - pub async fn run(self: Arc) -> ! { + pub async fn run(self: Arc) -> Infallible { // TODO(#224): add support for handling only a subset of tasks in a single job (i.e. sharding). // Create histogram metrics. @@ -362,7 +363,7 @@ impl AggregationJobCreator { A::PrepareState: Send + Sync + Encode, A::OutputShare: Send + Sync, for<'a> &'a A::OutputShare: Into>, - A::AggregationParam: Send + Sync + Eq + Hash, + A::AggregationParam: Send + Sync + Eq + std::hash::Hash, { let task_id = task.id; let min_batch_duration = task.min_batch_duration; diff --git a/janus_server/src/binary_utils/job_driver.rs b/janus_server/src/binary_utils/job_driver.rs index 922309652..d17e3e451 100644 --- a/janus_server/src/binary_utils/job_driver.rs +++ b/janus_server/src/binary_utils/job_driver.rs @@ -7,7 +7,7 @@ use opentelemetry::{ metrics::{Meter, Unit}, Context, KeyValue, }; -use std::{fmt::Debug, future::Future, sync::Arc}; +use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc}; use tokio::{ sync::Semaphore, time::{self, Instant}, @@ -88,7 +88,7 @@ where /// Run this job driver, periodically seeking incomplete jobs and stepping them. #[tracing::instrument(skip(self))] - pub async fn run(self: Arc) -> ! { + pub async fn run(self: Arc) -> Infallible { // Create metric recorders. let job_acquire_time_histogram = self .meter