Skip to content

Commit

Permalink
fix(audit): Remove Task data from aggregator after a response has bee…
Browse files Browse the repository at this point in the history
…n responded or expires. (#1004)

Co-authored-by: Uriel Mihura <[email protected]>
Co-authored-by: Mario Rugiero <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2024
1 parent b10fc6f commit 2c11cf1
Show file tree
Hide file tree
Showing 6 changed files with 572 additions and 441 deletions.
9 changes: 9 additions & 0 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Supervisor for the task garbage collector: runs it in its own goroutine and
// starts it again every time it returns. ClearTasksFromMaps loops forever and
// only returns after its deferred recover has caught a panic, so reaching the
// second log line means the collector panicked and is about to be restarted.
go func() {
for {
log.Println("Starting Garbage collector")
aggregator.ClearTasksFromMaps()
log.Println("Garbage collector panicked, Supervisor restarting")
}
}()

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
Expand Down
60 changes: 58 additions & 2 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Aggregator struct {
// and can start from zero
batchesIdxByIdentifierHash map[[32]byte]uint32

// Stores the taskCreatedBlock for each batch bt batch index
// Stores the taskCreatedBlock for each batch by batch index
batchCreatedBlockByIdx map[uint32]uint64

// Stores the TaskResponse for each batch by batchIdentifierHash
Expand Down Expand Up @@ -215,6 +215,8 @@ func (agg *Aggregator) Start(ctx context.Context) error {

const MaxSentTxRetries = 5

// BLS_AGG_SERVICE_TIMEOUT is the duration handed to the BLS aggregation
// service as the last argument of InitializeNewTask when a new task is added.
const BLS_AGG_SERVICE_TIMEOUT = 100 * time.Second

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
Expand Down Expand Up @@ -275,6 +277,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down Expand Up @@ -361,12 +364,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
"batchIdentifierHash", batchIdentifierHash,
)
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, BLS_AGG_SERVICE_TIMEOUT)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
Expand All @@ -377,3 +385,51 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
}

// ClearTasksFromMaps is a long-lived garbage collector meant to run in its own
// goroutine. Every GarbageCollectorPeriod it asks the AVS reader for the hash
// of an old task and removes that task, together with every lower-indexed task
// that has not been removed yet, from the aggregator's in-memory maps. Each
// task occupies memory in those maps, so without this they would leak.
//
// The loop never returns on its own. If an iteration panics, the panic is
// recovered and logged and the function returns; the caller is expected to
// call it again to resume collection.
func (agg *Aggregator) ClearTasksFromMaps() {
	defer func() {
		err := recover() //stops panics
		if err != nil {
			agg.logger.Error("Recovered from panic", "err", err)
		}
	}()

	agg.AggregatorConfig.BaseConfig.Logger.Info(fmt.Sprintf("- Removing finalized Task Infos from Maps every %v", agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod))

	// Lowest batch index that has not been cleaned yet. Batch indexes can start
	// from zero, so the first pass has to include index 0.
	nextIdxToDelete := uint32(0)

	for {
		time.Sleep(agg.AggregatorConfig.Aggregator.GarbageCollectorPeriod)

		agg.AggregatorConfig.BaseConfig.Logger.Info("Cleaning finalized tasks from maps")
		// The lookup is done without holding taskMutex so that the other
		// goroutines using the maps are not blocked while it runs.
		oldTaskIdHash, err := agg.avsReader.GetOldTaskHash(agg.AggregatorConfig.Aggregator.GarbageCollectorTasksAge, agg.AggregatorConfig.Aggregator.GarbageCollectorTasksInterval)
		if err != nil {
			agg.logger.Error("Error getting old task hash, skipping this garbage collect", "err", err)
			continue // Retry in the next iteration
		}
		if oldTaskIdHash == nil {
			agg.logger.Warn("No old tasks found")
			continue // Retry in the next iteration
		}

		nextIdxToDelete = agg.removeTasksUpTo(*oldTaskIdHash, nextIdxToDelete)
		agg.AggregatorConfig.BaseConfig.Logger.Info("Done cleaning finalized tasks from maps")
	}
}

// removeTasksUpTo deletes every task with an index in
// [nextIdxToDelete, index of oldTaskIdHash] from the aggregator maps and
// returns the next index that still has to be cleaned. When oldTaskIdHash is
// not tracked (for instance because it was already cleaned in a previous pass)
// or its index is below nextIdxToDelete, nothing is deleted and
// nextIdxToDelete is returned unchanged, so progress never moves backwards.
func (agg *Aggregator) removeTasksUpTo(oldTaskIdHash [32]byte, nextIdxToDelete uint32) uint32 {
	// The maps are shared with the goroutines that add and respond to tasks;
	// unsynchronized map access is a fatal runtime error that recover cannot
	// catch. Unlocking through defer keeps the mutex usable even if a panic
	// unwinds through here and the collector is restarted.
	agg.taskMutex.Lock()
	defer agg.taskMutex.Unlock()

	taskIdxToDelete, known := agg.batchesIdxByIdentifierHash[oldTaskIdHash]
	if !known {
		// A plain lookup would yield index 0 here and rewind the cleanup cursor.
		agg.logger.Warn("Old task not found in maps, nothing to clean",
			"batchIdentifierHash", "0x"+hex.EncodeToString(oldTaskIdHash[:]))
		return nextIdxToDelete
	}
	agg.logger.Info("Old task found", "taskIndex", taskIdxToDelete)
	if taskIdxToDelete < nextIdxToDelete {
		return nextIdxToDelete
	}

	// delete from nextIdxToDelete to taskIdxToDelete, both inclusive
	for i := nextIdxToDelete; i <= taskIdxToDelete; i++ {
		batchIdentifierHash, exists := agg.batchesIdentifierHashByIdx[i]
		if !exists {
			agg.logger.Warn("Task not found in maps", "taskIndex", i)
			continue
		}
		agg.logger.Info("Cleaning up finalized task", "taskIndex", i)
		delete(agg.batchesIdxByIdentifierHash, batchIdentifierHash)
		delete(agg.batchCreatedBlockByIdx, i)
		delete(agg.batchesIdentifierHashByIdx, i)
		delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
	}
	return taskIdxToDelete + 1
}
Loading

0 comments on commit 2c11cf1

Please sign in to comment.