diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index b2e46e1eb..832563193 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -6,7 +6,11 @@ import ( "crypto/ecdsa" "encoding/binary" "fmt" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" "log" + "math/big" + "strings" "sync" "time" @@ -32,11 +36,12 @@ import ( "github.com/yetanotherco/aligned_layer/common" servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager" "github.com/yetanotherco/aligned_layer/core/chainio" - "github.com/yetanotherco/aligned_layer/core/types" - "github.com/yetanotherco/aligned_layer/core/config" + "github.com/yetanotherco/aligned_layer/core/types" ) +const blockInterval uint64 = 50000 + type Operator struct { Config config.OperatorConfig Address ethcommon.Address @@ -176,7 +181,11 @@ func (o *Operator) Start(ctx context.Context) error { o.Logger.Infof("Signed hash: %+v", *responseSignature) go o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse) case <-pollLatestTaskTimer.C: - o.checkAndProcessLatestTask() + err := o.checkAndProcessLatestTask() + if err != nil { + o.Logger.Infof("Could not process latest task: %v", err) + continue + } } } } @@ -257,33 +266,75 @@ func (o *Operator) processNewBatch(newBatchLog *servicemanager.ContractAlignedLa } -func (o *Operator) checkAndProcessLatestTask() { +func (o *Operator) checkAndProcessLatestTask() error { latestTask, err := o.getLatestTaskFromBlockchain() if err != nil { - o.Logger.Errorf("Failed to get latest task from blockchain: %v", err) - return + return fmt.Errorf("failed to get latest task from blockchain: %w", err) } o.processedTasksMutex.Lock() + if o.lastProcessedTask != nil && bytes.Equal(latestTask.BatchMerkleRoot[:], o.lastProcessedTask.BatchMerkleRoot[:]) { - o.processedTasksMutex.Unlock() - return + return fmt.Errorf("latest task already processed") } o.processedTasksMutex.Unlock() err = o.processNewBatch(latestTask) if err != nil { - return + return fmt.Errorf("failed to process latest task: %w", err) } + + return nil } func (o *Operator) getLatestTaskFromBlockchain() (*servicemanager.ContractAlignedLayerServiceManagerNewBatch, error) { - //query := ethereum.FilterQuery{ - // Addresses: []ethcommon.Address{o.Config.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr}, - //} - //o.Config.BaseConfig.EthRpcClient.FilterLogs() - // TODO: Implement this - return nil, nil + latestBlock, err := o.Config.BaseConfig.EthRpcClient.BlockNumber(context.Background()) + + if err != nil { + return nil, fmt.Errorf("failed to get latest block number: %w", err) + } + + var fromBlock uint64 + + if latestBlock < blockInterval { + fromBlock = 0 + } else { + fromBlock = latestBlock - blockInterval + } + + query := ethereum.FilterQuery{ + FromBlock: big.NewInt(int64(fromBlock)), + ToBlock: big.NewInt(int64(latestBlock)), + Addresses: []ethcommon.Address{o.Config.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr}, + } + + logs, err := o.Config.BaseConfig.EthRpcClient.FilterLogs(context.Background(), query) + if err != nil { + log.Printf("Failed to get logs: %v", err) + return nil, err + } + + if len(logs) == 0 { + return nil, fmt.Errorf("no logs found") + } + + lastLog := logs[len(logs)-1] + + alignedLayerServiceManagerABI, err := abi.JSON(strings.NewReader(servicemanager.ContractAlignedLayerServiceManagerMetaData.ABI)) + if err != nil { + return nil, fmt.Errorf("failed to parse ABI: %w", err) + } + + o.Logger.Infof("Last Log: %v", lastLog) + + var latestTask servicemanager.ContractAlignedLayerServiceManagerNewBatch + err = alignedLayerServiceManagerABI.UnpackIntoInterface(&latestTask, "NewBatch", lastLog.Data) + if err != nil { + return nil, fmt.Errorf("failed to unpack log data: %w", err) + } + + return &latestTask, nil + } func (o *Operator) verify(verificationData VerificationData, results chan bool) {