From 93748242e19a5f03f0fc338862e67c961241d6d2 Mon Sep 17 00:00:00 2001 From: xiaohuo Date: Wed, 4 Sep 2024 22:49:23 +0800 Subject: [PATCH 1/3] feat: support start/stop block number for querying operator info --- services/bls_aggregation/blsagg_test.go | 2 ++ .../operatorsinfo/operatorsinfo_inmemory.go | 22 +++++++++++++------ .../operatorsinfo_inmemory_test.go | 2 ++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index 40fa992f..211cb448 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -1150,6 +1150,8 @@ func TestIntegrationBlsAgg(t *testing.T) { avsClients.AvsRegistryChainSubscriber, avsClients.AvsRegistryChainReader, nil, + nil, + nil, logger, ) avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller( diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index b5b43deb..3b09db19 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -92,6 +92,8 @@ func NewOperatorsInfoServiceInMemory( avsRegistrySubscriber avsRegistrySubscriber, avsRegistryReader avsRegistryReader, logFilterQueryBlockRange *big.Int, + startBlock *big.Int, + stopBlock *big.Int, logger logging.Logger, ) *OperatorsInfoServiceInMemory { queryC := make(chan query) @@ -112,7 +114,7 @@ func NewOperatorsInfoServiceInMemory( // which requires querying the past events of the pubkey registration contract wg := sync.WaitGroup{} wg.Add(1) - pkcs.startServiceInGoroutine(ctx, queryC, &wg) + pkcs.startServiceInGoroutine(ctx, queryC, &wg, startBlock, stopBlock) wg.Wait() return pkcs } @@ -121,6 +123,8 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ctx context.Context, queryC <-chan query, wg *sync.WaitGroup, + startBlock *big.Int, + stopBlock *big.Int, ) { go func() { @@ -153,7 +157,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) panic(err) } - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx) + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, startBlock, stopBlock) if err != nil { ops.logger.Error( "Fatal error querying past registered operator events and filling db", @@ -275,7 +279,11 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( }() } -func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) error { +func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb( + ctx context.Context, + startBlock *big.Int, + stopBlock *big.Int, +) error { // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that // we will receive again in the websocket, // since we will just overwrite the pubkey dict with the same values. @@ -290,8 +298,8 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil go func() { alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, pubkeysErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys( ctx, - nil, - nil, + startBlock, + stopBlock, ops.logFilterQueryBlockRange, ) wg.Done() @@ -301,8 +309,8 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil go func() { socketsMap, socketsErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets( ctx, - nil, - nil, + startBlock, + stopBlock, ops.logFilterQueryBlockRange, ) wg.Done() diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index c3e473a0..085af409 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -153,6 +153,8 @@ func TestGetOperatorInfo(t *testing.T) { mockAvsRegistrySubscriber, mockAvsReader, nil, + nil, + nil, logger, ) time.Sleep( From eb15e202f60d482183d7acbdabebbf8946464289 Mon Sep 17 00:00:00 2001 From: xiaohuo Date: Thu, 5 Sep 2024 09:11:50 +0800 Subject: [PATCH 2/3] refactor: use Opts struct --- services/bls_aggregation/blsagg_test.go | 3 +-- .../operatorsinfo/operatorsinfo_inmemory.go | 26 ++++++++++--------- .../operatorsinfo_inmemory_test.go | 3 +-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index 211cb448..a55cd5d4 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -1150,8 +1150,7 @@ func TestIntegrationBlsAgg(t *testing.T) { avsClients.AvsRegistryChainSubscriber, avsClients.AvsRegistryChainReader, nil, - nil, - nil, + &operatorsinfo.Opts{}, logger, ) avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller( diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index 3b09db19..767a28e5 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -78,6 +78,11 @@ type resp struct { operatorExists bool } +type Opts struct { + StartBlock *big.Int + StopBlock *big.Int +} + var _ OperatorsInfoService = (*OperatorsInfoServiceInMemory)(nil) // NewOperatorsInfoServiceInMemory constructs a OperatorsInfoServiceInMemory and starts it in a goroutine. @@ -92,8 +97,7 @@ func NewOperatorsInfoServiceInMemory( avsRegistrySubscriber avsRegistrySubscriber, avsRegistryReader avsRegistryReader, logFilterQueryBlockRange *big.Int, - startBlock *big.Int, - stopBlock *big.Int, + opts *Opts, logger logging.Logger, ) *OperatorsInfoServiceInMemory { queryC := make(chan query) @@ -114,7 +118,7 @@ func NewOperatorsInfoServiceInMemory( // which requires querying the past events of the pubkey registration contract wg := sync.WaitGroup{} wg.Add(1) - pkcs.startServiceInGoroutine(ctx, queryC, &wg, startBlock, stopBlock) + pkcs.startServiceInGoroutine(ctx, queryC, &wg, opts) wg.Wait() return pkcs } @@ -123,8 +127,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ctx context.Context, queryC <-chan query, wg *sync.WaitGroup, - startBlock *big.Int, - stopBlock *big.Int, + opts *Opts, ) { go func() { @@ -157,7 +160,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ) panic(err) } - err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, startBlock, stopBlock) + err = ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, opts) if err != nil { ops.logger.Error( "Fatal error querying past registered operator events and filling db", @@ -281,8 +284,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb( ctx context.Context, - startBlock *big.Int, - stopBlock *big.Int, + opts *Opts, ) error { // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that // we will receive again in the websocket, @@ -298,8 +300,8 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil go func() { alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, pubkeysErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys( ctx, - startBlock, - stopBlock, + opts.StartBlock, + opts.StopBlock, ops.logFilterQueryBlockRange, ) wg.Done() @@ -309,8 +311,8 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil go func() { socketsMap, socketsErr = ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets( ctx, - startBlock, - stopBlock, + opts.StartBlock, + opts.StopBlock, ops.logFilterQueryBlockRange, ) wg.Done() diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index 085af409..55308124 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -153,8 +153,7 @@ func TestGetOperatorInfo(t *testing.T) { mockAvsRegistrySubscriber, mockAvsReader, nil, - nil, - nil, + &Opts{}, logger, ) time.Sleep( From 11fb34726c2393bffce718fe0ae5484d9390872b Mon Sep 17 00:00:00 2001 From: xiaohuo Date: Thu, 5 Sep 2024 09:29:12 +0800 Subject: [PATCH 3/3] refactor: use struct instead of pointer to avoid panic --- services/bls_aggregation/blsagg_test.go | 2 +- services/operatorsinfo/operatorsinfo_inmemory.go | 8 ++++---- services/operatorsinfo/operatorsinfo_inmemory_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/services/bls_aggregation/blsagg_test.go b/services/bls_aggregation/blsagg_test.go index a55cd5d4..b247c91d 100644 --- a/services/bls_aggregation/blsagg_test.go +++ b/services/bls_aggregation/blsagg_test.go @@ -1150,7 +1150,7 @@ func TestIntegrationBlsAgg(t *testing.T) { avsClients.AvsRegistryChainSubscriber, avsClients.AvsRegistryChainReader, nil, - &operatorsinfo.Opts{}, + operatorsinfo.Opts{}, logger, ) avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller( diff --git a/services/operatorsinfo/operatorsinfo_inmemory.go b/services/operatorsinfo/operatorsinfo_inmemory.go index 767a28e5..c2874b92 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory.go +++ b/services/operatorsinfo/operatorsinfo_inmemory.go @@ -97,7 +97,7 @@ func NewOperatorsInfoServiceInMemory( avsRegistrySubscriber avsRegistrySubscriber, avsRegistryReader avsRegistryReader, logFilterQueryBlockRange *big.Int, - opts *Opts, + opts Opts, logger logging.Logger, ) *OperatorsInfoServiceInMemory { queryC := make(chan query) @@ -127,7 +127,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( ctx context.Context, queryC <-chan query, wg *sync.WaitGroup, - opts *Opts, + opts Opts, ) { go func() { @@ -284,7 +284,7 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine( func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb( ctx context.Context, - opts *Opts, + opts Opts, ) error { // Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that // we will receive again in the websocket, @@ -338,7 +338,7 @@ func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFil // we print each socket info on a separate line because slog for some reason doesn't pass map keys via their // LogValue() function, so operatorId (of custom type Bytes32) prints as a byte array instead of its hex // representation from LogValue() - // passing the Bytes32 directly to an slog log statements does call LogValue() and prints the hex representation + // passing the Bytes32 directly to a slog log statements does call LogValue() and prints the hex representation ops.logger.Debug( "operator socket returned from registration events query", "operatorId", diff --git a/services/operatorsinfo/operatorsinfo_inmemory_test.go b/services/operatorsinfo/operatorsinfo_inmemory_test.go index 55308124..041b5af9 100644 --- a/services/operatorsinfo/operatorsinfo_inmemory_test.go +++ b/services/operatorsinfo/operatorsinfo_inmemory_test.go @@ -153,7 +153,7 @@ func TestGetOperatorInfo(t *testing.T) { mockAvsRegistrySubscriber, mockAvsReader, nil, - &Opts{}, + Opts{}, logger, ) time.Sleep(