Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hybrid integration test follow up #1479

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2b06be6
tmp commit: failing
eriktaubeneck Dec 5, 2024
1b09114
fix panic from encrypt-hybrid command
eriktaubeneck Dec 5, 2024
99a86a0
progress
eriktaubeneck Dec 5, 2024
5eec76e
distinct filenames for in_the_clear and mpc output
eriktaubeneck Dec 5, 2024
14d2b3d
temporary printlns for debugging
eriktaubeneck Dec 5, 2024
36bba71
make sure to call .wait on command
eriktaubeneck Dec 5, 2024
f63d16d
fix os error with config setup
eriktaubeneck Dec 5, 2024
0a01488
add query_type lookup
eriktaubeneck Dec 5, 2024
222b46d
add plumbing for starting a hybrid query
eriktaubeneck Dec 3, 2024
4c5558a
make helpers not silent for debugging
eriktaubeneck Dec 5, 2024
82a61dc
Merge branch 'main' into hybrid-integration-test-follow-up
eriktaubeneck Dec 5, 2024
8effb67
integrate with RoundRobinSubmission, in progress
eriktaubeneck Dec 5, 2024
1e6418c
Per shard submission from report collector
akoshelev Dec 5, 2024
44500bb
update HistogramValue type to be consistent across helper and report …
eriktaubeneck Dec 6, 2024
fd76f4e
add hybrid_test assert against in the clear results
eriktaubeneck Dec 6, 2024
35f532f
update comment
eriktaubeneck Dec 6, 2024
ea85c05
Make hybrid run with compact gate
akoshelev Dec 6, 2024
48cad75
Add required features for Hybrid integration tests
akoshelev Dec 6, 2024
299b1bd
update pre-commit and github check.yaml to use correct flags for hybr…
eriktaubeneck Dec 6, 2024
1d76fe6
no default features for hybrid
eriktaubeneck Dec 6, 2024
9f9ceae
check.yaml typo
eriktaubeneck Dec 6, 2024
d3bd6e2
increase STEP_COUNT_LIMIT
eriktaubeneck Dec 6, 2024
626421a
lower step count limit
eriktaubeneck Dec 6, 2024
700646c
Update ipa-core/src/bin/report_collector.rs
eriktaubeneck Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ required-features = [
[[test]]
name = "hybrid"
required-features = [
"test-fixture",
"cli",
"compact-gate",
"web-app",
"real-world-infra",
"test-fixture",
"relaxed-dp",
]
69 changes: 54 additions & 15 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
fmt::Debug,
fs::{File, OpenOptions},
io,
io::{stdout, Write},
io::{stdout, BufReader, Write},
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -16,13 +16,20 @@ use ipa_core::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission, StreamingSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
config::{KeyRegistries, NetworkConfig},
ff::{boolean_array::BA32, FieldType},
helpers::query::{
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QuerySize, QueryType,
ff::{
boolean_array::{BA16, BA32},
FieldType,
},
helpers::{
query::{
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QuerySize, QueryType,
},
BodyStream,
},
net::{Helper, IpaHttpClient},
report::{EncryptedOprfReportStreams, DEFAULT_KEY_ID},
Expand Down Expand Up @@ -143,6 +150,10 @@ enum ReportCollectorCommand {

#[clap(flatten)]
hybrid_query_config: HybridQueryParams,

/// Number of records to aggreagte
eriktaubeneck marked this conversation as resolved.
Show resolved Hide resolved
#[clap(long, short = 'n')]
count: u32,
},
}

Expand Down Expand Up @@ -255,7 +266,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
ReportCollectorCommand::MaliciousHybrid {
ref encrypted_inputs,
hybrid_query_config,
} => hybrid(&args, hybrid_query_config, clients, encrypted_inputs).await?,
count,
} => {
hybrid(
&args,
hybrid_query_config,
clients,
encrypted_inputs,
count.try_into().expect("u32 should fit into usize"),
)
.await?
}
};

