diff --git a/chainio/clients/builder.go b/chainio/clients/builder.go index 3f7311dc..7475fc97 100644 --- a/chainio/clients/builder.go +++ b/chainio/clients/builder.go @@ -3,6 +3,7 @@ package clients import ( "context" "crypto/ecdsa" + "fmt" "time" "github.com/ethereum/go-ethereum/ethclient" @@ -56,7 +57,10 @@ func BuildReadClients( config BuildAllConfig, logger logging.Logger, ) (*ReadClients, error) { - config.validate(logger) + err := config.validate(logger) + if err != nil { + return nil, utils.WrapError("Failed to validate logger", err) + } // Create the metrics server promReg := prometheus.NewRegistry() @@ -127,7 +131,10 @@ func BuildAll( ecdsaPrivateKey *ecdsa.PrivateKey, logger logging.Logger, ) (*Clients, error) { - config.validate(logger) + err := config.validate(logger) + if err != nil { + return nil, utils.WrapError("Failed to validate logger", err) + } // Create the metrics server promReg := prometheus.NewRegistry() @@ -148,7 +155,8 @@ func BuildAll( defer cancel() chainid, err := ethHttpClient.ChainID(rpcCtx) if err != nil { - logger.Fatal("Cannot get chain id", "err", err) + logger.Error("Cannot get chain id", "err", err) + return nil, utils.WrapError("Cannot get chain id", err) } signerV2, addr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) if err != nil { @@ -215,23 +223,30 @@ func BuildAll( // Very basic validation that makes sure all fields are nonempty // we might eventually want more sophisticated validation, based on regexp, // or use something like https://json-schema.org/ (?) -func (config *BuildAllConfig) validate(logger logging.Logger) { +func (config *BuildAllConfig) validate(logger logging.Logger) error { if config.EthHttpUrl == "" { - logger.Fatalf("BuildAllConfig.validate: Missing eth http url") + logger.Error("BuildAllConfig.validate: Missing eth http url") + return fmt.Errorf("BuildAllConfig.validate: Missing eth http url") } if config.EthWsUrl == "" { - logger.Fatalf("BuildAllConfig.validate: Missing eth ws url") + logger.Error("BuildAllConfig.validate: Missing eth ws url") + return fmt.Errorf("BuildAllConfig.validate: Missing eth ws url") } if config.RegistryCoordinatorAddr == "" { - logger.Fatalf("BuildAllConfig.validate: Missing bls registry coordinator address") + logger.Error("BuildAllConfig.validate: Missing bls registry coordinator address") + return fmt.Errorf("BuildAllConfig.validate: Missing bls registry coordinator address") } if config.OperatorStateRetrieverAddr == "" { - logger.Fatalf("BuildAllConfig.validate: Missing bls operator state retriever address") + logger.Error("BuildAllConfig.validate: Missing bls operator state retriever address") + return fmt.Errorf("BuildAllConfig.validate: Missing bls operator state retriever address") } if config.AvsName == "" { - logger.Fatalf("BuildAllConfig.validate: Missing avs name") + logger.Error("BuildAllConfig.validate: Missing avs name") + return fmt.Errorf("BuildAllConfig.validate: Missing avs name") } if config.PromMetricsIpPortAddress == "" { - logger.Fatalf("BuildAllConfig.validate: Missing prometheus metrics ip port address") + logger.Error("BuildAllConfig.validate: Missing prometheus metrics ip port address") + return fmt.Errorf("BuildAllConfig.validate: Missing prometheus metrics ip port address") } + return nil } diff --git a/chainio/txmgr/geometric/geometric_example_test.go b/chainio/txmgr/geometric/geometric_example_test.go index e6820be2..54886834 100644 --- a/chainio/txmgr/geometric/geometric_example_test.go +++ b/chainio/txmgr/geometric/geometric_example_test.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/signerv2" "github.com/Layr-Labs/eigensdk-go/testutils" + "github.com/Layr-Labs/eigensdk-go/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -23,63 +24,76 @@ var ( chainid = big.NewInt(31337) ) +func createTx(client eth.HttpBackend, address common.Address) (*types.Transaction, error) { + zeroAddr := common.HexToAddress("0x0") + nonce, err := client.PendingNonceAt(context.TODO(), address) + if err != nil { + return nil, utils.WrapError("Failed to get PendingNonceAt", err) + } + return types.NewTx(&types.DynamicFeeTx{ + To: &zeroAddr, + Nonce: nonce, + }), nil +} + +func createTxMgr(rpcUrl string, ecdsaPrivateKey *ecdsa.PrivateKey) (eth.HttpBackend, *GeometricTxManager, error) { + logger := logging.NewTextSLogger(os.Stdout, &logging.SLoggerOptions{}) + client, err := ethclient.Dial(rpcUrl) + if err != nil { + return nil, nil, err + } + signerV2, signerAddr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) + if err != nil { + return nil, nil, err + } + wallet, err := wallet.NewPrivateKeyWallet(client, signerV2, signerAddr, logger) + if err != nil { + return nil, nil, err + } + reg := prometheus.NewRegistry() + metrics := NewMetrics(reg, "example", logger) + return client, NewGeometricTxnManager(client, wallet, logger, metrics, GeometricTxnManagerParams{}), nil +} + func ExampleGeometricTxManager() { anvilC, err := testutils.StartAnvilContainer("") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } anvilUrl, err := anvilC.Endpoint(context.TODO(), "http") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } ecdsaPrivateKey, err := crypto.HexToECDSA(testutils.ANVIL_FIRST_PRIVATE_KEY) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } pk := ecdsaPrivateKey.PublicKey address := crypto.PubkeyToAddress(pk) - client, txmgr := createTxMgr(anvilUrl, ecdsaPrivateKey) - - tx := createTx(client, address) - _, err = txmgr.Send(context.TODO(), tx, true) + client, txmgr, err := createTxMgr(anvilUrl, ecdsaPrivateKey) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } - // we just add this to make sure the example runs - fmt.Println("Tx sent") - // Output: Tx sent -} - -func createTx(client eth.HttpBackend, address common.Address) *types.Transaction { - zeroAddr := common.HexToAddress("0x0") - nonce, err := client.PendingNonceAt(context.TODO(), address) + tx, err := createTx(client, address) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } - return types.NewTx(&types.DynamicFeeTx{ - To: &zeroAddr, - Nonce: nonce, - }) -} -func createTxMgr(rpcUrl string, ecdsaPrivateKey *ecdsa.PrivateKey) (eth.HttpBackend, *GeometricTxManager) { - logger := logging.NewTextSLogger(os.Stdout, &logging.SLoggerOptions{}) - client, err := ethclient.Dial(rpcUrl) - if err != nil { - panic(err) - } - signerV2, signerAddr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) - if err != nil { - panic(err) - } - wallet, err := wallet.NewPrivateKeyWallet(client, signerV2, signerAddr, logger) + _, err = txmgr.Send(context.TODO(), tx, true) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } - reg := prometheus.NewRegistry() - metrics := NewMetrics(reg, "example", logger) - return client, NewGeometricTxnManager(client, wallet, logger, metrics, GeometricTxnManagerParams{}) + + // we just add this to make sure the example runs + fmt.Println("Tx sent") + // Output: Tx sent } diff --git a/logging/zap_logger.go b/logging/zap_logger.go index 5120a5ca..38d31ffe 100644 --- a/logging/zap_logger.go +++ b/logging/zap_logger.go @@ -3,6 +3,7 @@ package logging import ( "fmt" + "github.com/Layr-Labs/eigensdk-go/utils" "go.uber.org/zap" ) @@ -28,7 +29,7 @@ func NewZapLogger(env LogLevel) (Logger, error) { } else if env == Development { config = zap.NewDevelopmentConfig() } else { - panic(fmt.Sprintf("Unknown environment. Expected %s or %s. Received %s.", Development, Production, env)) + return nil, fmt.Errorf("unknown environment. Expected %s or %s. Received %s", Development, Production, env) } return NewZapLoggerByConfig(config, zap.AddCallerSkip(1)) @@ -39,7 +40,7 @@ func NewZapLogger(env LogLevel) (Logger, error) { func NewZapLoggerByConfig(config zap.Config, options ...zap.Option) (Logger, error) { logger, err := config.Build(options...) if err != nil { - panic(err) + return nil, utils.WrapError("Can not build config with the given options", err) } return &ZapLogger{ diff --git a/metrics/eigenmetrics_example_test.go b/metrics/eigenmetrics_example_test.go index bf55ab83..f4075d89 100644 --- a/metrics/eigenmetrics_example_test.go +++ b/metrics/eigenmetrics_example_test.go @@ -6,6 +6,8 @@ package metrics_test import ( "context" + "fmt" + "os" "github.com/Layr-Labs/eigensdk-go/chainio/clients" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" @@ -26,13 +28,15 @@ func ExampleEigenMetrics() { logger, err := logging.NewZapLogger("development") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } // get the Writer for the EL contracts ecdsaPrivateKey, err := crypto.HexToECDSA("0x0") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } operatorEcdsaAddr := crypto.PubkeyToAddress(ecdsaPrivateKey.PublicKey) @@ -46,7 +50,8 @@ func ExampleEigenMetrics() { } clients, err := clients.BuildAll(chainioConfig, ecdsaPrivateKey, logger) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } reg := prometheus.NewRegistry() eigenMetrics := metrics.NewEigenMetrics("exampleAvs", ":9090", reg, logger) @@ -71,7 +76,8 @@ func ExampleEigenMetrics() { rpcCallsCollector := rpccalls.NewCollector("exampleAvs", reg) instrumentedEthClient, err := eth.NewInstrumentedClient("http://localhost:8545", rpcCallsCollector) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } eigenMetrics.Start(context.Background(), reg) diff --git a/nodeapi/nodeapi_example_test.go b/nodeapi/nodeapi_example_test.go index 7c9e7844..2fa76047 100644 --- a/nodeapi/nodeapi_example_test.go +++ b/nodeapi/nodeapi_example_test.go @@ -1,6 +1,9 @@ package nodeapi_test import ( + "fmt" + "os" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/nodeapi" ) @@ -8,7 +11,8 @@ import ( func ExampleNodeApi() { logger, err := logging.NewZapLogger("development") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } nodeApi := nodeapi.NewNodeApi("testAvs", "v0.0.1", "localhost:8080", logger) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 5b2da475..b7966ffd 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -76,11 +76,20 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( } numquorums := len(quorumNumbers) if len(operatorsStakesInQuorums) != numquorums { - ar.logger.Fatal( + ar.logger.Error( "Number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation.", "service", "AvsRegistryServiceChainCaller", + "operatorsStakesInQuorums", + operatorsStakesInQuorums, + "numquorums", + numquorums, ) + return nil, + utils.WrapError( + "number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation", + nil, + ) } for quorumIdx, quorumNum := range quorumNumbers { @@ -149,7 +158,7 @@ func (ar *AvsRegistryServiceChainCaller) getOperatorInfo( info, ok := ar.operatorInfoService.GetOperatorInfo(ctx, operatorAddr) if !ok { return types.OperatorInfo{}, fmt.Errorf( - "Failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", + "failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", operatorAddr, operatorId, ) diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index c2874b92..cf3d6b24 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -9,6 +9,7 @@ import ( blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" "github.com/ethereum/go-ethereum/event" + "golang.org/x/sync/errgroup" "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" @@ -49,17 +50,13 @@ type avsRegistrySubscriber interface { // Warning: this service should probably not be used in production. Haven't done a thorough analysis of all the clients // but there is still an open PR about an issue with ws subscription on geth: // https://github.com/ethereum/go-ethereum/issues/23845 -// Another reason to note for infra/devops engineer who would put this into production, is that this service crashes on -// websocket connection errors or when failing to query past events. The philosophy here is that hard crashing is -// better than silently failing, since it will be easier to debug. Naturally, this means that this aggregator using this -// service needs -// to be replicated and load-balanced, so that when it fails traffic can be switched to the other aggregator. type OperatorsInfoServiceInMemory struct { logFilterQueryBlockRange *big.Int avsRegistrySubscriber avsRegistrySubscriber avsRegistryReader avsRegistryReader logger logging.Logger queryC chan<- query + errG *errgroup.Group // queried via the queryC channel, so don't need mutex to access pubkeyDict map[common.Address]types.OperatorPubkeys operatorAddrToId map[common.Address]types.OperatorId @@ -104,12 +101,15 @@ func NewOperatorsInfoServiceInMemory( if logFilterQueryBlockRange == nil { logFilterQueryBlockRange = defaultLogFilterQueryBlockRange } + errG, ctx := errgroup.WithContext(ctx) + pkcs := &OperatorsInfoServiceInMemory{ avsRegistrySubscriber: avsRegistrySubscriber, avsRegistryReader: avsRegistryReader, logFilterQueryBlockRange: logFilterQueryBlockRange, logger: logger, queryC: queryC, + errG: errG, pubkeyDict: make(map[common.Address]types.OperatorPubkeys), operatorAddrToId: make(map[common.Address]types.OperatorId), socketDict: make(map[types.OperatorId]types.Socket), @@ -117,169 +117,167 @@ func NewOperatorsInfoServiceInMemory( // We use this waitgroup to wait on the initialization of the inmemory pubkey dict, // which requires querying the past events of the pubkey registration contract wg := sync.WaitGroup{} + wg.Add(1) - pkcs.startServiceInGoroutine(ctx, queryC, &wg, opts) + errG.Go(func() error { + return pkcs.runService(ctx, queryC, &wg, opts) + }) wg.Wait() return pkcs } -func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( +func (ops *OperatorsInfoServiceInMemory) runService( ctx context.Context, queryC <-chan query, wg *sync.WaitGroup, opts Opts, -) { - go func() { - - // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs - ops.logger.Debug( - "Subscribing to new pubkey registration events on blsApkRegistry contract", +) error { + // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs + ops.logger.Debug( + "Subscribing to new pubkey registration events on blsApkRegistry contract", + "service", + "OperatorPubkeysServiceInMemory", + ) + newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + ops.logger.Error( + "Fatal error opening websocket subscription for new pubkey registrations", + "err", + err, "service", "OperatorPubkeysServiceInMemory", ) - newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() - if err != nil { - ops.logger.Error( - "Fatal error opening websocket subscription for new pubkey registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - // see the warning above the struct definition to understand why we panic here - panic(err) - } - newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() - if err != nil { - ops.logger.Error( - "Fatal error opening websocket subscription for new socket registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - panic(err) - } - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) - if err != nil { + return err + } + newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { + ops.logger.Error( + "Fatal error opening websocket subscription for new socket registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + return err + } + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) + if err != nil { + ops.logger.Error( + "Fatal error querying past registered operator events and filling db", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + return err + } + // The constructor can return after we have backfilled the db by querying the events of operators that have + // registered with the blsApkRegistry + // before the block at which we started the ws subscription above + wg.Done() + for { + select { + case <-ctx.Done(): + // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is + // shutting down and we want graceful exit + ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + return errors.New("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + case err := <-newPubkeyRegistrationSub.Err(): ops.logger.Error( - "Fatal error querying past registered operator events and filling db", + "Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - panic(err) - } - // The constructor can return after we have backfilled the db by querying the events of operators that have - // registered with the blsApkRegistry - // before the block at which we started the ws subscription above - wg.Done() - for { - select { - case <-ctx.Done(): - // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is - // shutting down and we want graceful exit - ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") - return - case err := <-newPubkeyRegistrationSub.Err(): + newPubkeyRegistrationSub.Unsubscribe() + newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { ops.logger.Error( - "Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", + "Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - newPubkeyRegistrationSub.Unsubscribe() - newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() - if err != nil { - ops.logger.Error( - "Error opening websocket subscription for new pubkey registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - // see the warning above the struct definition to understand why we panic here - panic(err) - } - case err := <-newSocketRegistrationSub.Err(): + return err + } + case err := <-newSocketRegistrationSub.Err(): + ops.logger.Error( + "Error in websocket subscription for new socket registration events. Attempting to reconnect...", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + newSocketRegistrationSub.Unsubscribe() + newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { ops.logger.Error( - "Error in websocket subscription for new socket registration events. Attempting to reconnect...", + "Error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - newSocketRegistrationSub.Unsubscribe() - newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() - if err != nil { - ops.logger.Error( - "Error opening websocket subscription for new socket registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - panic(err) - } - case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: - operatorAddr := newPubkeyRegistrationEvent.Operator - ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ - G1Pubkey: bls.NewG1Point( - newPubkeyRegistrationEvent.PubkeyG1.X, - newPubkeyRegistrationEvent.PubkeyG1.Y, - ), - G2Pubkey: bls.NewG2Point( - newPubkeyRegistrationEvent.PubkeyG2.X, - newPubkeyRegistrationEvent.PubkeyG2.Y, - ), - } + return err + } + case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: + operatorAddr := newPubkeyRegistrationEvent.Operator + ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ + G1Pubkey: bls.NewG1Point( + newPubkeyRegistrationEvent.PubkeyG1.X, + newPubkeyRegistrationEvent.PubkeyG1.Y, + ), + G2Pubkey: bls.NewG2Point( + newPubkeyRegistrationEvent.PubkeyG2.X, + newPubkeyRegistrationEvent.PubkeyG2.Y, + ), + } - operatorId := types.OperatorIdFromContractG1Pubkey(newPubkeyRegistrationEvent.PubkeyG1) - ops.operatorAddrToId[operatorAddr] = operatorId - ops.logger.Debug( - "Added operator pubkeys to pubkey dict from new pubkey registration event", - "service", - "OperatorPubkeysServiceInMemory", - "block", - newPubkeyRegistrationEvent.Raw.BlockNumber, - "operatorAddr", - operatorAddr, - "operatorId", - operatorId, - "G1pubkey", - ops.pubkeyDict[operatorAddr].G1Pubkey, - "G2pubkey", - ops.pubkeyDict[operatorAddr].G2Pubkey, - ) - case newSocketRegistrationEvent := <-newSocketRegistrationC: - ops.logger.Debug( - "Received new socket registration event", - "service", - "OperatorPubkeysServiceInMemory", - "operatorId", - types.OperatorId(newSocketRegistrationEvent.OperatorId), - "socket", - newSocketRegistrationEvent.Socket, - ) - ops.updateSocketMapping( - newSocketRegistrationEvent.OperatorId, - types.Socket(newSocketRegistrationEvent.Socket), - ) - // Receive a query from GetOperatorPubkeys - case query := <-queryC: - pubkeys, ok := ops.pubkeyDict[query.operatorAddr] - operatorId := ops.operatorAddrToId[query.operatorAddr] - socket := ops.socketDict[operatorId] - operatorInfo := types.OperatorInfo{ - Socket: socket, - Pubkeys: pubkeys, - } - query.respC <- resp{operatorInfo, ok} + operatorId := types.OperatorIdFromContractG1Pubkey(newPubkeyRegistrationEvent.PubkeyG1) + ops.operatorAddrToId[operatorAddr] = operatorId + ops.logger.Debug( + "Added operator pubkeys to pubkey dict from new pubkey registration event", + "service", + "OperatorPubkeysServiceInMemory", + "block", + newPubkeyRegistrationEvent.Raw.BlockNumber, + "operatorAddr", + operatorAddr, + "operatorId", + operatorId, + "G1pubkey", + ops.pubkeyDict[operatorAddr].G1Pubkey, + "G2pubkey", + ops.pubkeyDict[operatorAddr].G2Pubkey, + ) + case newSocketRegistrationEvent := <-newSocketRegistrationC: + ops.logger.Debug( + "Received new socket registration event", + "service", + "OperatorPubkeysServiceInMemory", + "operatorId", + types.OperatorId(newSocketRegistrationEvent.OperatorId), + "socket", + newSocketRegistrationEvent.Socket, + ) + ops.updateSocketMapping( + newSocketRegistrationEvent.OperatorId, + types.Socket(newSocketRegistrationEvent.Socket), + ) + // Receive a query from GetOperatorPubkeys + case query := <-queryC: + pubkeys, ok := ops.pubkeyDict[query.operatorAddr] + operatorId := ops.operatorAddrToId[query.operatorAddr] + socket := ops.socketDict[operatorId] + operatorInfo := types.OperatorInfo{ + Socket: socket, + Pubkeys: pubkeys, } + query.respC <- resp{operatorInfo, ok} } - }() + } } func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb( diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index 041b5af9..e5ba20c2 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/event" - apkregistrybindings "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -25,14 +24,14 @@ import ( ) type fakeAVSRegistrySubscriber struct { - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate eventSubscription *fakeEventSubscription } func newFakeAVSRegistrySubscriber( eventSubscription *fakeEventSubscription, - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration, + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration, operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate, ) *fakeAVSRegistrySubscriber { return &fakeAVSRegistrySubscriber{ @@ -86,8 +85,8 @@ func TestGetOperatorInfo(t *testing.T) { ContractG2Pubkey: contractG2Pubkey, } - pubkeyRegistrationEventC := make(chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration, 1) - pubkeyRegistrationEvent := &apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration{ + pubkeyRegistrationEventC := make(chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration, 1) + pubkeyRegistrationEvent := &blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration{ Operator: testOperator1.OperatorAddr, PubkeyG1: testOperator1.ContractG1Pubkey, PubkeyG2: testOperator1.ContractG2Pubkey, @@ -106,7 +105,7 @@ func TestGetOperatorInfo(t *testing.T) { var tests = []struct { name string operator *fakes.TestOperator - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate eventErrC chan error queryOperatorAddr common.Address