
Replace aggregate() with streaming API
Modify both `Daf` and `Vdaf` by replacing `aggregate()` with two
methods:

* `agg_init()` returns an empty aggregate share
* `agg_update()` updates an aggregate share with an output share

Also, add a method `merge()` for merging two aggregate shares, as will
be required for specifying aggregation in DAP.
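
For illustration, a caller that previously aggregated a batch in one
shot:

    agg_share = daf.aggregate(agg_param, out_shares)

would now accumulate incrementally (hypothetical usage sketch, not
part of the spec):

    agg_share = daf.agg_init(agg_param)
    for out_share in out_shares:
        agg_share = daf.agg_update(agg_param, agg_share, out_share)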

Co-authored-by: Brandon Pitman <[email protected]>
cjpatton and branlwyd committed Oct 9, 2024
1 parent 592f767 commit 8bbac2e
Showing 8 changed files with 261 additions and 184 deletions.
221 changes: 121 additions & 100 deletions draft-irtf-cfrg-vdaf.md
@@ -875,9 +875,9 @@ measurement process is as follows:
this process succeeds, then each Aggregator holds one output share.
* In VDAFs, Aggregators will need to exchange information among themselves
as part of the validation process.
* Each Aggregator combines the output shares in the batch to compute the
"aggregate share" for that batch, i.e., its share of the desired aggregate
result.
* As each Aggregator recovers output shares, it accumulates them into an
  "aggregate share" for the batch, i.e., its share of the desired aggregate
  result.
* The Aggregators submit their aggregate shares to the Collector, who combines
them to obtain the aggregate result over the batch.

@@ -1057,46 +1057,62 @@ DAFs MUST implement the following function:

## Aggregation {#sec-daf-aggregate}

Once an Aggregator holds output shares for a batch of measurements (where
batches are defined by the application), it combines them into a share of the
desired aggregate result:
Once an Aggregator holds an output share, it adds it into its aggregate share
for the batch (where batches are defined by the application).

* `daf.aggregate(agg_param: AggParam, out_shares: list[OutShare]) -> AggShare`
  is the deterministic aggregation algorithm. It is run by each Aggregator over
  a set of recovered output shares.
* `daf.agg_init(agg_param: AggParam) -> AggShare` returns an empty aggregate
share. It is called to initialize aggregation of a batch of measurements.

~~~~
Aggregator 0    Aggregator 1          Aggregator SHARES-1
============    ============          ===================
* `daf.agg_update(agg_param: AggParam, agg_share: AggShare, out_share:
OutShare) -> AggShare` accumulates an output share into an aggregate share
and returns the updated aggregate share.

out_share_0_0   out_share_1_0         out_share_[SHARES-1]_0
out_share_0_1   out_share_1_1         out_share_[SHARES-1]_1
out_share_0_2   out_share_1_2         out_share_[SHARES-1]_2
...             ...                   ...
out_share_0_B   out_share_1_B         out_share_[SHARES-1]_B
      |               |                     |
      V               V                     V
+-----------+   +-----------+         +-----------+
| aggregate |   | aggregate |   ...   | aggregate |
+-----------+   +-----------+         +-----------+
      |               |                     |
      V               V                     V
agg_share_0     agg_share_1           agg_share_[SHARES-1]
~~~~
{: #aggregate-flow title="Aggregation of output shares. `B` indicates the number of
measurements in the batch."}
Aggregator j
============

+------------+      +------------+
|  agg_init  |----->| agg_update |<--- out_share_0
+------------+      +------------+
                          |
                          V
                    +------------+
                    | agg_update |<--- out_share_1
                    +------------+
                          |
                          V
                         ...
                          |
                          V
                    +------------+
                    | agg_update |<--- out_share_[MEAS_COUNT-1]
                    +------------+
                          |
                          V
                     agg_share_j
~~~~
{: #aggregate-flow title="Local aggregation of output shares by Aggregator j.
The number of measurements in the batch is denoted by MEAS_COUNT."}

In many situations it is desirable to split an aggregate share across multiple
storage elements, then merge the aggregate shares together just before
releasing the completed aggregate share to the Collector. DAFs facilitate this
with the following method:

* `daf.merge(agg_param: AggParam, agg_shares: list[AggShare]) -> AggShare`
merges a sequence of aggregate shares into a single aggregate share.
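
As a non-normative illustration of this split-and-merge pattern, an
application might maintain one partial aggregate share per storage element.
In the sketch below, `daf`, `agg_param`, `out_shares`,
`num_storage_elements`, and the routing function `shard_of()` are
assumptions of the example, not part of this specification:

~~~ python
# Sketch only: accumulate output shares into several storage
# elements, then merge them just before the aggregate share is
# released to the Collector. All names here are hypothetical.
partials = [daf.agg_init(agg_param)
            for _ in range(num_storage_elements)]
for out_share in out_shares:
    k = shard_of(out_share)  # application-defined routing
    partials[k] = daf.agg_update(agg_param, partials[k], out_share)
agg_share = daf.merge(agg_param, partials)
~~~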

For simplicity, we have written this algorithm in a "one-shot" form, where all
output shares for a batch are provided at the same time. Many DAFs may also
support a "streaming" form, where shares are processed one at a time.
### Aggregation Order {#agg-order}

Implementation note: for most natural DAFs (and VDAFs) it is not necessary for
an Aggregator to store all output shares individually before aggregating.
Typically it is possible to merge output shares into aggregate shares as they
arrive, merge these into other aggregate shares, and so on. In particular, this
is the case when the output shares are vectors over some finite field and
aggregating them involves merely adding up the vectors element-wise. Such is the
case for Prio3 {{prio3}} and Poplar1 {{poplar1}}.
For most DAFs and VDAFs, the outcome of aggregation is not sensitive to the
order in which output shares are aggregated. That is, aggregate shares can be
updated and merged with other aggregate shares in any order. For instance, for
both Prio3 ({{prio3}}) and Poplar1 ({{poplar1}}), the aggregate shares and
output shares have the same type, a vector over some finite field
({{field}}); aggregation involves simply adding vectors together.

In theory, however, correct execution may require each Aggregator to aggregate
output shares in the same order.
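
To make the order-insensitivity concrete, the following non-normative sketch
accumulates the same batch in two different orders and expects identical
aggregate shares; `daf`, `agg_param`, and the output shares `a`, `b`, and
`c` are assumed to be in scope:

~~~ python
# Sketch only: for additive schemes such as Prio3 and Poplar1,
# aggregation is commutative and associative, so any permutation
# of the batch yields the same aggregate share.
def accumulate(out_shares):
    agg = daf.agg_init(agg_param)
    for out_share in out_shares:
        agg = daf.agg_update(agg_param, agg, out_share)
    return agg

assert accumulate([a, b, c]) == accumulate([c, a, b])
~~~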

## Unsharding {#sec-daf-unshard}

@@ -1168,28 +1184,24 @@ def run_daf(
            "measurements and nonces lists have different lengths"
        )

    out_shares: list[list[OutShare]]
    out_shares = [[] for j in range(daf.SHARES)]
    agg_shares: list[AggShare]
    agg_shares = [daf.agg_init(agg_param)
                  for _ in range(daf.SHARES)]
    for (measurement, nonce) in zip(measurements, nonces):
        # Each Client shards its measurement into input shares and
        # distributes them among the Aggregators.
        rand = gen_rand(daf.RAND_SIZE)
        (public_share, input_shares) = \
            daf.shard(ctx, measurement, nonce, rand)

        # Each Aggregator prepares its input share for aggregation.
        # Each Aggregator computes its output share from its input
        # share and aggregates it.
        for j in range(daf.SHARES):
            out_shares[j].append(
                daf.prep(ctx, j, agg_param, nonce,
                         public_share, input_shares[j]))

    # Each Aggregator aggregates its output shares into an aggregate
    # share and sends it to the Collector.
    agg_shares = []
    for j in range(daf.SHARES):
        agg_share_j = daf.aggregate(agg_param,
                                    out_shares[j])
        agg_shares.append(agg_share_j)
            out_share = daf.prep(ctx, j, agg_param, nonce,
                                 public_share, input_shares[j])
            agg_shares[j] = daf.agg_update(agg_param,
                                           agg_shares[j],
                                           out_share)

    # Collector unshards the aggregate result.
    num_measurements = len(measurements)
@@ -1447,16 +1459,21 @@ VDAFs MUST implement the following function:

## Aggregation {#sec-vdaf-aggregate}

VDAF Aggregation is identical to DAF Aggregation (cf. {{sec-daf-aggregate}}):
VDAF aggregation is identical to DAF aggregation ({{sec-daf-aggregate}}):

* `vdaf.agg_init(agg_param: AggParam) -> AggShare` returns an empty aggregate
share. It is called to initialize aggregation of a batch of measurements.

* `vdaf.aggregate(agg_param: AggParam, out_shares: list[OutShare]) -> AggShare`
is the deterministic aggregation algorithm. It is run by each Aggregator over
the output shares it has computed for a batch of measurements.
* `vdaf.agg_update(agg_param: AggParam, agg_share: AggShare, out_share:
OutShare) -> AggShare` accumulates an output share into an aggregate share
and returns the updated aggregate share.

The data flow for this stage is illustrated in {{aggregate-flow}}. Here again,
we have the aggregation algorithm in a "one-shot" form, where all shares for a
batch are provided at the same time. VDAFs typically also support a "streaming"
form, where shares are processed one at a time.
* `vdaf.merge(agg_param: AggParam, agg_shares: list[AggShare]) -> AggShare`
merges a sequence of aggregate shares into a single aggregate share.

The data flow for this stage is illustrated in {{aggregate-flow}}. As with
DAFs, computation of the VDAF aggregate is usually not sensitive to the order
in which output shares are aggregated. See {{agg-order}}.
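
For example, an Aggregator that splits a batch across parallel workers could
aggregate each partition independently and merge the partial results. In this
non-normative sketch, `vdaf`, `agg_param`, and `partitions` (a list of lists
of output shares) are assumed to be provided by the application:

~~~ python
# Sketch only: aggregate each partition of the batch separately,
# then merge the partial aggregate shares into a single share.
def aggregate_partition(out_shares):
    agg = vdaf.agg_init(agg_param)
    for out_share in out_shares:
        agg = vdaf.agg_update(agg_param, agg, out_share)
    return agg

agg_share = vdaf.merge(
    agg_param, [aggregate_partition(p) for p in partitions])
~~~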

## Unsharding {#sec-vdaf-unshard}

@@ -1513,7 +1530,8 @@ def run_vdaf(
            "measurements and nonces lists have different lengths"
        )

    out_shares = []
    agg_shares = [vdaf.agg_init(agg_param)
                  for _ in range(vdaf.SHARES)]
    for (nonce, measurement) in zip(nonces, measurements):
        assert len(nonce) == vdaf.NONCE_SIZE

@@ -1534,7 +1552,7 @@ def run_vdaf(
            prep_states.append(state)
            outbound_prep_shares.append(share)

        # Aggregators recover their output shares.
        # Aggregators complete preparation.
        for i in range(vdaf.ROUNDS - 1):
            prep_msg = vdaf.prep_shares_to_prep(ctx,
                                                agg_param,
@@ -1547,28 +1565,17 @@ def run_vdaf(
                (prep_states[j], prep_share) = out
                outbound_prep_shares.append(prep_share)

        # The final outputs of the prepare phase are the output
        # shares.
        prep_msg = vdaf.prep_shares_to_prep(ctx,
                                            agg_param,
                                            outbound_prep_shares)

        outbound_out_shares = []
        # Each Aggregator computes and aggregates its output share.
        for j in range(vdaf.SHARES):
            out_share = vdaf.prep_next(ctx, prep_states[j], prep_msg)
            assert not isinstance(out_share, tuple)
            outbound_out_shares.append(out_share)

        out_shares.append(outbound_out_shares)

    # Each Aggregator aggregates its output shares into an
    # aggregate share. In a distributed VDAF computation, the
    # aggregate shares are sent over the network.
    agg_shares = []
    for j in range(vdaf.SHARES):
        out_shares_j = [out[j] for out in out_shares]
        agg_share_j = vdaf.aggregate(agg_param, out_shares_j)
        agg_shares.append(agg_share_j)
            agg_shares[j] = vdaf.agg_update(agg_param,
                                            agg_shares[j],
                                            out_share)

    # Collector unshards the aggregate.
    num_measurements = len(measurements)
@@ -3028,14 +3035,22 @@ Aggregating a set of output shares is simply a matter of adding up the vectors
element-wise.

~~~ python
def aggregate(
        self,
        _agg_param: None,
        out_shares: list[list[F]]) -> list[F]:
    agg_share = self.flp.field.zeros(self.flp.OUTPUT_LEN)
    for out_share in out_shares:
        agg_share = vec_add(agg_share, out_share)
    return agg_share
def agg_init(self, _agg_param: None) -> list[F]:
    return self.flp.field.zeros(self.flp.OUTPUT_LEN)

def agg_update(self,
               _agg_param: None,
               agg_share: list[F],
               out_share: list[F]) -> list[F]:
    return vec_add(agg_share, out_share)

def merge(self,
          _agg_param: None,
          agg_shares: list[list[F]]) -> list[F]:
    agg = self.agg_init(None)
    for agg_share in agg_shares:
        agg = vec_add(agg, agg_share)
    return agg
~~~
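
The helper `vec_add()` is the element-wise vector addition defined in
{{field}}; a minimal sketch of its behavior, assuming field elements
overload `+`:

~~~ python
# Sketch of the element-wise addition assumed above; the
# normative definition of vec_add appears in {{field}}.
def vec_add(left: list[F], right: list[F]) -> list[F]:
    assert len(left) == len(right)
    return [x + y for (x, y) in zip(left, right)]
~~~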

### Unsharding
@@ -3049,9 +3064,7 @@ def unshard(
        _agg_param: None,
        agg_shares: list[list[F]],
        num_measurements: int) -> AggResult:
    agg = self.flp.field.zeros(self.flp.OUTPUT_LEN)
    for agg_share in agg_shares:
        agg = vec_add(agg, agg_share)
    agg = self.merge(None, agg_shares)
    return self.flp.decode(agg, num_measurements)
~~~

@@ -4928,16 +4941,28 @@ def is_valid(
Aggregation involves simply adding up the output shares.

~~~ python
def aggregate(
        self,
        agg_param: Poplar1AggParam,
        out_shares: list[FieldVec]) -> FieldVec:
def agg_init(self, agg_param: Poplar1AggParam) -> FieldVec:
    (level, prefixes) = agg_param
    field = self.idpf.current_field(level)
    return field.zeros(len(prefixes))

def agg_update(self,
               agg_param: Poplar1AggParam,
               agg_share: FieldVec,
               out_share: FieldVec) -> FieldVec:
    a = cast(list[Field], agg_share)
    o = cast(list[Field], out_share)
    return cast(FieldVec, vec_add(a, o))

def merge(self,
          agg_param: Poplar1AggParam,
          agg_shares: list[FieldVec]) -> FieldVec:
    (level, prefixes) = agg_param
    field = self.idpf.current_field(level)
    agg_share = cast(list[Field], field.zeros(len(prefixes)))
    for out_share in out_shares:
        agg_share = vec_add(agg_share, cast(list[Field], out_share))
    return cast(FieldVec, agg_share)
    agg = cast(list[Field], field.zeros(len(prefixes)))
    for agg_share in agg_shares:
        agg = vec_add(agg, cast(list[Field], agg_share))
    return cast(FieldVec, agg)
~~~
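
As a non-normative usage sketch, note that the Poplar1 aggregation parameter
fixes both the IDPF level and the candidate prefixes, so the aggregate share
has one field element per prefix. Here `poplar1`, `level`, `prefixes`, and
`out_shares` are placeholders assumed to be in scope:

~~~ python
# Sketch only: accumulate Poplar1 output shares for one level of
# the IDPF tree. The resulting vector has one element per
# candidate prefix in `prefixes`.
agg_param = (level, prefixes)
agg_share = poplar1.agg_init(agg_param)
for out_share in out_shares:
    agg_share = poplar1.agg_update(agg_param, agg_share, out_share)
~~~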

### Unsharding
@@ -4951,11 +4976,7 @@ def unshard(
        agg_param: Poplar1AggParam,
        agg_shares: list[FieldVec],
        _num_measurements: int) -> list[int]:
    (level, prefixes) = agg_param
    field = self.idpf.current_field(level)
    agg = cast(list[Field], field.zeros(len(prefixes)))
    for agg_share in agg_shares:
        agg = vec_add(agg, cast(list[Field], agg_share))
    agg = self.merge(agg_param, agg_shares)
    return [x.as_unsigned() for x in agg]
~~~

18 changes: 13 additions & 5 deletions poc/tests/test_daf.py
@@ -70,11 +70,19 @@ def prep(
        # share.
        return input_share

    def aggregate(
            self,
            _agg_param: None,
            out_shares: list[Field128]) -> Field128:
        return reduce(lambda x, y: x + y, out_shares)
    def agg_init(self, _agg_param: None) -> Field128:
        return Field128(0)

    def agg_update(self,
                   _agg_param: None,
                   agg_share: Field128,
                   out_share: Field128) -> Field128:
        return agg_share + out_share

    def merge(self,
              _agg_param: None,
              agg_shares: list[Field128]) -> Field128:
        return reduce(lambda x, y: x + y, agg_shares)

    def unshard(
            self,
15 changes: 13 additions & 2 deletions poc/tests/test_vdaf_ping_pong.py
@@ -90,8 +90,19 @@ def prep_next(self,
            "prep round {}".format(current_round+1),
        )

    def aggregate(self, _agg_param: int, out_shares: list[int]) -> int:
        return sum(out_shares)
    def agg_init(self, _agg_param: int) -> int:
        return 0

    def agg_update(self,
                   _agg_param: int,
                   agg_share: int,
                   agg_delta: int) -> int:
        return agg_share + agg_delta

    def merge(self,
              _agg_param: int,
              _agg_shares: list[int]) -> int:
        raise NotImplementedError("not needed by tests")

    def unshard(self,
                agg_param: int,