Ok(())
Expand Down Expand Up @@ -402,20 +423,37 @@ async fn hybrid(
hybrid_query_config: HybridQueryParams,
helper_clients: Vec<[IpaHttpClient<Helper>; 3]>,
encrypted_inputs: &EncryptedInputs,
count: usize,
) -> Result<(), Box<dyn Error>> {
let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

let files = [
let [h1_streams, h2_streams, h3_streams] = [
&encrypted_inputs.enc_input_file1,
&encrypted_inputs.enc_input_file2,
&encrypted_inputs.enc_input_file3,
];

// despite the name, this is generic enough to work with hybrid
let encrypted_report_streams = EncryptedOprfReportStreams::from(files);
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
})
.map(|s| s.into_byte_streams(args.shard_count));

// create byte streams for each shard
let submissions = h1_streams
.into_iter()
.zip(h2_streams.into_iter())
.zip(h3_streams.into_iter())
.map(|((s1, s2), s3)| {
[
BodyStream::from_bytes_stream(s1),
BodyStream::from_bytes_stream(s2),
BodyStream::from_bytes_stream(s3),
]
})
.collect::<Vec<_>>();

let query_config = QueryConfig {
size: QuerySize::try_from(encrypted_report_streams.query_size).unwrap(),
size: QuerySize::try_from(count).unwrap(),
field_type: FieldType::Fp32BitPrime,
query_type,
};
Expand All @@ -426,12 +464,13 @@ async fn hybrid(
.expect("Unable to create query!");

tracing::info!("Starting query for OPRF");
// the value for histogram values (BA32) must be kept in sync with the server-side

// the value for histogram values (BA16) must be kept in sync with the server-side
// implementation, otherwise a runtime reconstruct error will be generated.
// see ipa-core/src/query/executor.rs
let actual = run_hybrid_query_and_validate::<BA32>(
encrypted_report_streams.streams,
encrypted_report_streams.query_size,
let actual = run_hybrid_query_and_validate::<BA16>(
submissions,
count,
helper_clients,
query_id,
hybrid_query_config,
Expand Down
8 changes: 4 additions & 4 deletions ipa-core/src/cli/crypto/hybrid_encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ impl HybridEncryptArgs {
let mut key_registries = KeyRegistries::default();

let network =
NetworkConfig::from_toml_str(&read_to_string(&self.network).unwrap_or_else(|e| {
panic!("Failed to open network file: {:?}. {}", &self.network, e)
}))
NetworkConfig::from_toml_str_sharded(&read_to_string(&self.network).unwrap_or_else(
|e| panic!("Failed to open network file: {:?}. {}", &self.network, e),
))
.unwrap_or_else(|e| {
panic!(
"Failed to parse network file into toml: {:?}. {}",
&self.network, e
)
});
let Some(key_registries) = key_registries.init_from(&network) else {
let Some(key_registries) = key_registries.init_from(&network[0]) else {
panic!("could not load network file")
};

Expand Down
35 changes: 19 additions & 16 deletions ipa-core/src/cli/playbook/hybrid.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(all(feature = "web-app", feature = "cli"))]
use std::{
cmp::min,
iter::zip,
time::{Duration, Instant},
};

Expand All @@ -25,7 +26,7 @@ use crate::{
/// if results are invalid
#[allow(clippy::disallowed_methods)] // allow try_join_all
pub async fn run_hybrid_query_and_validate<HV>(
inputs: [BodyStream; 3],
inputs: Vec<[BodyStream; 3]>,
query_size: usize,
clients: Vec<[IpaHttpClient<Helper>; 3]>,
query_id: QueryId,
Expand All @@ -36,28 +37,30 @@ where
AdditiveShare<HV>: Serializable,
{
let mpc_time = Instant::now();

// for now, submit everything to the leader. TODO: round robin submission
let leader_clients = &clients[0];
try_join_all(
inputs
.into_iter()
.zip(leader_clients)
.map(|(input_stream, client)| {
client.query_input(QueryInput {
query_id,
input_stream,
})
}),
)
assert_eq!(clients.len(), inputs.len());
// submit inputs to each shard
let _ = try_join_all(zip(clients.iter(), inputs.into_iter()).map(
|(shard_clients, shard_inputs)| {
try_join_all(shard_clients.iter().zip(shard_inputs.into_iter()).map(
|(client, input)| {
client.query_input(QueryInput {
query_id,
input_stream: input,
})
},
))
},
))
.await
.unwrap();

let leader_clients = &clients[0];

let mut delay = Duration::from_millis(125);
loop {
if try_join_all(
leader_clients
.iter()
.each_ref()
.map(|client| client.query_status(query_id)),
)
.await
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/cli/playbook/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{

/// Trait for submitting inputs as streams, rather than reading everything
/// in memory. Should provide better performance for very large inputs.
trait StreamingSubmission {
pub trait StreamingSubmission {
/// Spits itself into `count` instances of [`BytesStream`].
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}
Expand All @@ -25,7 +25,7 @@ trait StreamingSubmission {
/// and delimited by newlines. The output streams will have
/// run-length encoding, meaning that each element will have
/// a 2 byte length prefix added to it.
struct RoundRobinSubmission<R>(R);
pub struct RoundRobinSubmission<R>(R);

impl<R: BufRead> RoundRobinSubmission<R> {
pub fn new(read_from: R) -> Self {
Expand Down
4 changes: 4 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ pub mod query {
let Query(q) = req.extract().await?;
Ok(QueryType::MaliciousOprfIpa(q))
}
QueryType::MALICIOUS_HYBRID_STR => {
let Query(q) = req.extract().await?;
Ok(QueryType::MaliciousHybrid(q))
}
other => Err(Error::bad_query_value("query_type", other)),
}?;
Ok(QueryConfigQueryParams(QueryConfig {
Expand Down
6 changes: 2 additions & 4 deletions ipa-core/src/protocol/basics/shard_fin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
ff::{boolean::Boolean, boolean_array::BooleanArray, Serializable},
helpers::{Message, TotalRecords},
protocol::{
boolean::step::SixteenBitStep,
boolean::step::ThirtyTwoBitStep,
context::{
dzkp_validator::DZKPValidator, DZKPContext, DZKPUpgradedMaliciousContext,
DZKPUpgradedSemiHonestContext, MaliciousProtocolSteps, ShardedContext,
Expand Down Expand Up @@ -288,9 +288,7 @@ where
C: 'a,
{
async move {
// todo: SixteenBit only works for values up to BA16. EightBitStep will panic if we try
// to add larger values
self.values = integer_sat_add::<_, SixteenBitStep, B>(
self.values = integer_sat_add::<_, ThirtyTwoBitStep, B>(
eriktaubeneck marked this conversation as resolved.
Show resolved Hide resolved
ctx,
record_id,
&self.values,
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
let aggregated_reports = aggregate_reports::<BK, V, C>(ctx.clone(), sharded_reports).await?;

let histogram = breakdown_reveal_aggregation::<C, BK, V, HV, B>(
ctx.clone(),
ctx.narrow(&Step::Aggregate),
aggregated_reports,
&dp_padding_params,
)
Expand Down
3 changes: 3 additions & 0 deletions ipa-core/src/protocol/hybrid/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) enum HybridStep {
GroupBySum,
#[step(child = crate::protocol::context::step::DzkpValidationProtocolStep)]
GroupBySumValidate,
#[step(child = crate::protocol::ipa_prf::aggregation::step::AggregationStep)]
Aggregate,
#[step(child = FinalizeSteps)]
Finalize,
}
Expand All @@ -33,6 +35,7 @@ pub(crate) enum AggregateReportsStep {

#[derive(CompactStep)]
pub(crate) enum FinalizeSteps {
#[step(child = crate::protocol::ipa_prf::boolean_ops::step::SaturatedAdditionStep)]
Add,
#[step(child = crate::protocol::context::step::DzkpValidationProtocolStep)]
Validate,
Expand Down
19 changes: 17 additions & 2 deletions ipa-core/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
Gate,
},
query::{
runner::{OprfIpaQuery, QueryResult},
runner::{execute_hybrid_protocol, OprfIpaQuery, QueryResult},
state::RunningQuery,
},
sync::Arc,
Expand Down Expand Up @@ -165,7 +165,22 @@ pub fn execute<R: PrivateKeyRegistry>(
)
},
),
(QueryType::MaliciousHybrid(_), _) => todo!(),
(QueryType::MaliciousHybrid(ipa_config), _) => do_query(
runtime,
config,
gateway,
input,
move |prss, gateway, config, input| {
Box::pin(execute_hybrid_protocol(
prss,
gateway,
input,
ipa_config,
config,
key_registry,
))
},
),
}
}

Expand Down
Loading
Loading