diff --git a/cmd/peggo/orchestrator.go b/cmd/peggo/orchestrator.go index 38f1c3be..46890893 100644 --- a/cmd/peggo/orchestrator.go +++ b/cmd/peggo/orchestrator.go @@ -220,6 +220,14 @@ func orchestratorCmd(cmd *cli.Cmd) { peggyAddress := ethcmn.HexToAddress(peggyParams.BridgeEthereumAddress) injAddress := ethcmn.HexToAddress(peggyParams.CosmosCoinErc20Contract) + // Check if the provided ETH address belongs to a validator + isValidator, err := isValidatorAddress(cosmosQueryClient, ethKeyFromAddress) + if err != nil { + log.WithError(err).Fatalln("failed to query the current validator set from injective") + + return + } + erc20ContractMapping := make(map[ethcmn.Address]string) erc20ContractMapping[injAddress] = ctypes.InjectiveCoin @@ -269,7 +277,7 @@ func orchestratorCmd(cmd *cli.Cmd) { ) go func() { - if err := svc.Start(ctx); err != nil { + if err := svc.Start(ctx, isValidator); err != nil { log.Errorln(err) // signal there that the app failed @@ -280,3 +288,22 @@ func orchestratorCmd(cmd *cli.Cmd) { closer.Hold() } } + +func isValidatorAddress(peggyQuery cosmos.PeggyQueryClient, addr ethcmn.Address) (bool, error) { + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second*30) + defer cancelFn() + + currentValset, err := peggyQuery.CurrentValset(ctx) + if err != nil { + return false, err + } + + var isValidator bool + for _, validator := range currentValset.Members { + if ethcmn.HexToAddress(validator.EthereumAddress) == addr { + isValidator = true + } + } + + return isValidator, nil +} diff --git a/orchestrator/cosmos/broadcast.go b/orchestrator/cosmos/broadcast.go index 3b1879bd..b998af99 100644 --- a/orchestrator/cosmos/broadcast.go +++ b/orchestrator/cosmos/broadcast.go @@ -14,6 +14,7 @@ import ( chainclient "github.com/InjectiveLabs/sdk-go/client/chain" "github.com/InjectiveLabs/metrics" + "github.com/InjectiveLabs/peggo/orchestrator/ethereum/keystore" "github.com/InjectiveLabs/peggo/orchestrator/ethereum/peggy" @@ -55,6 +56,7 @@ type PeggyBroadcastClient interface { oldDeposits []*wrappers.PeggySendToCosmosEvent, deposits []*wrappers.PeggySendToInjectiveEvent, withdraws []*wrappers.PeggyTransactionBatchExecutedEvent, + erc20Deployed []*wrappers.PeggyERC20DeployedEvent, valsetUpdates []*wrappers.PeggyValsetUpdatedEvent, ) error @@ -274,7 +276,7 @@ func (s *peggyBroadcastClient) sendOldDepositClaims( log.WithFields(log.Fields{ "event_nonce": oldDeposit.EventNonce.String(), "txHash": txResponse.TxResponse.TxHash, - }).Infoln("Oracle sent old deposit event succesfully") + }).Infoln("Oracle sent old deposit event successfully") } return nil @@ -320,7 +322,7 @@ func (s *peggyBroadcastClient) sendDepositClaims( log.WithFields(log.Fields{ "event_nonce": deposit.EventNonce.String(), "txHash": txResponse.TxResponse.TxHash, - }).Infoln("Oracle sent deposit event succesfully") + }).Infoln("Oracle sent deposit event successfully") } return nil @@ -358,7 +360,7 @@ func (s *peggyBroadcastClient) sendWithdrawClaims( log.WithFields(log.Fields{ "event_nonce": withdraw.EventNonce.String(), "txHash": txResponse.TxResponse.TxHash, - }).Infoln("Oracle sent Withdraw event succesfully") + }).Infoln("Oracle sent Withdraw event successfully") } return nil @@ -407,7 +409,49 @@ func (s *peggyBroadcastClient) sendValsetUpdateClaims( log.WithFields(log.Fields{ "event_nonce": valsetUpdate.EventNonce.String(), "txHash": txResponse.TxResponse.TxHash, - }).Infoln("Oracle sent ValsetUpdate event succesfully") + }).Infoln("Oracle sent ValsetUpdate event successfully") + } + + return nil +} + +func (s *peggyBroadcastClient) sendErc20DeployedClaims( + ctx context.Context, + erc20Deployed *wrappers.PeggyERC20DeployedEvent, +) error { + metrics.ReportFuncCall(s.svcTags) + doneFn := metrics.ReportFuncTiming(s.svcTags) + defer doneFn() + + log.WithFields(log.Fields{ + "EventNonce": erc20Deployed.EventNonce.Uint64(), + "CosmosDenom": erc20Deployed.CosmosDenom, + "TokenContract": erc20Deployed.TokenContract.Hex(), + "Name": erc20Deployed.Name, + "Symbol": erc20Deployed.Symbol, + "Decimals": erc20Deployed.Decimals, + }).Infoln("Oracle observed a erc20Deployed event. Sending MsgERC20DeployedClaim") + + msg := &types.MsgERC20DeployedClaim{ + EventNonce: erc20Deployed.EventNonce.Uint64(), + BlockHeight: erc20Deployed.Raw.BlockNumber, + CosmosDenom: erc20Deployed.CosmosDenom, + TokenContract: erc20Deployed.TokenContract.Hex(), + Name: erc20Deployed.Name, + Symbol: erc20Deployed.Symbol, + Decimals: uint64(erc20Deployed.Decimals), + Orchestrator: s.AccFromAddress().String(), + } + + if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil { + metrics.ReportFuncError(s.svcTags) + log.WithError(err).Errorln("broadcasting MsgERC20DeployedClaim failed") + return err + } else { + log.WithFields(log.Fields{ + "event_nonce": erc20Deployed.EventNonce.String(), + "txHash": txResponse.TxResponse.TxHash, + }).Infoln("Oracle sent ERC20DeployedEvent event successfully") } return nil @@ -419,14 +463,15 @@ func (s *peggyBroadcastClient) SendEthereumClaims( oldDeposits []*wrappers.PeggySendToCosmosEvent, deposits []*wrappers.PeggySendToInjectiveEvent, withdraws []*wrappers.PeggyTransactionBatchExecutedEvent, + erc20Deployed []*wrappers.PeggyERC20DeployedEvent, valsetUpdates []*wrappers.PeggyValsetUpdatedEvent, ) error { metrics.ReportFuncCall(s.svcTags) doneFn := metrics.ReportFuncTiming(s.svcTags) defer doneFn() - totalClaimEvents := len(oldDeposits) + len(deposits) + len(withdraws) + len(valsetUpdates) - var count, h, i, j, k int + totalClaimEvents := len(oldDeposits) + len(deposits) + len(withdraws) + len(erc20Deployed) + len(valsetUpdates) + var count, h, i, j, k, l int // Individual arrays (oldDeposits, deposits, withdraws, valsetUpdates) are sorted. // Broadcast claim events sequentially starting with eventNonce = lastClaimEvent + 1. @@ -464,6 +509,14 @@ func (s *peggyBroadcastClient) SendEthereumClaims( return err } k++ + } else if l < len(erc20Deployed) && erc20Deployed[l].EventNonce.Uint64() == lastClaimEvent+1 { + // send erc20 deployed claim + if err := s.sendErc20DeployedClaims(ctx, erc20Deployed[l]); err != nil { + metrics.ReportFuncError(s.svcTags) + log.WithError(err).Errorln("broadcasting MsgERC20DeployedClaim failed") + return err + } + l++ } count = count + 1 lastClaimEvent = lastClaimEvent + 1 diff --git a/orchestrator/eth_event_watcher.go b/orchestrator/eth_event_watcher.go index 33536352..8be16582 100644 --- a/orchestrator/eth_event_watcher.go +++ b/orchestrator/eth_event_watcher.go @@ -9,6 +9,7 @@ import ( log "github.com/xlab/suplog" "github.com/InjectiveLabs/metrics" + wrappers "github.com/InjectiveLabs/peggo/solidity/wrappers/Peggy.sol" ) @@ -157,6 +158,39 @@ func (s *peggyOrchestrator) CheckForEvents( "Withdraws": transactionBatchExecutedEvents, }).Debugln("Scanned TransactionBatchExecuted events from Ethereum") + var erc20DeployedEvents []*wrappers.PeggyERC20DeployedEvent + { + iter, err := peggyFilterer.FilterERC20DeployedEvent(&bind.FilterOpts{ + Start: startingBlock, + End: ¤tBlock, + }, nil) + if err != nil { + metrics.ReportFuncError(s.svcTags) + log.WithFields(log.Fields{ + "start": startingBlock, + "end": currentBlock, + }).Errorln("failed to scan past FilterERC20Deployed events from Ethereum") + + if !isUnknownBlockErr(err) { + err = errors.Wrap(err, "failed to scan past FilterERC20Deployed events from Ethereum") + return 0, err + } else if iter == nil { + return 0, errors.New("no iterator returned") + } + } + + for iter.Next() { + erc20DeployedEvents = append(erc20DeployedEvents, iter.Event) + } + + iter.Close() + } + log.WithFields(log.Fields{ + "start": startingBlock, + "end": currentBlock, + "erc20Deployed": erc20DeployedEvents, + }).Debugln("Scanned FilterERC20Deployed events from Ethereum") + var valsetUpdatedEvents []*wrappers.PeggyValsetUpdatedEvent { iter, err := peggyFilterer.FilterValsetUpdatedEvent(&bind.FilterOpts{ @@ -191,7 +225,7 @@ func (s *peggyOrchestrator) CheckForEvents( "valsetUpdates": valsetUpdatedEvents, }).Debugln("Scanned ValsetUpdatedEvents events from Ethereum") - // note that starting block overlaps with our last che cked block, because we have to deal with + // note that starting block overlaps with our last checked block, because we have to deal with // the possibility that the relayer was killed after relaying only one of multiple events in a single // block, so we also need this routine so make sure we don't send in the first event in this hypothetical // multi event block again. In theory we only send all events for every block and that will pass of fail @@ -206,11 +240,12 @@ func (s *peggyOrchestrator) CheckForEvents( oldDeposits := filterSendToCosmosEventsByNonce(sendToCosmosEvents, lastClaimEvent.EthereumEventNonce) deposits := filterSendToInjectiveEventsByNonce(sendToInjectiveEvents, lastClaimEvent.EthereumEventNonce) withdraws := filterTransactionBatchExecutedEventsByNonce(transactionBatchExecutedEvents, lastClaimEvent.EthereumEventNonce) + erc20Deployments := filterERC20DeployedEventsByNonce(erc20DeployedEvents, lastClaimEvent.EthereumEventNonce) valsetUpdates := filterValsetUpdateEventsByNonce(valsetUpdatedEvents, lastClaimEvent.EthereumEventNonce) - if len(oldDeposits) > 0 || len(deposits) > 0 || len(withdraws) > 0 || len(valsetUpdates) > 0 { + if len(oldDeposits) > 0 || len(deposits) > 0 || len(withdraws) > 0 || len(erc20Deployments) > 0 || len(valsetUpdates) > 0 { // todo get eth chain id from the chain - if err := s.peggyBroadcastClient.SendEthereumClaims(ctx, lastClaimEvent.EthereumEventNonce, oldDeposits, deposits, withdraws, valsetUpdates); err != nil { + if err := s.peggyBroadcastClient.SendEthereumClaims(ctx, lastClaimEvent.EthereumEventNonce, oldDeposits, deposits, withdraws, erc20Deployments, valsetUpdates); err != nil { metrics.ReportFuncError(s.svcTags) err = errors.Wrap(err, "failed to send ethereum claims to Cosmos chain") return 0, err @@ -265,6 +300,21 @@ func filterTransactionBatchExecutedEventsByNonce( return res } +func filterERC20DeployedEventsByNonce( + events []*wrappers.PeggyERC20DeployedEvent, + nonce uint64, +) []*wrappers.PeggyERC20DeployedEvent { + res := make([]*wrappers.PeggyERC20DeployedEvent, 0, len(events)) + + for _, ev := range events { + if ev.EventNonce.Uint64() > nonce { + res = append(res, ev) + } + } + + return res +} + func filterValsetUpdateEventsByNonce( events []*wrappers.PeggyValsetUpdatedEvent, nonce uint64, diff --git a/orchestrator/main_loops.go b/orchestrator/main_loops.go index 950bd916..a05dc99c 100644 --- a/orchestrator/main_loops.go +++ b/orchestrator/main_loops.go @@ -25,23 +25,14 @@ const defaultLoopDur = 60 * time.Second // Start combines the all major roles required to make // up the Orchestrator, all of these are async loops. -func (s *peggyOrchestrator) Start(ctx context.Context) error { - var pg loops.ParanoidGroup - - pg.Go(func() error { - return s.EthOracleMainLoop(ctx) - }) - pg.Go(func() error { - return s.BatchRequesterLoop(ctx) - }) - pg.Go(func() error { - return s.EthSignerMainLoop(ctx) - }) - pg.Go(func() error { - return s.RelayerMainLoop(ctx) - }) +func (s *peggyOrchestrator) Start(ctx context.Context, validatorMode bool) error { + if !validatorMode { + log.Infoln("Starting peggo in relayer (non-validator) mode") + return s.startRelayerMode(ctx) + } - return pg.Wait() + log.Infoln("Starting peggo in validator mode") + return s.startValidatorMode(ctx) } // EthOracleMainLoop is responsible for making sure that Ethereum events are retrieved from the Ethereum blockchain @@ -395,3 +386,41 @@ func calculateTotalValsetPower(valset *types.Valset) *big.Int { return totalValsetPower } + +// startValidatorMode runs all orchestrator processes. This is called +// when peggo is run alongside a validator injective node. +func (s *peggyOrchestrator) startValidatorMode(ctx context.Context) error { + var pg loops.ParanoidGroup + + pg.Go(func() error { + return s.EthOracleMainLoop(ctx) + }) + pg.Go(func() error { + return s.BatchRequesterLoop(ctx) + }) + pg.Go(func() error { + return s.EthSignerMainLoop(ctx) + }) + pg.Go(func() error { + return s.RelayerMainLoop(ctx) + }) + + return pg.Wait() +} + +// startRelayerMode runs orchestrator processes that only relay specific +// messages that do not require a validator's signature. This mode is run +// alongside a non-validator injective node +func (s *peggyOrchestrator) startRelayerMode(ctx context.Context) error { + var pg loops.ParanoidGroup + + pg.Go(func() error { + return s.BatchRequesterLoop(ctx) + }) + + pg.Go(func() error { + return s.RelayerMainLoop(ctx) + }) + + return pg.Wait() +} diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 6ef8eded..9e466364 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -17,7 +17,7 @@ import ( ) type PeggyOrchestrator interface { - Start(ctx context.Context) error + Start(ctx context.Context, validatorMode bool) error CheckForEvents(ctx context.Context, startingBlock uint64) (currentBlock uint64, err error) GetLastCheckedBlock(ctx context.Context) (uint64, error)