Skip to content

Commit

Permalink
Update HTTP verbs for DAP-13.
Browse files Browse the repository at this point in the history
Specifically:
* Use POST rather than PUT for report uploads.
* Use GET rather than POST for collection job polling.
  • Loading branch information
branlwyd committed Nov 12, 2024
1 parent bf396e0 commit dccd082
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 91 deletions.
30 changes: 13 additions & 17 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::{collections::HashSet, sync::Arc};
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{
assert_headers,
prelude::{post, put},
prelude::{get, put},
TestConn,
};

Expand Down Expand Up @@ -95,29 +95,25 @@ impl CollectionJobTestCase {
.await
}

pub(super) async fn post_collection_job_with_auth_token(
pub(super) async fn get_collection_job_with_auth_token(
&self,
collection_job_id: &CollectionJobId,
auth_token: Option<&AuthenticationToken>,
) -> TestConn {
let mut test_conn = post(
self.task
.collection_job_uri(collection_job_id)
.unwrap()
.path(),
);
let mut test_conn = get(self
.task
.collection_job_uri(collection_job_id)
.unwrap()
.path());
if let Some(auth) = auth_token {
let (header, value) = auth.request_authentication();
test_conn = test_conn.with_request_header(header, value);
}
test_conn.run_async(&self.handler).await
}

pub(super) async fn post_collection_job(
&self,
collection_job_id: &CollectionJobId,
) -> TestConn {
self.post_collection_job_with_auth_token(
pub(super) async fn get_collection_job(&self, collection_job_id: &CollectionJobId) -> TestConn {
self.get_collection_job_with_auth_token(
collection_job_id,
Some(self.task.collector_auth_token()),
)
Expand Down Expand Up @@ -343,7 +339,7 @@ async fn collection_job_success_fixed_size() {
.await;
assert_eq!(test_conn.status(), Some(Status::Created));

let test_conn = test_case.post_collection_job(&collection_job_id).await;
let test_conn = test_case.get_collection_job(&collection_job_id).await;
assert_eq!(test_conn.status(), Some(Status::Accepted));

// Update the collection job with the aggregate shares. collection job should now be complete.
Expand Down Expand Up @@ -408,7 +404,7 @@ async fn collection_job_success_fixed_size() {
panic!("unexpected batch ID");
}

let mut test_conn = test_case.post_collection_job(&collection_job_id).await;
let mut test_conn = test_case.get_collection_job(&collection_job_id).await;
assert_headers!(&test_conn, "content-type" => (Collection::<FixedSize>::MEDIA_TYPE));

let collect_resp: Collection<FixedSize> = decode_response_body(&mut test_conn).await;
Expand Down Expand Up @@ -817,7 +813,7 @@ async fn collection_job_put_idempotence_fixed_size_current_batch_no_extra_report
assert_eq!(response.status(), Some(Status::Created));

// Fetch the first collection job, to advance the current batch.
let response = test_case.post_collection_job(&collection_job_id_1).await;
let response = test_case.get_collection_job(&collection_job_id_1).await;
assert_eq!(response.status(), Some(Status::Accepted));

// Create the second collection job.
Expand All @@ -828,7 +824,7 @@ async fn collection_job_put_idempotence_fixed_size_current_batch_no_extra_report

// Fetch the second collection job, to advance the current batch. There are now no outstanding
// batches left.
let response = test_case.post_collection_job(&collection_job_id_2).await;
let response = test_case.get_collection_job(&collection_job_id_2).await;
assert_eq!(response.status(), Some(Status::Accepted));

// Re-send the collection job creation requests to confirm they are still idempotent.
Expand Down
16 changes: 8 additions & 8 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ where
"hpke_config",
hpke_config_cors_preflight,
)
.put("tasks/:task_id/reports", instrumented(api(upload::<C>)))
.post("tasks/:task_id/reports", instrumented(api(upload::<C>)))
.with_route(
trillium::Method::Options,
"tasks/:task_id/reports",
Expand Down Expand Up @@ -397,9 +397,9 @@ where
COLLECTION_JOB_ROUTE,
instrumented(api(collection_jobs_put::<C>)),
)
.post(
.get(
COLLECTION_JOB_ROUTE,
instrumented(api(collection_jobs_post::<C>)),
instrumented(api(collection_jobs_get::<C>)),
)
.delete(
COLLECTION_JOB_ROUTE,
Expand Down Expand Up @@ -491,7 +491,7 @@ async fn hpke_config_cors_preflight(mut conn: Conn) -> Conn {
conn
}

/// API handler for the "/tasks/.../reports" PUT endpoint.
/// API handler for the "/tasks/.../reports" POST endpoint.
async fn upload<C: Clock>(
conn: &mut Conn,
(State(aggregator), BodyBytes(body)): (State<Arc<Aggregator<C>>>, BodyBytes),
Expand All @@ -517,12 +517,12 @@ async fn upload<C: Clock>(
/// Handler for CORS preflight requests to "/tasks/.../reports".
async fn upload_cors_preflight(mut conn: Conn) -> Conn {
conn.response_headers_mut()
.insert(KnownHeaderName::Allow, "PUT");
.insert(KnownHeaderName::Allow, "POST");
if let Some(origin) = conn.request_headers().get(KnownHeaderName::Origin) {
let origin = origin.clone();
let request_headers = conn.response_headers_mut();
request_headers.insert(KnownHeaderName::AccessControlAllowOrigin, origin);
request_headers.insert(KnownHeaderName::AccessControlAllowMethods, "PUT");
request_headers.insert(KnownHeaderName::AccessControlAllowMethods, "POST");
request_headers.insert(KnownHeaderName::AccessControlAllowHeaders, "content-type");
request_headers.insert(
KnownHeaderName::AccessControlMaxAge,
Expand Down Expand Up @@ -629,8 +629,8 @@ async fn collection_jobs_put<C: Clock>(
Ok(Status::Created)
}

/// API handler for the "/tasks/.../collection_jobs/..." POST endpoint.
async fn collection_jobs_post<C: Clock>(
/// API handler for the "/tasks/.../collection_jobs/..." GET endpoint.
async fn collection_jobs_get<C: Clock>(
conn: &mut Conn,
State(aggregator): State<Arc<Aggregator<C>>>,
) -> Result<(), Error> {
Expand Down
18 changes: 9 additions & 9 deletions aggregator/src/aggregator/http_handlers/tests/collection_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde_json::json;
use trillium::{KnownHeaderName, Status};
use trillium_testing::{
assert_headers,
prelude::{delete, post, put},
prelude::{delete, get, put},
};

#[tokio::test]
Expand Down Expand Up @@ -288,7 +288,7 @@ async fn collection_job_post_request_unauthenticated_collection_jobs() {

// Incorrect authentication token.
let mut test_conn = test_case
.post_collection_job_with_auth_token(&collection_job_id, Some(&random()))
.get_collection_job_with_auth_token(&collection_job_id, Some(&random()))
.await;

let want_status = u16::from(Status::Forbidden);
Expand All @@ -305,7 +305,7 @@ async fn collection_job_post_request_unauthenticated_collection_jobs() {

// Aggregator authentication token.
let mut test_conn = test_case
.post_collection_job_with_auth_token(
.get_collection_job_with_auth_token(
&collection_job_id,
Some(test_case.task.aggregator_auth_token()),
)
Expand All @@ -325,7 +325,7 @@ async fn collection_job_post_request_unauthenticated_collection_jobs() {

// Missing authentication token.
let mut test_conn = test_case
.post_collection_job_with_auth_token(&collection_job_id, None)
.get_collection_job_with_auth_token(&collection_job_id, None)
.await;

let want_status = u16::from(Status::Forbidden);
Expand Down Expand Up @@ -398,7 +398,7 @@ async fn collection_job_success_time_interval() {

assert_eq!(test_conn.status(), Some(Status::Created));

let test_conn = test_case.post_collection_job(&collection_job_id).await;
let test_conn = test_case.get_collection_job(&collection_job_id).await;
assert_eq!(test_conn.status(), Some(Status::Accepted));

// Update the collection job with the aggregate shares and some aggregation jobs. collection
Expand Down Expand Up @@ -452,7 +452,7 @@ async fn collection_job_success_time_interval() {
.await
.unwrap();

let mut test_conn = test_case.post_collection_job(&collection_job_id).await;
let mut test_conn = test_case.get_collection_job(&collection_job_id).await;

assert_eq!(test_conn.status(), Some(Status::Ok));
assert_headers!(
Expand Down Expand Up @@ -502,7 +502,7 @@ async fn collection_job_success_time_interval() {
}

#[tokio::test]
async fn collection_job_post_request_no_such_collection_job() {
async fn collection_job_get_request_no_such_collection_job() {
let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await;
test_case
.setup_time_interval_batch(Time::from_seconds_since_epoch(0))
Expand All @@ -514,7 +514,7 @@ async fn collection_job_post_request_no_such_collection_job() {
.task
.collector_auth_token()
.request_authentication();
let test_conn = post(format!(
let test_conn = get(format!(
"/tasks/{}/collection_jobs/{no_such_collection_job_id}",
test_case.task.id()
))
Expand Down Expand Up @@ -659,6 +659,6 @@ async fn delete_collection_job() {
assert_eq!(test_conn.status(), Some(Status::NoContent));

// Get the job again
let test_conn = test_case.post_collection_job(&collection_job_id).await;
let test_conn = test_case.get_collection_job(&collection_job_id).await;
assert_eq!(test_conn.status(), Some(Status::NoContent));
}
40 changes: 20 additions & 20 deletions aggregator/src/aggregator/http_handlers/tests/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tokio::{
time::{sleep, timeout},
};
use trillium::{KnownHeaderName, Status};
use trillium_testing::{assert_headers, prelude::put, TestConn};
use trillium_testing::{assert_headers, prelude::post, TestConn};
use trillium_tokio::Stopper;

#[tokio::test]
Expand Down Expand Up @@ -86,7 +86,7 @@ async fn upload_handler() {

// Upload a report. Do this twice to prove that POST is idempotent.
for _ in 0..2 {
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -105,7 +105,7 @@ async fn upload_handler() {
*accepted_report_id,
&hpke_keypair,
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(duplicate_id_report.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -127,7 +127,7 @@ async fn upload_handler() {
report.leader_encrypted_input_share().clone(),
report.helper_encrypted_input_share().clone(),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(gc_eligible_report.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn upload_handler() {
),
report.helper_encrypted_input_share().clone(),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(bad_report.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -189,7 +189,7 @@ async fn upload_handler() {
report.leader_encrypted_input_share().clone(),
report.helper_encrypted_input_share().clone(),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(bad_report.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -218,7 +218,7 @@ async fn upload_handler() {
&hpke_keypair,
clock.now().add(&Duration::from_seconds(120)).unwrap(),
);
let mut test_conn = put(task_expire_soon.report_upload_uri().unwrap().path())
let mut test_conn = post(task_expire_soon.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report_2.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -246,7 +246,7 @@ async fn upload_handler() {
.helper_encrypted_input_share()
.clone(),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(bad_public_share_report.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -269,7 +269,7 @@ async fn upload_handler() {
// Encrypt report with some arbitrary key that has the same ID as an existing one.
&HpkeKeypair::test_with_id((*hpke_keypair.config().id()).into()),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(undecryptable_report.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -309,7 +309,7 @@ async fn upload_handler() {
.helper_encrypted_input_share()
.clone(),
);
let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(bad_leader_input_share_report.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -331,21 +331,21 @@ async fn upload_handler() {
(),
)
.with_request_header(KnownHeaderName::Origin, "https://example.com/")
.with_request_header(KnownHeaderName::AccessControlRequestMethod, "PUT")
.with_request_header(KnownHeaderName::AccessControlRequestMethod, "POST")
.with_request_header(KnownHeaderName::AccessControlRequestHeaders, "content-type")
.run_async(&handler)
.await;
assert!(test_conn.status().unwrap().is_success());
assert_headers!(
&test_conn,
"access-control-allow-origin" => "https://example.com/",
"access-control-allow-methods"=> "PUT",
"access-control-allow-methods"=> "POST",
"access-control-allow-headers" => "content-type",
"access-control-max-age"=> "86400",
);

// Check for appropriate CORS headers in response to the main request.
let test_conn = put(task.report_upload_uri().unwrap().path())
let test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::Origin, "https://example.com/")
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report.get_encoded().unwrap())
Expand Down Expand Up @@ -375,7 +375,7 @@ async fn upload_handler_helper() {
datastore.put_aggregator_task(&helper_task).await.unwrap();
let report = create_report(&helper_task, &hpke_keypair, clock.now());

let mut test_conn = put(task.report_upload_uri().unwrap().path())
let mut test_conn = post(task.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report.get_encoded().unwrap())
.run_async(&handler)
Expand Down Expand Up @@ -455,7 +455,7 @@ async fn upload_handler_error_fanout() {
// Upload one report and wait for it to finish, to prepopulate the aggregator's task cache.
let report: Report = create_report(&leader_task, &hpke_keypair, clock.now());
let response = client
.put(url.clone())
.post(url.clone())
.header("Content-Type", Report::MEDIA_TYPE)
.body(report.get_encoded().unwrap())
.send()
Expand Down Expand Up @@ -498,7 +498,7 @@ async fn upload_handler_error_fanout() {
async move {
let report = create_report(&leader_task, &hpke_keypair, clock.now());
let response = client
.put(url)
.post(url)
.header("Content-Type", Report::MEDIA_TYPE)
.body(report.get_encoded().unwrap())
.send()
Expand Down Expand Up @@ -563,7 +563,7 @@ async fn upload_client_early_disconnect() {
// Client sends report, using Content-Length, and waits for one byte of response.
let mut client_socket = TcpStream::connect(local_addr).await.unwrap();
let request_line_and_headers = format!(
"PUT /tasks/{task_id}/reports HTTP/1.1\r\n\
"POST /tasks/{task_id}/reports HTTP/1.1\r\n\
Content-Type: application/dap-report\r\n\
Content-Length: {}\r\n\r\n",
encoded_report_1.len(),
Expand All @@ -586,7 +586,7 @@ async fn upload_client_early_disconnect() {
// Client disconnects before sending the entire request body, using Content-Length.
let mut client_socket = TcpStream::connect(local_addr).await.unwrap();
let request_line_and_headers = format!(
"PUT /tasks/{task_id}/reports HTTP/1.1\r\n\
"POST /tasks/{task_id}/reports HTTP/1.1\r\n\
Content-Type: application/dap-report\r\n\
Content-Length: 1000\r\n\r\n"
);
Expand All @@ -601,7 +601,7 @@ async fn upload_client_early_disconnect() {
// Client sends report, using chunked transfer encoding, and waits for one byte of response.
let mut client_socket = TcpStream::connect(local_addr).await.unwrap();
let request_line_and_headers = format!(
"PUT /tasks/{task_id}/reports HTTP/1.1\r\n\
"POST /tasks/{task_id}/reports HTTP/1.1\r\n\
Content-Type: application/dap-report\r\n\
Transfer-Encoding: chunked\r\n\r\n"
);
Expand Down Expand Up @@ -630,7 +630,7 @@ async fn upload_client_early_disconnect() {
// encoding.
let mut client_socket = TcpStream::connect(local_addr).await.unwrap();
let request_line_and_headers = format!(
"PUT /tasks/{task_id}/reports HTTP/1.1\r\n\
"POST /tasks/{task_id}/reports HTTP/1.1\r\n\
Content-Type: application/dap-report\r\n\
Transfer-Encoding: chunked\r\n\r\n"
);
Expand Down
Loading

0 comments on commit dccd082

Please sign in to comment.