Skip to content

Commit

Permalink
Oprs info parallel query (#239)
Browse files Browse the repository at this point in the history
* make queryBlockRangea an argument to query log functions

fix operatorsinfo and rebuild mocks

fix bug

make operators-info query logs in parallel

fix lint issue

* change errors.Join to utils.WrapError
  • Loading branch information
samlaf authored May 9, 2024
1 parent 9c7a75a commit 2bab9cc
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions services/operatorsinfo/operatorsinfo_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package operatorsinfo

import (
"context"
"errors"
"math/big"
"sync"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/types"
"github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/common"
)

Expand Down Expand Up @@ -101,7 +103,11 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con
ops.logger.Error("Fatal error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
ops.queryPastRegisteredOperatorEventsAndFillDb(ctx)
err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx)
if err != nil {
ops.logger.Error("Fatal error querying past registered operator events and filling db", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
// The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry
// before the block at which we started the ws subscription above
wg.Done()
Expand Down Expand Up @@ -160,21 +166,35 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con
}()
}

func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) {
func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) error {
// Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that we will receive again in the websocket,
// since we will just overwrite the pubkey dict with the same values.
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil, ops.logFilterQueryBlockRange)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
ops.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorPubkeysServiceInMemory")
wg := sync.WaitGroup{}
var alreadyRegisteredOperatorAddrs []common.Address
var alreadyRegisteredOperatorPubkeys []types.OperatorPubkeys
var pubkeysErr error

// we make both Queries in parallel because they take time and we don't want to wait for one to finish before starting the other
wg.Add(2)
go func() {
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, pubkeysErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil, ops.logFilterQueryBlockRange)
wg.Done()
}()
var socketsMap map[types.OperatorId]types.Socket
var socketsErr error
go func() {
socketsMap, socketsErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil, ops.logFilterQueryBlockRange)
wg.Done()
}()

socketsMap, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil, ops.logFilterQueryBlockRange)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operator sockets", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
wg.Wait()
if pubkeysErr != nil {
return utils.WrapError(errors.New("error querying existing registered operators"), pubkeysErr)
}
if socketsErr != nil {
return utils.WrapError(errors.New("error querying existing registered operator sockets"), socketsErr)
}
ops.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "alreadyRegisteredOperatorPubkeys", alreadyRegisteredOperatorPubkeys, "service", "OperatorPubkeysServiceInMemory")
ops.logger.Debug("List of queried operator socket registration events", "socketsMap", socketsMap, "service", "OperatorPubkeysServiceInMemory")

// Fill the pubkeydict db with the operators and pubkeys found
Expand All @@ -185,6 +205,7 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil
ops.operatorAddrToId[operatorAddr] = operatorId
ops.updateSocketMapping(operatorId, socketsMap[operatorId])
}
return nil
}

// TODO(samlaf): we might want to also add an async version of this method that returns a channel of operator pubkeys?
Expand Down

0 comments on commit 2bab9cc

Please sign in to comment.