Asynchronous aggregation jobs #3451

Open
inahga opened this issue Oct 22, 2024 · 6 comments

Comments

@inahga
Contributor

inahga commented Oct 22, 2024

DAP-12 supports handling aggregation jobs asynchronously. Determine how to implement this, and implement it.

@inahga
Contributor Author

inahga commented Oct 22, 2024

There are a few ways we can support asynchronous aggregation.

Leader changes

In all situations, we need to change the leader such that it supports a helper behaving
asynchronously. This requires aggregation job driver changes. There are two approaches
possible:

  1. Add polling logic directly after the first aggregate init call, without kicking the job back
    in the queue. This is the most straightforward approach, and requires no database schema
    changes. Recovery should still be trivial as long as we generate the same PUT request.

  2. Add logic that kicks the job back in the queue when receiving a polling response. At a glance
    I think this is a much more invasive change. We need a way to checkpoint the leader-side
    preparation state to the database, and handle a new state PollingHelper.

    This is complicated but would make more efficient use of the job driver's time, since job
    drivers won't be tied up by polling the helper. (This can also be trivially remediated by
    increasing the concurrency of the job driver).

I lean towards approach 1, since it can be straightforwardly accomplished. Depending on the outcome
of the second design decision below, we may not have a strong use case for an async-aware Leader, so
optimization won't be necessary.

The database schema changes look invasive, so we might want to at least sketch out approach
2 and commit database changes preemptively, even if only adopting approach 1.

I believe Daphne will be implementing helper-side async, so we should be able to use it for
practical interop testing. See cloudflare/daphne#697.

Helper changes

We have options for how we want to implement helper-side async.

  1. Minimally, i.e. handle calls synchronously. This is (should be) allowable under the spec. Only
    make the necessary HTTP semantic changes.

  2. Support async.

    Implementation is sketched out in Asynchronous aggregation jobs #3331. We would need a
    new ReportAggregations state (n.b. the PR overloads StartLeader as a hack, proper implementation
    would introduce a new state).

    It's proposed to handle async jobs in the aggregation job driver. This seems like the natural
    place to put them. It does complicate trivial helper deployments somewhat. The simple solution
    is to allow the aggregator binary to internally run an instance of the aggregation job driver,
    like we do with the garbage collector and key rotator.

    Options for supporting one or both of async and sync.

    a. Support both, toggled by feature flag or some arbitrary runtime heuristic (e.g. size of
    aggregation job). Most flexible, lets us play around in load testing or production to determine
    which is better.

    b. Support async only. Simpler. Introduces overall less code into Janus, i.e. nominally moves
    the current aggregation handlers into the job driver.

If adopting 1, we still may want to make the schema changes necessary for 2, in case we want to
adopt 2 later.

#3331 is available as a reference of what approach 2 looks like, but I would probably start from a
clean sheet since it's fairly cobbled together.
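As a rough illustration of option 2a, the choice between sync and async handling could be reduced to a small policy check. This is only a sketch under stated assumptions: `HelperMode`, `AsyncPolicy`, and its fields are hypothetical names, not existing Janus types, and the report-count threshold is just one example of the "arbitrary runtime heuristic" mentioned above.

```rust
/// How the helper handles an aggregation job initialization request.
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HelperMode {
    /// Process the job inline and return prepare responses immediately.
    Sync,
    /// Persist the job, tell the leader to poll, and let a job driver finish it.
    Async,
}

/// Hypothetical runtime configuration for option 2a.
#[derive(Debug, Clone, Copy)]
pub struct AsyncPolicy {
    /// Feature flag: when false, every job is handled synchronously.
    pub async_enabled: bool,
    /// Jobs with at least this many reports are deferred to the job driver.
    pub min_reports_for_async: usize,
}

impl AsyncPolicy {
    /// Picks a handling mode from the flag and the size of the job.
    pub fn choose_mode(&self, report_count: usize) -> HelperMode {
        if self.async_enabled && report_count >= self.min_reports_for_async {
            HelperMode::Async
        } else {
            HelperMode::Sync
        }
    }
}
```

Under this sketch, option 2b would correspond to `async_enabled: true` with a threshold of zero, and option 1 to `async_enabled: false`.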

Open Questions/Conclusions

  1. For the leader, should we go for approach 1 or 2? If we do approach 1, should we still map out the
    schema changes necessary to support approach 2?

  2. For the helper, should we go for approach 1 or 2? If we do approach 2, should we adopt approach 2a,
    or approach 2b? If we do approach 1, should we still map out the schema changes necessary to support
    approach 2?

We may need to speak with stakeholders to determine where they lean as well.

@branlwyd
Contributor

Leader

As I see it, the primary operational difference to approach #1 is that we will retransmit the request with each polling attempt, wasting bandwidth. Approach #2 does not have this (but does require more implementation effort). Are there other differences?

If that is the score, I think we should strongly consider implementing #2. Either way, we should definitely sketch out the schema changes required for #2.

Separately, #1 might not save as much implementation effort as we expect: currently the aggregation job driver expects to be able to pick up any unclaimed work item. Without some work to ensure appropriate backoff, we might end up polling the same aggregation job very frequently. (We'd need something similar for #2.)
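One way to picture the backoff concern is a "not before" timestamp on each queued job that the acquirer respects. The following is a minimal in-memory sketch, assuming hypothetical names (`QueuedJob`, `not_before`, `acquirable_jobs`, `release_with_delay`); it does not reflect how the Janus datastore actually tracks job acquisition.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical queue entry holding only the fields relevant to acquisition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedJob {
    pub id: u64,
    /// The job must not be acquired before this instant.
    pub not_before: SystemTime,
}

/// Returns the IDs of up to `limit` jobs eligible at `now`,
/// ordered by earliest `not_before` first.
pub fn acquirable_jobs(jobs: &[QueuedJob], now: SystemTime, limit: usize) -> Vec<u64> {
    let mut eligible: Vec<&QueuedJob> = jobs.iter().filter(|j| j.not_before <= now).collect();
    eligible.sort_by_key(|j| j.not_before);
    eligible.into_iter().take(limit).map(|j| j.id).collect()
}

/// Releases a job so that it cannot be re-acquired until `delay` has elapsed.
pub fn release_with_delay(job: &mut QueuedJob, now: SystemTime, delay: Duration) {
    job.not_before = now + delay;
}
```

With something like this in place, a job whose helper is still processing is skipped by the acquirer until its delay expires, rather than being picked up again immediately.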

Helper

We don't need to implement this immediately/at all -- #1 is allowed by the spec. I think we should implement the schema changes and consider kicking the functional implementation down the road.

@inahga
Contributor Author

inahga commented Oct 22, 2024

> As I see it, the primary operational difference to approach #1 is that we will retransmit the request with each polling attempt, wasting bandwidth. Approach #2 does not have this (but does require more implementation effort). Are there other differences?

Not quite. In both cases there shouldn't be any unnecessary retransmission of the request. The key difference is just when we poll the aggregation job.

At the moment, we do roughly:

// in step_aggregation_job_aggregate_init()

// Calculated in code previous to this.
let job: AggregationJob;
let leader_side_state; // leader-side prepared report aggregations

let resp = {
    let request = AggregationJobInitializeReq::new(...);
    // No trailing semicolon, so the block evaluates to the helper's response.
    send_request_to_helper(request, ...)
};
process_response_from_helper(resp, leader_side_state);

Approach 1 prescribes that we simply do the polling inline, roughly:

let mut backoff = ExponentialBackoff::default();
let resp = {
    let request = AggregationJobInitializeReq::new(...);
    let resp = send_put_request_to_helper(request, ...);

    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(uri_to_poll_at) => {
            // Poll inline until the helper is ready. The job driver is occupied
            // for the whole duration of this loop.
            loop {
                let resp = send_get_request_to_helper(&uri_to_poll_at);
                match resp.status {
                    AggregationJobStatus::Ready(prepare_resps) => break prepare_resps,
                    AggregationJobStatus::Processing(_) => backoff.wait(),
                }
            }
        }
    }
};
process_response_from_helper(resp, leader_side_state);

The drawback being that the job driver is now sleeping without doing productive work between polling attempts (i.e. it's not servicing other waiting aggregation jobs).

Approach 2 prescribes that we do this:

// Calculated in code previous to this.
let job: AggregationJob;
let leader_side_state; // leader-side prepared report aggregations

let resp = {
    let request = AggregationJobInitializeReq::new(job, ...);
    let resp = send_put_request_to_helper(request, ...);

    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(uri_to_poll_at) => {
            // Checkpoint the leader-side state and release the job instead of waiting.
            job.set_state(AggregationJobState::PollingHelper, leader_side_state);
            job.poll_again_after(some_duration);
            return Ok(());
        }
    }
};
process_response_from_helper(resp, leader_side_state);

Then we have separate logic in the job acquirer that dispatches on this new state:

let aggregation_jobs = get_aggregation_jobs_in_non_terminal_state();
for job in aggregation_jobs {
    // Dispatch on the state of each individual job.
    match job.state() {
        AggregationJobState::InProgress => { do_whatever_we_do_normally() }
        AggregationJobState::PollingHelper => {
            let leader_state = load_checkpointed_leader_state(job);
            poll_aggregation_job(job, leader_state)
        }
    }
}

where poll_aggregation_job() is

let resp = {
    let resp = send_get_request_to_helper(job.uri_to_poll, ...);
    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(_) => {
            // Still not ready: release the job and try again later.
            job.poll_again_after(some_duration);
            return Ok(());
        }
    }
};

// leader_state is the checkpointed state loaded by the job acquirer.
process_response_from_helper(resp, leader_state);

The drawback being more complexity and needing to checkpoint the leader-side report aggregation state to the database. But this should make better use of aggregation job driver time.
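To make the flow of approach 2 concrete, here is a small self-contained simulation. It is a sketch under heavy assumptions: all types (`HelperStatus`, `JobState`, `Job`) and functions (`step_job`, `drive`, `combine`) are hypothetical stand-ins, prepare messages are simplified to strings, the helper is a closure rather than an HTTP client, the "checkpoint" lives in memory instead of the database, and no delay is applied between polls.

```rust
use std::collections::VecDeque;

/// The helper's answer to an init (PUT) or poll (GET) request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HelperStatus {
    Ready(Vec<String>),
    Processing,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobState {
    InProgress,
    /// Leader-side preparation state checkpointed while waiting on the helper.
    PollingHelper { leader_state: Vec<String>, poll_attempts: u32 },
    Finished { outputs: Vec<String> },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Job {
    pub id: u64,
    pub state: JobState,
}

/// Stand-in for processing the helper's response against leader-side state.
fn combine(leader_state: &[String], prepare_resps: &[String]) -> Vec<String> {
    leader_state
        .iter()
        .zip(prepare_resps)
        .map(|(l, h)| format!("{}+{}", l, h))
        .collect()
}

/// Steps one job exactly once; it never waits on the helper.
pub fn step_job(job: &mut Job, mut helper: impl FnMut(u64) -> HelperStatus) {
    // Either compute fresh leader-side state or reload the checkpoint.
    let (leader_state, attempts) = match &job.state {
        JobState::InProgress => (vec![format!("leader-prep-{}", job.id)], 0),
        JobState::PollingHelper { leader_state, poll_attempts } => {
            (leader_state.clone(), *poll_attempts)
        }
        JobState::Finished { .. } => return,
    };
    job.state = match helper(job.id) {
        HelperStatus::Ready(resps) => JobState::Finished {
            outputs: combine(&leader_state, &resps),
        },
        HelperStatus::Processing => JobState::PollingHelper {
            leader_state,
            poll_attempts: attempts + 1,
        },
    };
}

/// Drives the queue until every job finishes. A job whose helper is still
/// processing goes to the back of the queue so other jobs can make progress.
pub fn drive(mut queue: VecDeque<Job>, mut helper: impl FnMut(u64) -> HelperStatus) -> Vec<Job> {
    let mut finished = Vec::new();
    while let Some(mut job) = queue.pop_front() {
        step_job(&mut job, &mut helper);
        if matches!(job.state, JobState::Finished { .. }) {
            finished.push(job);
        } else {
            queue.push_back(job);
        }
    }
    finished
}
```

In this sketch a slow job does not hold up the jobs queued behind it, which is the throughput benefit approach 2 is after; the cost is the extra `PollingHelper` state and the checkpointed leader-side data.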

@branlwyd
Contributor

Apologies for the misunderstanding -- I think we should definitely go with Leader approach 2. Taking up an aggregation job driver "slot" doing exponential backoff polling is very likely to negatively impact the throughput of the system. Instead, if the Helper is not ready, we should put the job back onto the queue (likely with exponential backoff over repeated attempts?) and look for different work to pick up.
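The "exponential backoff over repeated attempts" could be computed as a capped delay that grows with the number of unsuccessful polls. A minimal sketch, assuming a hypothetical `reacquire_delay` helper and caller-chosen `base` and `max` values (none of these come from Janus):

```rust
use std::time::Duration;

/// Delay before a job that has been polled `attempts` times without success
/// may be re-acquired: `base * 2^attempts`, capped at `max`.
pub fn reacquire_delay(attempts: u32, base: Duration, max: Duration) -> Duration {
    // Saturate the multiplier so large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempts).unwrap_or(u32::MAX);
    // Fall back to `max` if the multiplication itself overflows.
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

For example, with a base of one second and a cap of one minute, the third unsuccessful poll would push the job back by eight seconds, and later polls would level off at one minute.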

@inahga
Contributor Author

inahga commented Oct 23, 2024

SGTM.

FWIW this problem more or less already exists, because we exponentially back off on transient retries of the PUT request, during which time the aggregation job driver is not doing productive work. It may be worth also considering what changes are needed to fix this, since we'll be overhauling this code anyway (but not a priority).

@branlwyd
Contributor

branlwyd commented Oct 23, 2024

I agree. The simplest way to address this might be to rip out that exponential-backoff polling as well, solving it the same way we plan to solve Helper polling: implementing the backoff based on placing the job back into the queue with an exponential backoff on how quickly it can be re-acquired.

(Though this is off-topic from this issue, & optional.)
