From f40848bd90de51242f2026399673139faf217df1 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 1 Nov 2024 18:31:38 -0300 Subject: [PATCH 01/14] changing logger.Fatal in GetOperatorsAvsStateAtBlock --- services/avsregistry/avsregistry_chaincaller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 5b2da475..360d09c4 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -76,11 +76,12 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( } numquorums := len(quorumNumbers) if len(operatorsStakesInQuorums) != numquorums { - ar.logger.Fatal( + ar.logger.Error( "Number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation.", "service", "AvsRegistryServiceChainCaller", ) + return nil, fmt.Errorf("number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation") } for quorumIdx, quorumNum := range quorumNumbers { From da3e61d7dc1c5f92d42a94fcc94f696b1f8927fa Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 1 Nov 2024 18:32:23 -0300 Subject: [PATCH 02/14] fix warning in error logger --- services/avsregistry/avsregistry_chaincaller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 360d09c4..599f44ed 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -150,7 +150,7 @@ func (ar *AvsRegistryServiceChainCaller) getOperatorInfo( info, ok := ar.operatorInfoService.GetOperatorInfo(ctx, operatorAddr) if !ok { return types.OperatorInfo{}, fmt.Errorf( - "Failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", + "failed to get operator info from operatorInfoService (operatorAddr: %v, operatorId: %v)", operatorAddr, operatorId, ) From e229fee8986e0af4eca6e4cb889506391399294b Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 1 Nov 2024 18:40:12 -0300 Subject: [PATCH 03/14] WrapError in GetOperatorsAvsStateAtBlock --- services/avsregistry/avsregistry_chaincaller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 599f44ed..ad496241 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -81,7 +81,8 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( "service", "AvsRegistryServiceChainCaller", ) - return nil, fmt.Errorf("number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation") + return nil, + utils.WrapError("number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation", nil) } for quorumIdx, quorumNum := range quorumNumbers { From 663b684e23792b2fdde0efb808835af2a34854ef Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 1 Nov 2024 18:42:30 -0300 Subject: [PATCH 04/14] make fmt --- services/avsregistry/avsregistry_chaincaller.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index ad496241..23f3daef 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -82,7 +82,10 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( "AvsRegistryServiceChainCaller", ) return nil, - utils.WrapError("number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation", nil) + utils.WrapError( + "number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation", + nil, + ) } for quorumIdx, quorumNum := range quorumNumbers { From 7d8b2ce85a3fd7be2b79cd17503da370ec7ffd52 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Fri, 1 Nov 2024 18:51:28 -0300 Subject: [PATCH 05/14] changing logger.Fatal to logger.Error and return errors in BuildAllConfig.validate --- chainio/clients/builder.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/chainio/clients/builder.go b/chainio/clients/builder.go index 3f7311dc..4a12ce14 100644 --- a/chainio/clients/builder.go +++ b/chainio/clients/builder.go @@ -3,6 +3,7 @@ package clients import ( "context" "crypto/ecdsa" + "fmt" "time" "github.com/ethereum/go-ethereum/ethclient" @@ -148,7 +149,8 @@ func BuildAll( defer cancel() chainid, err := ethHttpClient.ChainID(rpcCtx) if err != nil { - logger.Fatal("Cannot get chain id", "err", err) + logger.Error("Cannot get chain id", "err", err) + return nil, utils.WrapError("Cannot get chain id", err) } signerV2, addr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) if err != nil { @@ -215,23 +217,30 @@ func BuildAll( // Very basic validation that makes sure all fields are nonempty // we might eventually want more sophisticated validation, based on regexp, // or use something like https://json-schema.org/ (?) -func (config *BuildAllConfig) validate(logger logging.Logger) { +func (config *BuildAllConfig) validate(logger logging.Logger) error { if config.EthHttpUrl == "" { - logger.Fatalf("BuildAllConfig.validate: Missing eth http url") + logger.Error("BuildAllConfig.validate: Missing eth http url") + return fmt.Errorf("BuildAllConfig.validate: Missing eth http url") } if config.EthWsUrl == "" { - logger.Fatalf("BuildAllConfig.validate: Missing eth ws url") + logger.Error("BuildAllConfig.validate: Missing eth ws url") + return fmt.Errorf("BuildAllConfig.validate: Missing eth ws url") } if config.RegistryCoordinatorAddr == "" { - logger.Fatalf("BuildAllConfig.validate: Missing bls registry coordinator address") + logger.Error("BuildAllConfig.validate: Missing bls registry coordinator address") + return fmt.Errorf("BuildAllConfig.validate: Missing bls registry coordinator address") } if config.OperatorStateRetrieverAddr == "" { - logger.Fatalf("BuildAllConfig.validate: Missing bls operator state retriever address") + logger.Error("BuildAllConfig.validate: Missing bls operator state retriever address") + return fmt.Errorf("BuildAllConfig.validate: Missing bls operator state retriever address") } if config.AvsName == "" { - logger.Fatalf("BuildAllConfig.validate: Missing avs name") + logger.Error("BuildAllConfig.validate: Missing avs name") + return fmt.Errorf("BuildAllConfig.validate: Missing avs name") } if config.PromMetricsIpPortAddress == "" { - logger.Fatalf("BuildAllConfig.validate: Missing prometheus metrics ip port address") + logger.Error("BuildAllConfig.validate: Missing prometheus metrics ip port address") + return fmt.Errorf("BuildAllConfig.validate: Missing prometheus metrics ip port address") } + return nil } From f08dabc1ae5c79a884de454fa92c3f33a9941274 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 4 Nov 2024 17:48:54 -0300 Subject: [PATCH 06/14] removed panics in geometric example test --- .../txmgr/geometric/geometric_example_test.go | 70 +++++++++++-------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/chainio/txmgr/geometric/geometric_example_test.go b/chainio/txmgr/geometric/geometric_example_test.go index d77740a9..9883d95f 100644 --- a/chainio/txmgr/geometric/geometric_example_test.go +++ b/chainio/txmgr/geometric/geometric_example_test.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/signerv2" "github.com/Layr-Labs/eigensdk-go/testutils" + "github.com/Layr-Labs/eigensdk-go/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -23,6 +24,37 @@ var ( chainid = big.NewInt(31337) ) +func createTx(client eth.HttpBackend, address common.Address) (*types.Transaction, error) { + zeroAddr := common.HexToAddress("0x0") + nonce, err := client.PendingNonceAt(context.TODO(), address) + if err != nil { + return nil, utils.WrapError("Failed to get PendingNonceAt", err) + } + return types.NewTx(&types.DynamicFeeTx{ + To: &zeroAddr, + Nonce: nonce, + }), nil +} + +func createTxMgr(rpcUrl string, ecdsaPrivateKey *ecdsa.PrivateKey) (eth.HttpBackend, *GeometricTxManager, error) { + logger := logging.NewTextSLogger(os.Stdout, &logging.SLoggerOptions{}) + client, err := ethclient.Dial(rpcUrl) + if err != nil { + return nil, nil, err + } + signerV2, signerAddr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) + if err != nil { + return nil, nil, err + } + wallet, err := wallet.NewPrivateKeyWallet(client, signerV2, signerAddr, logger) + if err != nil { + return nil, nil, err + } + reg := prometheus.NewRegistry() + metrics := NewMetrics(reg, "example", logger) + return client, NewGeometricTxnManager(client, wallet, logger, metrics, GeometricTxnManagerParams{}), nil +} + func ExampleGeometricTxManager() { anvilC, err := testutils.StartAnvilContainer("") if err != nil { @@ -40,46 +72,22 @@ func ExampleGeometricTxManager() { pk := ecdsaPrivateKey.PublicKey address := crypto.PubkeyToAddress(pk) - client, txmgr := createTxMgr(anvilUrl, ecdsaPrivateKey) - - tx := createTx(client, address) - _, err = txmgr.Send(context.TODO(), tx, true) + client, txmgr, err := createTxMgr(anvilUrl, ecdsaPrivateKey) if err != nil { panic(err) } - // we just add this to make sure the example runs - fmt.Println("Tx sent") - // Output: Tx sent -} - -func createTx(client eth.HttpBackend, address common.Address) *types.Transaction { - zeroAddr := common.HexToAddress("0x0") - nonce, err := client.PendingNonceAt(context.TODO(), address) + tx, err := createTx(client, address) if err != nil { panic(err) } - return types.NewTx(&types.DynamicFeeTx{ - To: &zeroAddr, - Nonce: nonce, - }) -} -func createTxMgr(rpcUrl string, ecdsaPrivateKey *ecdsa.PrivateKey) (eth.HttpBackend, *GeometricTxManager) { - logger := logging.NewTextSLogger(os.Stdout, &logging.SLoggerOptions{}) - client, err := ethclient.Dial(rpcUrl) - if err != nil { - panic(err) - } - signerV2, signerAddr, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: ecdsaPrivateKey}, chainid) - if err != nil { - panic(err) - } - wallet, err := wallet.NewPrivateKeyWallet(client, signerV2, signerAddr, logger) + _, err = txmgr.Send(context.TODO(), tx, true) if err != nil { panic(err) } - reg := prometheus.NewRegistry() - metrics := NewMetrics(reg, "example", logger) - return client, NewGeometricTxnManager(client, wallet, logger, metrics, GeometricTxnManagerParams{}) + + // we just add this to make sure the example runs + fmt.Println("Tx sent") + // Output: Tx sent } From 8bb296d6c342886b53568b16342b1cdccdc0e9b1 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 4 Nov 2024 17:53:52 -0300 Subject: [PATCH 07/14] removed panic in zap logger --- logging/zap_logger.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/logging/zap_logger.go b/logging/zap_logger.go index 5120a5ca..38d31ffe 100644 --- a/logging/zap_logger.go +++ b/logging/zap_logger.go @@ -3,6 +3,7 @@ package logging import ( "fmt" + "github.com/Layr-Labs/eigensdk-go/utils" "go.uber.org/zap" ) @@ -28,7 +29,7 @@ func NewZapLogger(env LogLevel) (Logger, error) { } else if env == Development { config = zap.NewDevelopmentConfig() } else { - panic(fmt.Sprintf("Unknown environment. Expected %s or %s. Received %s.", Development, Production, env)) + return nil, fmt.Errorf("unknown environment. Expected %s or %s. Received %s", Development, Production, env) } return NewZapLoggerByConfig(config, zap.AddCallerSkip(1)) @@ -39,7 +40,7 @@ func NewZapLogger(env LogLevel) (Logger, error) { func NewZapLoggerByConfig(config zap.Config, options ...zap.Option) (Logger, error) { logger, err := config.Build(options...) if err != nil { - panic(err) + return nil, utils.WrapError("Can not build config with the given options", err) } return &ZapLogger{ From 8c77fa0b5d0090cbe24b9c8eb922294b9c1981f2 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 4 Nov 2024 18:17:44 -0300 Subject: [PATCH 08/14] sending error through a channel in startServiceInGoroutine --- .../operatorsinfo/operatorsinfo_inmemory.go | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index c2874b92..8820b452 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -128,8 +128,9 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( queryC <-chan query, wg *sync.WaitGroup, opts Opts, -) { - go func() { +) chan<- error { + errCh := make(chan error, 1) + go func(errCh chan<- error) { // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs ops.logger.Debug( @@ -146,8 +147,9 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - // see the warning above the struct definition to understand why we panic here - panic(err) + // TODO! see the warning above the struct definition to understand why we panic here + errCh <- err + return } newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() if err != nil { @@ -158,7 +160,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - panic(err) + errCh <- err + return } err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) if err != nil { @@ -169,7 +172,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - panic(err) + errCh <- err + return } // The constructor can return after we have backfilled the db by querying the events of operators that have // registered with the blsApkRegistry @@ -181,6 +185,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is // shutting down and we want graceful exit ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + errCh <- errors.New("OperatorPubkeysServiceInMemory: Context cancelled, exiting") return case err := <-newPubkeyRegistrationSub.Err(): ops.logger.Error( @@ -201,7 +206,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "OperatorPubkeysServiceInMemory", ) // see the warning above the struct definition to understand why we panic here - panic(err) + errCh <- err + return } case err := <-newSocketRegistrationSub.Err(): ops.logger.Error( @@ -221,7 +227,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - panic(err) + errCh <- err + return } case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: operatorAddr := newPubkeyRegistrationEvent.Operator @@ -279,7 +286,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( query.respC <- resp{operatorInfo, ok} } } - }() + }(errCh) + return errCh } func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb( From 4bce41afd59927bf8d2c00cd3e6366d04bae102d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 5 Nov 2024 15:52:07 -0300 Subject: [PATCH 09/14] check err result in config.validate(logger) --- chainio/clients/builder.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/chainio/clients/builder.go b/chainio/clients/builder.go index 4a12ce14..7475fc97 100644 --- a/chainio/clients/builder.go +++ b/chainio/clients/builder.go @@ -57,7 +57,10 @@ func BuildReadClients( config BuildAllConfig, logger logging.Logger, ) (*ReadClients, error) { - config.validate(logger) + err := config.validate(logger) + if err != nil { + return nil, utils.WrapError("Failed to validate logger", err) + } // Create the metrics server promReg := prometheus.NewRegistry() @@ -128,7 +131,10 @@ func BuildAll( ecdsaPrivateKey *ecdsa.PrivateKey, logger logging.Logger, ) (*Clients, error) { - config.validate(logger) + err := config.validate(logger) + if err != nil { + return nil, utils.WrapError("Failed to validate logger", err) + } // Create the metrics server promReg := prometheus.NewRegistry() From 6bd2d58b0defdf02e8dafeea7de96e3bae4ac459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:45:18 -0300 Subject: [PATCH 10/14] chore: remove outdated comment --- services/operatorsinfo/operatorsinfo_inmemory.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index 8820b452..2e229d6c 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -49,11 +49,6 @@ type avsRegistrySubscriber interface { // Warning: this service should probably not be used in production. Haven't done a thorough analysis of all the clients // but there is still an open PR about an issue with ws subscription on geth: // https://github.com/ethereum/go-ethereum/issues/23845 -// Another reason to note for infra/devops engineer who would put this into production, is that this service crashes on -// websocket connection errors or when failing to query past events. The philosophy here is that hard crashing is -// better than silently failing, since it will be easier to debug. Naturally, this means that this aggregator using this -// service needs -// to be replicated and load-balanced, so that when it fails traffic can be switched to the other aggregator. type OperatorsInfoServiceInMemory struct { logFilterQueryBlockRange *big.Int avsRegistrySubscriber avsRegistrySubscriber @@ -147,7 +142,6 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - // TODO! see the warning above the struct definition to understand why we panic here errCh <- err return } @@ -205,7 +199,6 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( "service", "OperatorPubkeysServiceInMemory", ) - // see the warning above the struct definition to understand why we panic here errCh <- err return } From 24b05a9896500f7f1dfee85f41b3d28afce17ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 13 Jan 2025 16:09:15 -0300 Subject: [PATCH 11/14] chore: remove duplicated import --- services/operatorsinfo/operatorsinfo_inmemory_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index 041b5af9..e5ba20c2 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/event" - apkregistrybindings "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" "github.com/Layr-Labs/eigensdk-go/crypto/bls" @@ -25,14 +24,14 @@ import ( ) type fakeAVSRegistrySubscriber struct { - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate eventSubscription *fakeEventSubscription } func newFakeAVSRegistrySubscriber( eventSubscription *fakeEventSubscription, - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration, + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration, operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate, ) *fakeAVSRegistrySubscriber { return &fakeAVSRegistrySubscriber{ @@ -86,8 +85,8 @@ func TestGetOperatorInfo(t *testing.T) { ContractG2Pubkey: contractG2Pubkey, } - pubkeyRegistrationEventC := make(chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration, 1) - pubkeyRegistrationEvent := &apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration{ + pubkeyRegistrationEventC := make(chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration, 1) + pubkeyRegistrationEvent := &blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration{ Operator: testOperator1.OperatorAddr, PubkeyG1: testOperator1.ContractG1Pubkey, PubkeyG2: testOperator1.ContractG2Pubkey, @@ -106,7 +105,7 @@ func TestGetOperatorInfo(t *testing.T) { var tests = []struct { name string operator *fakes.TestOperator - pubkeyRegistrationEventC chan *apkregistrybindings.ContractBLSApkRegistryNewPubkeyRegistration + pubkeyRegistrationEventC chan *blsapkreg.ContractBLSApkRegistryNewPubkeyRegistration operatorSocketUpdateEventC chan *regcoord.ContractRegistryCoordinatorOperatorSocketUpdate eventErrC chan error queryOperatorAddr common.Address From 812a75d9b7f885801a69770f11f10ef3a539e92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:59:53 -0300 Subject: [PATCH 12/14] chore: add more context to log --- services/avsregistry/avsregistry_chaincaller.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/avsregistry/avsregistry_chaincaller.go b/services/avsregistry/avsregistry_chaincaller.go index 23f3daef..b7966ffd 100644 --- a/services/avsregistry/avsregistry_chaincaller.go +++ b/services/avsregistry/avsregistry_chaincaller.go @@ -80,6 +80,10 @@ func (ar *AvsRegistryServiceChainCaller) GetOperatorsAvsStateAtBlock( "Number of quorums returned from GetOperatorsStakeInQuorumsAtBlock does not match number of quorums requested. Probably pointing to old contract or wrong implementation.", "service", "AvsRegistryServiceChainCaller", + "operatorsStakesInQuorums", + operatorsStakesInQuorums, + "numquorums", + numquorums, ) return nil, utils.WrapError( From b5fd18e6dd4a12fd51790095e9639ab0936dd4da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 15 Jan 2025 19:04:12 -0300 Subject: [PATCH 13/14] chore: remove panics from examples --- .../txmgr/geometric/geometric_example_test.go | 18 ++++++++++++------ metrics/eigenmetrics_example_test.go | 14 ++++++++++---- nodeapi/nodeapi_example_test.go | 6 +++++- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/chainio/txmgr/geometric/geometric_example_test.go b/chainio/txmgr/geometric/geometric_example_test.go index 9883d95f..9ec8719e 100644 --- a/chainio/txmgr/geometric/geometric_example_test.go +++ b/chainio/txmgr/geometric/geometric_example_test.go @@ -58,33 +58,39 @@ func createTxMgr(rpcUrl string, ecdsaPrivateKey *ecdsa.PrivateKey) (eth.HttpBack func ExampleGeometricTxManager() { anvilC, err := testutils.StartAnvilContainer("") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } anvilUrl, err := anvilC.Endpoint(context.TODO(), "http") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } ecdsaPrivateKey, err := crypto.HexToECDSA("ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } pk := ecdsaPrivateKey.PublicKey address := crypto.PubkeyToAddress(pk) client, txmgr, err := createTxMgr(anvilUrl, ecdsaPrivateKey) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } tx, err := createTx(client, address) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } _, err = txmgr.Send(context.TODO(), tx, true) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } // we just add this to make sure the example runs diff --git a/metrics/eigenmetrics_example_test.go b/metrics/eigenmetrics_example_test.go index bf55ab83..f4075d89 100644 --- a/metrics/eigenmetrics_example_test.go +++ b/metrics/eigenmetrics_example_test.go @@ -6,6 +6,8 @@ package metrics_test import ( "context" + "fmt" + "os" "github.com/Layr-Labs/eigensdk-go/chainio/clients" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" @@ -26,13 +28,15 @@ func ExampleEigenMetrics() { logger, err := logging.NewZapLogger("development") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } // get the Writer for the EL contracts ecdsaPrivateKey, err := crypto.HexToECDSA("0x0") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } operatorEcdsaAddr := crypto.PubkeyToAddress(ecdsaPrivateKey.PublicKey) @@ -46,7 +50,8 @@ func ExampleEigenMetrics() { } clients, err := clients.BuildAll(chainioConfig, ecdsaPrivateKey, logger) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } reg := prometheus.NewRegistry() eigenMetrics := metrics.NewEigenMetrics("exampleAvs", ":9090", reg, logger) @@ -71,7 +76,8 @@ func ExampleEigenMetrics() { rpcCallsCollector := rpccalls.NewCollector("exampleAvs", reg) instrumentedEthClient, err := eth.NewInstrumentedClient("http://localhost:8545", rpcCallsCollector) if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } eigenMetrics.Start(context.Background(), reg) diff --git a/nodeapi/nodeapi_example_test.go b/nodeapi/nodeapi_example_test.go index 7c9e7844..2fa76047 100644 --- a/nodeapi/nodeapi_example_test.go +++ b/nodeapi/nodeapi_example_test.go @@ -1,6 +1,9 @@ package nodeapi_test import ( + "fmt" + "os" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/nodeapi" ) @@ -8,7 +11,8 @@ import ( func ExampleNodeApi() { logger, err := logging.NewZapLogger("development") if err != nil { - panic(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } nodeApi := nodeapi.NewNodeApi("testAvs", "v0.0.1", "localhost:8080", logger) From 505847b68b01d0ceb4feb16c82096440fd344229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Wed, 15 Jan 2025 19:14:00 -0300 Subject: [PATCH 14/14] refactor: use errgroup instead of chan error --- .../operatorsinfo/operatorsinfo_inmemory.go | 269 +++++++++--------- 1 file changed, 133 insertions(+), 136 deletions(-) diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index 2e229d6c..cf3d6b24 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -9,6 +9,7 @@ import ( blsapkreg "github.com/Layr-Labs/eigensdk-go/contracts/bindings/BLSApkRegistry" regcoord "github.com/Layr-Labs/eigensdk-go/contracts/bindings/RegistryCoordinator" "github.com/ethereum/go-ethereum/event" + "golang.org/x/sync/errgroup" "github.com/Layr-Labs/eigensdk-go/crypto/bls" "github.com/Layr-Labs/eigensdk-go/logging" @@ -55,6 +56,7 @@ type OperatorsInfoServiceInMemory struct { avsRegistryReader avsRegistryReader logger logging.Logger queryC chan<- query + errG *errgroup.Group // queried via the queryC channel, so don't need mutex to access pubkeyDict map[common.Address]types.OperatorPubkeys operatorAddrToId map[common.Address]types.OperatorId @@ -99,12 +101,15 @@ func NewOperatorsInfoServiceInMemory( if logFilterQueryBlockRange == nil { logFilterQueryBlockRange = defaultLogFilterQueryBlockRange } + errG, ctx := errgroup.WithContext(ctx) + pkcs := &OperatorsInfoServiceInMemory{ avsRegistrySubscriber: avsRegistrySubscriber, avsRegistryReader: avsRegistryReader, logFilterQueryBlockRange: logFilterQueryBlockRange, logger: logger, queryC: queryC, + errG: errG, pubkeyDict: make(map[common.Address]types.OperatorPubkeys), operatorAddrToId: make(map[common.Address]types.OperatorId), socketDict: make(map[types.OperatorId]types.Socket), @@ -112,175 +117,167 @@ func NewOperatorsInfoServiceInMemory( // We use this waitgroup to wait on the initialization of the inmemory pubkey dict, // which requires querying the past events of the pubkey registration contract wg := sync.WaitGroup{} + wg.Add(1) - pkcs.startServiceInGoroutine(ctx, queryC, &wg, opts) + errG.Go(func() error { + return pkcs.runService(ctx, queryC, &wg, opts) + }) wg.Wait() return pkcs } -func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( +func (ops *OperatorsInfoServiceInMemory) runService( ctx context.Context, queryC <-chan query, wg *sync.WaitGroup, opts Opts, -) chan<- error { - errCh := make(chan error, 1) - go func(errCh chan<- error) { - - // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs - ops.logger.Debug( - "Subscribing to new pubkey registration events on blsApkRegistry contract", +) error { + // TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs + ops.logger.Debug( + "Subscribing to new pubkey registration events on blsApkRegistry contract", + "service", + "OperatorPubkeysServiceInMemory", + ) + newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { + ops.logger.Error( + "Fatal error opening websocket subscription for new pubkey registrations", + "err", + err, "service", "OperatorPubkeysServiceInMemory", ) - newPubkeyRegistrationC, newPubkeyRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() - if err != nil { - ops.logger.Error( - "Fatal error opening websocket subscription for new pubkey registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - errCh <- err - return - } - newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() - if err != nil { - ops.logger.Error( - "Fatal error opening websocket subscription for new socket registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - errCh <- err - return - } - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) - if err != nil { + return err + } + newSocketRegistrationC, newSocketRegistrationSub, err := ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { + ops.logger.Error( + "Fatal error opening websocket subscription for new socket registrations", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + return err + } + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) + if err != nil { + ops.logger.Error( + "Fatal error querying past registered operator events and filling db", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + return err + } + // The constructor can return after we have backfilled the db by querying the events of operators that have + // registered with the blsApkRegistry + // before the block at which we started the ws subscription above + wg.Done() + for { + select { + case <-ctx.Done(): + // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is + // shutting down and we want graceful exit + ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + return errors.New("OperatorPubkeysServiceInMemory: Context cancelled, exiting") + case err := <-newPubkeyRegistrationSub.Err(): ops.logger.Error( - "Fatal error querying past registered operator events and filling db", + "Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - errCh <- err - return - } - // The constructor can return after we have backfilled the db by querying the events of operators that have - // registered with the blsApkRegistry - // before the block at which we started the ws subscription above - wg.Done() - for { - select { - case <-ctx.Done(): - // TODO(samlaf): should we do anything here? Seems like this only happens when the aggregator is - // shutting down and we want graceful exit - ops.logger.Infof("OperatorPubkeysServiceInMemory: Context cancelled, exiting") - errCh <- errors.New("OperatorPubkeysServiceInMemory: Context cancelled, exiting") - return - case err := <-newPubkeyRegistrationSub.Err(): + newPubkeyRegistrationSub.Unsubscribe() + newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() + if err != nil { ops.logger.Error( - "Error in websocket subscription for new pubkey registration events. Attempting to reconnect...", + "Error opening websocket subscription for new pubkey registrations", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - newPubkeyRegistrationSub.Unsubscribe() - newPubkeyRegistrationC, newPubkeyRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToNewPubkeyRegistrations() - if err != nil { - ops.logger.Error( - "Error opening websocket subscription for new pubkey registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - errCh <- err - return - } - case err := <-newSocketRegistrationSub.Err(): + return err + } + case err := <-newSocketRegistrationSub.Err(): + ops.logger.Error( + "Error in websocket subscription for new socket registration events. Attempting to reconnect...", + "err", + err, + "service", + "OperatorPubkeysServiceInMemory", + ) + newSocketRegistrationSub.Unsubscribe() + newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() + if err != nil { ops.logger.Error( - "Error in websocket subscription for new socket registration events. Attempting to reconnect...", + "Error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory", ) - newSocketRegistrationSub.Unsubscribe() - newSocketRegistrationC, newSocketRegistrationSub, err = ops.avsRegistrySubscriber.SubscribeToOperatorSocketUpdates() - if err != nil { - ops.logger.Error( - "Error opening websocket subscription for new socket registrations", - "err", - err, - "service", - "OperatorPubkeysServiceInMemory", - ) - errCh <- err - return - } - case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: - operatorAddr := newPubkeyRegistrationEvent.Operator - ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ - G1Pubkey: bls.NewG1Point( - newPubkeyRegistrationEvent.PubkeyG1.X, - newPubkeyRegistrationEvent.PubkeyG1.Y, - ), - G2Pubkey: bls.NewG2Point( - newPubkeyRegistrationEvent.PubkeyG2.X, - newPubkeyRegistrationEvent.PubkeyG2.Y, - ), - } + return err + } + case newPubkeyRegistrationEvent := <-newPubkeyRegistrationC: + operatorAddr := newPubkeyRegistrationEvent.Operator + ops.pubkeyDict[operatorAddr] = types.OperatorPubkeys{ + G1Pubkey: bls.NewG1Point( + newPubkeyRegistrationEvent.PubkeyG1.X, + newPubkeyRegistrationEvent.PubkeyG1.Y, + ), + G2Pubkey: bls.NewG2Point( + newPubkeyRegistrationEvent.PubkeyG2.X, + newPubkeyRegistrationEvent.PubkeyG2.Y, + ), + } - operatorId := types.OperatorIdFromContractG1Pubkey(newPubkeyRegistrationEvent.PubkeyG1) - ops.operatorAddrToId[operatorAddr] = operatorId - ops.logger.Debug( - "Added operator pubkeys to pubkey dict from new pubkey registration event", - "service", - "OperatorPubkeysServiceInMemory", - "block", - newPubkeyRegistrationEvent.Raw.BlockNumber, - "operatorAddr", - operatorAddr, - "operatorId", - operatorId, - "G1pubkey", - ops.pubkeyDict[operatorAddr].G1Pubkey, - "G2pubkey", - ops.pubkeyDict[operatorAddr].G2Pubkey, - ) - case newSocketRegistrationEvent := <-newSocketRegistrationC: - ops.logger.Debug( - "Received new socket registration event", - "service", - "OperatorPubkeysServiceInMemory", - "operatorId", - types.OperatorId(newSocketRegistrationEvent.OperatorId), - "socket", - newSocketRegistrationEvent.Socket, - ) - ops.updateSocketMapping( - newSocketRegistrationEvent.OperatorId, - types.Socket(newSocketRegistrationEvent.Socket), - ) - // Receive a query from GetOperatorPubkeys - case query := <-queryC: - pubkeys, ok := ops.pubkeyDict[query.operatorAddr] - operatorId := ops.operatorAddrToId[query.operatorAddr] - socket := ops.socketDict[operatorId] - operatorInfo := types.OperatorInfo{ - Socket: socket, - Pubkeys: pubkeys, - } - query.respC <- resp{operatorInfo, ok} + operatorId := types.OperatorIdFromContractG1Pubkey(newPubkeyRegistrationEvent.PubkeyG1) + ops.operatorAddrToId[operatorAddr] = operatorId + ops.logger.Debug( + "Added operator pubkeys to pubkey dict from new pubkey registration event", + "service", + "OperatorPubkeysServiceInMemory", + "block", + newPubkeyRegistrationEvent.Raw.BlockNumber, + "operatorAddr", + operatorAddr, + "operatorId", + operatorId, + "G1pubkey", + ops.pubkeyDict[operatorAddr].G1Pubkey, + "G2pubkey", + ops.pubkeyDict[operatorAddr].G2Pubkey, + ) + case newSocketRegistrationEvent := <-newSocketRegistrationC: + ops.logger.Debug( + "Received new socket registration event", + "service", + "OperatorPubkeysServiceInMemory", + "operatorId", + types.OperatorId(newSocketRegistrationEvent.OperatorId), + "socket", + newSocketRegistrationEvent.Socket, + ) + ops.updateSocketMapping( + newSocketRegistrationEvent.OperatorId, + types.Socket(newSocketRegistrationEvent.Socket), + ) + // Receive a query from GetOperatorPubkeys + case query := <-queryC: + pubkeys, ok := ops.pubkeyDict[query.operatorAddr] + operatorId := ops.operatorAddrToId[query.operatorAddr] + socket := ops.socketDict[operatorId] + operatorInfo := types.OperatorInfo{ + Socket: socket, + Pubkeys: pubkeys, } + query.respC <- resp{operatorInfo, ok} } - }(errCh) - return errCh + } } func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(