58 commits
8ecd692
add parent mode for aggregator
jait91 Sep 29, 2025
a3d2820
add parent mode for aggregator
jait91 Sep 29, 2025
ee8bfd8
implement parent round manager and service
jait91 Oct 14, 2025
6a7e8bd
Merge remote-tracking branch 'origin/main' into sharding-parent-aggre…
jait91 Oct 14, 2025
f79b21f
small fix
jait91 Oct 15, 2025
5c42b81
add sharding support for child aggregator mode
lploom Sep 30, 2025
780b41d
Implement child mode for SMT
ahtotruu Oct 15, 2025
3ae7ad3
Implement parent mode for SMT
ahtotruu Oct 15, 2025
556f6fe
fix block serialization
lploom Oct 16, 2025
e70aa39
fix package structure
lploom Oct 16, 2025
07925c8
fix creation of round manager in child mode
lploom Oct 16, 2025
365c81e
Merge pull request #68 from unicitynetwork/sharding-child-aggregator
lploom Oct 16, 2025
c93972f
use mongo Date type in timestamp fields
lploom Oct 16, 2025
0eda03f
Inclusion proof improvements
ahtotruu Oct 16, 2025
f6d1ae8
Implement joining child and parent hash proofs for sharded setting
ahtotruu Oct 16, 2025
e97ead9
Fixed typos in tests
ahtotruu Oct 16, 2025
478b459
Usability: child SMT now takes full item path as key and strips the s…
ahtotruu Oct 17, 2025
a827fe7
Improved error handling in SMTs
ahtotruu Oct 17, 2025
f006e4d
Define separate type for shard IDs
ahtotruu Oct 17, 2025
a9af32c
fix shard ID validation and HA mode in parent aggregator
jait91 Oct 17, 2025
30326cb
Merge remote-tracking branch 'origin/sharding-smt' into sharding-pare…
jait91 Oct 17, 2025
af243f3
fix merge issues
jait91 Oct 17, 2025
8ac0d11
Merge pull request #69 from unicitynetwork/time-refactor
lploom Oct 20, 2025
b43bdd7
sharding: update child aggregator mode to latest smt changes
lploom Oct 20, 2025
5a8191c
Merge pull request #71 from unicitynetwork/sharding-child-aggregator
lploom Oct 20, 2025
8ae550b
load child aggregator config on startup
lploom Oct 20, 2025
d073cfe
fix nil pointer with HA leader selector
lploom Oct 20, 2025
ae1cec7
add basic sharded mode docker compose
lploom Oct 20, 2025
a6fa693
add e2e test for sharding
jait91 Oct 21, 2025
6c1d565
refactoring
jait91 Oct 21, 2025
cbc9eaf
add error response code for invalid shard
lploom Oct 21, 2025
e92756c
add sharding performance test
lploom Oct 21, 2025
0b348b2
minor test improvement
lploom Oct 22, 2025
9cbb8ee
add nil checks to smt JoinPaths function
lploom Oct 22, 2025
7f8506f
verify parent shard proof response
lploom Oct 22, 2025
a77959a
fix cloneBranch
jait91 Oct 22, 2025
4e49e60
fix parent_round_manager_test
jait91 Oct 22, 2025
1689080
move mini-batch processing before the MaxCommitmentsPerRound check
jait91 Oct 23, 2025
b65d31e
store smt node fields as hex strings
lploom Oct 23, 2025
14ff708
revert debug logging
lploom Oct 23, 2025
aa47cb4
add inclusion proof verification to perf test
lploom Oct 23, 2025
a0f9ebf
refactor: remove bson dependency from pkg/api
lploom Oct 24, 2025
1bf9656
remove redundant config param
lploom Oct 27, 2025
9f7a39c
add shard inclusion proof verification
lploom Oct 27, 2025
9bd4f88
fix aggregator record storage
lploom Oct 27, 2025
d3cb967
add nil check before dereferencing
lploom Oct 28, 2025
e0523fd
Merge pull request #74 from unicitynetwork/bson-refactor
lploom Oct 28, 2025
971837d
add sharding diagnostic output to healthcheck api
lploom Oct 28, 2025
610c0e7
add chapter about sharding
lploom Oct 28, 2025
e845fa5
Merge pull request #76 from unicitynetwork/sharding-healthcheck
lploom Oct 29, 2025
6b476e0
use api.ShardID instead of int type
lploom Oct 29, 2025
81c1454
add chain params configuration
lploom Oct 29, 2025
65af14b
improve shard id verification
lploom Oct 29, 2025
d35a024
Change default ForkID from mainnet to testnet
MastaP Oct 29, 2025
82b7813
More checks when joining hash paths
ahtotruu Oct 29, 2025
3284169
* clear ShardUpdates map between rounds
jait91 Oct 29, 2025
3405826
Update tests to match the imporved examples in the spec
ahtotruu Nov 4, 2025
ad4fca6
More readable representation of constants
ahtotruu Nov 4, 2025
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ aggregator.log
/performance-test
bft-config
/data/
.cache/
95 changes: 95 additions & 0 deletions README.md
@@ -99,6 +99,14 @@ The service will start on `http://localhost:3000` by default.

The service is configured via environment variables:

### Chain Configuration

| Variable | Description | Default |
|-----------------|-----------------|-----------|
| `CHAIN_ID` | Chain ID | `unicity` |
| `CHAIN_VERSION` | Chain version | `1.0` |
| `CHAIN_FORK_ID` | Chain's Fork ID | `mainnet` |

### Server Configuration
| Variable | Description | Default |
|----------|-------------|---------|
@@ -220,6 +228,7 @@ Submit a state transition request to the aggregation layer with cryptographic va
- `REQUEST_ID_MISMATCH` - RequestID doesn't match SHA256(publicKey || stateHash)
- `INVALID_STATE_HASH_FORMAT` - StateHash not in proper DataHash imprint format
- `INVALID_TRANSACTION_HASH_FORMAT` - TransactionHash not in proper DataHash imprint format
- `INVALID_SHARD` - The commitment was sent to the wrong shard
- `UNSUPPORTED_ALGORITHM` - Algorithm other than secp256k1

#### `get_inclusion_proof`
@@ -551,6 +560,92 @@ The service implements a MongoDB-based leader election system:
- **`follower`** - Processing API requests, monitoring for leadership
- **`standalone`** - Single server mode (HA disabled)

## Sharding

To support horizontal scaling, the aggregators can be run in a sharded configuration consisting of one parent aggregator
and multiple child aggregators. In this mode, the global Sparse Merkle Tree (SMT) is split across the child nodes, and
agents must submit their commitments to the correct child node.

For a more detailed technical explanation of the sharded SMT structure, please refer to the official specification:
[https://github.com/unicitynetwork/specs/blob/main/smt.md](https://github.com/unicitynetwork/specs/blob/main/smt.md)

### Commitment Routing

Commitments are assigned to shards based on the least significant bits of their commitment identifier. The number of
bits used to determine the shard is defined by the `SHARD_ID_LENGTH` configuration variable.

For example, `SHARD_ID_LENGTH: 1` means that the rightmost bit of the commitment identifier determines the correct
shard. In this case there are two shards: commitments ending with bit `0` go to `shard-1`, and commitments ending
with bit `1` go to `shard-2`.

In a sharded setup, only the parent aggregator talks to the BFT node.
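
To make the routing rule concrete, here is a minimal Go sketch of an agent-side helper that picks a child aggregator from the low bits of a commitment identifier. It is illustrative only, not code from this repository: the byte layout (least significant bits in the last byte of the identifier), the `pickShardURL` helper, and the shard URLs are assumptions made for the example.

```go
package main

import "fmt"

// pickShardURL is a hypothetical agent-side helper: it takes the
// SHARD_ID_LENGTH least significant bits of the commitment identifier and
// uses them as an index into a list of 2^SHARD_ID_LENGTH child aggregators.
// It assumes the low bits live in the last byte of the identifier.
func pickShardURL(commitmentID []byte, shardIDLength uint, shardURLs []string) string {
	lowBits := uint(commitmentID[len(commitmentID)-1]) & ((1 << shardIDLength) - 1)
	return shardURLs[lowBits] // low bits 0 -> first child, 1 -> second child, ...
}

func main() {
	urls := []string{"http://shard-1:3000", "http://shard-2:3000"} // hypothetical addresses
	id := []byte{0x4a, 0x2d}                                       // 0x2d = 0b00101101, lowest bit is 1
	fmt.Println(pickShardURL(id, 1, urls))                         // -> http://shard-2:3000
}
```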

### Shard ID Encoding

The `shardID` is a unique identifier for each shard that includes a `1` as its most significant bit (MSB). This prefix
bit ensures that leading zeros in the shard's bit pattern are preserved during bit manipulations.

Examples:
- For `SHARD_ID_LENGTH: 1` the valid `shardID`s are `0b10` (2) and `0b11` (3), for a total of two shards.
- For `SHARD_ID_LENGTH: 2` the valid `shardID`s are `0b100` (4), `0b101` (5), `0b110` (6) and `0b111` (7), for a total of four shards.

A child aggregator validates incoming commitments to ensure they belong to its shard. If a commitment is sent to the
wrong shard, the aggregator rejects it with the `INVALID_SHARD` error.
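
As a concrete illustration of the prefix-bit encoding (again a sketch, not the repository's implementation), the following Go function checks whether a commitment identifier belongs to a given `shardID`; the assumption that the identifier's least significant bits sit in its last byte is carried over from the previous example.

```go
package main

import "fmt"

// belongsToShard is an illustrative check, not the service's actual code.
// shardID carries a leading 1 bit as described above, so its bit length
// minus one gives SHARD_ID_LENGTH, and the remaining bits are the expected
// low bits of the commitment identifier.
func belongsToShard(commitmentID []byte, shardID uint64) bool {
	// Derive SHARD_ID_LENGTH from the position of the prefix bit.
	shardIDLength := uint(0)
	for v := shardID; v > 1; v >>= 1 {
		shardIDLength++
	}
	mask := uint64(1)<<shardIDLength - 1
	expected := shardID & mask // strip the prefix bit
	actual := uint64(commitmentID[len(commitmentID)-1]) & mask
	return actual == expected
}

func main() {
	// shardID 0b10 (2) accepts identifiers ending in bit 0, 0b11 (3) in bit 1.
	fmt.Println(belongsToShard([]byte{0xAA}, 2)) // 0xAA ends in 0 -> true
	fmt.Println(belongsToShard([]byte{0xAB}, 2)) // 0xAB ends in 1 -> false
}
```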

### Example Sharded Setup

The following diagram illustrates a sharded setup with one parent and two child aggregators for `SHARD_ID_LENGTH: 1`.

```text
              +--------------------+
              | Parent Aggregator  |
              |    (2-leaf SMT)    |
              +--------------------+
                 /              \
              /                    \
+----------------+          +----------------+
| Child Agg. #1  |          | Child Agg. #2  |
| ShardID = 0b10 |          | ShardID = 0b11 |
| (handles *...0)|          | (handles *...1)|
+----------------+          +----------------+
        ^                           ^
        |                           |
+----------------+          +----------------+
| Agent sends    |          | Agent sends    |
| commitment     |          | commitment     |
| ID = ...xxx0   |          | ID = ...xxx1   |
+----------------+          +----------------+
```

### Configuration

The sharded setup is configured via environment variables, as seen in `sharding-compose.yml`.

A **parent** aggregator is configured with:
```yaml
environment:
  SHARDING_MODE: "parent"
  SHARD_ID_LENGTH: 1
```

A **child** aggregator is configured with its unique `shardID` and the address of the parent, for example:

Shard-1:
```yaml
environment:
  SHARDING_MODE: "child"
  SHARDING_CHILD_SHARD_ID: 2 # (binary 0b10)
  SHARDING_CHILD_PARENT_RPC_ADDR: http://aggregator-root:3000
```

Shard-2:
```yaml
environment:
  SHARDING_MODE: "child"
  SHARDING_CHILD_SHARD_ID: 3 # (binary 0b11)
  SHARDING_CHILD_PARENT_RPC_ADDR: http://aggregator-root:3000
```

## Error Handling

The service implements comprehensive JSON-RPC 2.0 error codes:
55 changes: 36 additions & 19 deletions cmd/aggregator/main.go
@@ -102,41 +102,50 @@ func main() {
// Create the shared state tracker for block sync height
stateTracker := state.NewSyncStateTracker()

// Create the Round Manager
roundManager, err := round.NewRoundManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker)
// Create round manager based on sharding mode
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker)
if err != nil {
log.WithComponent("main").Error("Failed to create round manager", "error", err.Error())
gracefulExit(asyncLogger, 1)
}

// Perform initial SMT restoration. This is required in all modes.
// Initialize round manager (SMT restoration, etc.)
if err := roundManager.Start(ctx); err != nil {
log.WithComponent("main").Error("Failed to start round manager", "error", err.Error())
gracefulExit(asyncLogger, 1)
}

// Initialize HA Manager if enabled
// Initialize leader selector and HA Manager if enabled
var ls leaderSelector
var haManager *ha.HAManager
var leaderSelector *ha.LeaderElection
if cfg.HA.Enabled {
log.WithComponent("main").Info("High availability mode enabled")
leaderSelector = ha.NewLeaderElection(log, cfg.HA, storageInstance.LeadershipStorage())
leaderSelector.Start(ctx)
ls = ha.NewLeaderElection(log, cfg.HA, storageInstance.LeadershipStorage())
ls.Start(ctx)

// Disable block syncing for parent aggregator mode
// Parent mode uses state-based SMT (current shard roots) rather than history-based (commitment leaves)
disableBlockSync := cfg.Sharding.Mode == config.ShardingModeParent
if disableBlockSync {
log.WithComponent("main").Info("Block syncing disabled for parent aggregator mode - SMT will be reconstructed on leadership transition")
}

haManager = ha.NewHAManager(log, roundManager, leaderSelector, storageInstance, roundManager.GetSMT(), stateTracker, cfg.Processing.RoundDuration)
haManager = ha.NewHAManager(log, roundManager, ls, storageInstance, roundManager.GetSMT(), cfg.Sharding.Child.ShardID, stateTracker, cfg.Processing.RoundDuration, disableBlockSync)
haManager.Start(ctx)

} else {
log.WithComponent("main").Info("High availability mode is disabled, running as standalone leader")
// In non-HA mode, the node is always the leader, so activate it directly.
// In non-HA mode, activate the round manager directly
if err := roundManager.Activate(ctx); err != nil {
log.WithComponent("main").Error("Failed to activate round manager", "error", err.Error())
gracefulExit(asyncLogger, 1)
}
}

// Initialize service
aggregatorService := service.NewAggregatorService(cfg, log, roundManager, commitmentQueue, storageInstance, leaderSelector)
aggregatorService, err := service.NewService(ctx, cfg, log, roundManager, commitmentQueue, storageInstance, ls)
if err != nil {
log.WithComponent("main").Error("Failed to create service", "error", err.Error())
gracefulExit(asyncLogger, 1)
}

// Initialize gateway server
server := gateway.NewServer(cfg, log, aggregatorService)
@@ -166,17 +175,19 @@ func main() {
log.WithComponent("main").Error("Failed to stop server gracefully", "error", err.Error())
}

// Stop round manager
roundManager.Stop(shutdownCtx)
// Stop HA Manager if it was started
if haManager != nil {
haManager.Stop()
}

// Stop leader selector if it was started
if leaderSelector != nil {
leaderSelector.Stop(shutdownCtx)
if ls != nil {
ls.Stop(shutdownCtx)
}

// Stop HA Manager if it was started
if haManager != nil {
haManager.Stop()
// Stop round manager
if err := roundManager.Stop(shutdownCtx); err != nil {
log.WithComponent("main").Error("Failed to stop round manager gracefully", "error", err.Error())
}

// Close storage backends
@@ -194,3 +205,9 @@ func main() {
asyncLogger.Stop()
}
}

type leaderSelector interface {
IsLeader(ctx context.Context) (bool, error)
Start(ctx context.Context)
Stop(ctx context.Context)
}