Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): remove redundant getters/setters #3268

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 31 additions & 42 deletions cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import (
"strings"

"cosmossdk.io/errors"
"github.com/btcsuite/btcd/rpcclient"
sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/onrik/ethrpc"
"github.com/rs/zerolog"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
evmobserver "github.com/zeta-chain/node/zetaclient/chains/evm/observer"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/db"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/orchestrator"
"github.com/zeta-chain/node/zetaclient/zetacore"
Expand Down Expand Up @@ -87,27 +86,26 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
return err
}

chainProto := chain.RawChain()
baseLogger := base.Logger{Std: zerolog.Nop(), Compliance: zerolog.Nop()}

observers, err := orchestrator.CreateChainObserverMap(ctx, client, nil, db.SqliteInMemory, baseLogger, nil)
if err != nil {
return errors.Wrap(err, "failed to create chain observer map")
}
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// get ballot identifier according to the chain type
if chain.IsEVM() {
evmObserver := evmobserver.Observer{}
evmObserver.WithZetacoreClient(client)
var ethRPC *ethrpc.EthRPC
var client *ethclient.Client
coinType := coin.CoinType_Cmd
for chainIDFromConfig, evmConfig := range cfg.GetAllEVMConfigs() {
if chainIDFromConfig == chainID {
ethRPC = ethrpc.NewEthRPC(evmConfig.Endpoint)
client, err = ethclient.Dial(evmConfig.Endpoint)
if err != nil {
return err
}
evmObserver.WithEvmClient(client)
evmObserver.WithEvmJSONRPC(ethRPC)
evmObserver.WithChain(*chainProto)
}
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for evm chain %d", chain.ID())
}

evmObserver, ok := observer.(*evmobserver.Observer)
if !ok {
return fmt.Errorf("observer is not evm observer for chain %d", chain.ID())
}

coinType := coin.CoinType_Cmd
hash := ethcommon.HexToHash(inboundHash)
tx, isPending, err := evmObserver.TransactionByHash(inboundHash)
if err != nil {
Expand All @@ -118,7 +116,7 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
return fmt.Errorf("tx is still pending")
}

receipt, err := client.TransactionReceipt(context.Background(), hash)
receipt, err := evmObserver.TransactionReceipt(ctx, hash)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("tx receipt not found on chain %s, %d", err.Error(), chain.ID())
}
Expand Down Expand Up @@ -158,33 +156,23 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}
fmt.Println("CoinType : ", coinType)
} else if chain.IsBitcoin() {
btcObserver := btcobserver.Observer{}
btcObserver.WithZetacoreClient(client)
btcObserver.WithChain(*chainProto)
btcConfig, found := cfg.GetBTCConfig(chainID)
if !found {
return fmt.Errorf("unable to find config for BTC chain %d", chainID)
}
connCfg := &rpcclient.ConnConfig{
Host: btcConfig.RPCHost,
User: btcConfig.RPCUsername,
Pass: btcConfig.RPCPassword,
HTTPPostMode: true,
DisableTLS: true,
Params: btcConfig.RPCParams,
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for btc chain %d", chainID)
}

btcClient, err := rpcclient.New(connCfg, nil)
if err != nil {
return err
btcObserver, ok := observer.(*btcobserver.Observer)
if !ok {
return fmt.Errorf("observer is not btc observer for chain %d", chainID)
}
btcObserver.WithBtcClient(btcClient)

ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
if err != nil {
return err
}
}
fmt.Println("BallotIdentifier : ", ballotIdentifier)

fmt.Println("BallotIdentifier: ", ballotIdentifier)
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// query ballot
ballot, err := client.GetBallot(ctx, ballotIdentifier)
Expand All @@ -193,9 +181,10 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}

for _, vote := range ballot.Voters {
fmt.Printf("%s : %s \n", vote.VoterAddress, vote.VoteType)
fmt.Printf("%s: %s\n", vote.VoterAddress, vote.VoteType)
}
fmt.Println("BallotStatus : ", ballot.BallotStatus)

swift1337 marked this conversation as resolved.
Show resolved Hide resolved
fmt.Println("BallotStatus: ", ballot.BallotStatus)

return nil
}
2 changes: 1 addition & 1 deletion cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func Start(_ *cobra.Command, _ []string) error {

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger, telemetryServer)
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
if err != nil {
log.Error().Err(err).Msg("Unable to create signer map")
return err
Expand Down
120 changes: 30 additions & 90 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ const (
// DefaultBlockCacheSize is the default number of blocks that the observer will keep in cache for performance (without RPC calls)
// Cached blocks can be used to get block information and verify transactions
DefaultBlockCacheSize = 1000

// DefaultHeaderCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// Cached headers can be used to get header information
DefaultHeaderCacheSize = 1000
)

// Observer is the base structure for chain observers, grouping the common logic for each chain observer client.
Expand Down Expand Up @@ -64,12 +60,8 @@ type Observer struct {
// rpcAlertLatency is the threshold of RPC latency to trigger an alert
rpcAlertLatency time.Duration

// blockCache is the cache for blocks
blockCache *lru.Cache

// headerCache is the cache for headers
headerCache *lru.Cache

// db is the database to persist data
db *db.DB

Expand All @@ -95,13 +87,17 @@ func NewObserver(
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
headerCacheSize int,
rpcAlertLatency int64,
ts *metrics.TelemetryServer,
database *db.DB,
logger Logger,
) (*Observer, error) {
ob := Observer{
blockCache, err := lru.New(blockCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating block cache")
}

return &Observer{
chain: chain,
chainParams: chainParams,
zetacoreClient: zetacoreClient,
Expand All @@ -112,27 +108,11 @@ func NewObserver(
rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second,
ts: ts,
db: database,
blockCache: blockCache,
mu: &sync.Mutex{},
logger: newObserverLogger(chain, logger),
stop: make(chan struct{}),
}

// setup loggers
ob.WithLogger(logger)

// create block cache
var err error
ob.blockCache, err = lru.New(blockCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating block cache")
}

// create header cache
ob.headerCache, err = lru.New(headerCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating header cache")
}

return &ob, nil
}, nil
}

// Start starts the observer. Returns false if it's already started (noop).
Expand Down Expand Up @@ -178,12 +158,6 @@ func (ob *Observer) Chain() chains.Chain {
return ob.chain
}

// WithChain attaches a new chain to the observer.
func (ob *Observer) WithChain(chain chains.Chain) *Observer {
ob.chain = chain
return ob
}

// ChainParams returns the chain params for the observer.
func (ob *Observer) ChainParams() observertypes.ChainParams {
ob.mu.Lock()
Expand All @@ -205,23 +179,11 @@ func (ob *Observer) ZetacoreClient() interfaces.ZetacoreClient {
return ob.zetacoreClient
}

// WithZetacoreClient attaches a new zetacore client to the observer.
func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer {
ob.zetacoreClient = client
return ob
}

// TSS returns the tss signer for the observer.
func (ob *Observer) TSS() interfaces.TSSSigner {
return ob.tss
}

// WithTSS attaches a new tss signer to the observer.
func (ob *Observer) WithTSS(tss interfaces.TSSSigner) *Observer {
ob.tss = tss
return ob
}

// TSSAddressString returns the TSS address for the chain.
//
// Note: all chains uses TSS EVM address except Bitcoin chain.
Expand Down Expand Up @@ -287,23 +249,6 @@ func (ob *Observer) BlockCache() *lru.Cache {
return ob.blockCache
}

// WithBlockCache attaches a new block cache to the observer.
func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer {
ob.blockCache = cache
return ob
}

// HeaderCache returns the header cache for the observer.
func (ob *Observer) HeaderCache() *lru.Cache {
return ob.headerCache
}

// WithHeaderCache attaches a new header cache to the observer.
func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer {
ob.headerCache = cache
return ob
}

// OutboundID returns a unique identifier for the outbound transaction.
// The identifier is now used as the key for maps that store outbound related data (e.g. transaction, receipt, etc).
func (ob *Observer) OutboundID(nonce uint64) string {
Expand All @@ -316,12 +261,6 @@ func (ob *Observer) DB() *db.DB {
return ob.db
}

// WithTelemetryServer attaches a new telemetry server to the observer.
func (ob *Observer) WithTelemetryServer(ts *metrics.TelemetryServer) *Observer {
ob.ts = ts
return ob
}

// TelemetryServer returns the telemetry server for the observer.
func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
Expand All @@ -332,26 +271,6 @@ func (ob *Observer) Logger() *ObserverLogger {
return &ob.logger
}

// WithLogger attaches a new logger to the observer.
func (ob *Observer) WithLogger(logger Logger) *Observer {
chainLogger := logger.Std.
With().
Int64(logs.FieldChain, ob.chain.ChainId).
Str(logs.FieldChainNetwork, ob.chain.Network.String()).
Logger()

ob.logger = ObserverLogger{
Chain: chainLogger,
Inbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(),
Outbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(),
GasPrice: chainLogger.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(),
Headers: chainLogger.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(),
Compliance: logger.Compliance,
}

return ob
}

// Mu returns the mutex for the observer.
func (ob *Observer) Mu() *sync.Mutex {
return ob.mu
Expand Down Expand Up @@ -544,3 +463,24 @@ func EnvVarLatestBlockByChain(chain chains.Chain) string {
func EnvVarLatestTxByChain(chain chains.Chain) string {
return fmt.Sprintf("CHAIN_%d_SCAN_FROM_TX", chain.ChainId)
}

func newObserverLogger(chain chains.Chain, logger Logger) ObserverLogger {
withLogFields := func(l zerolog.Logger) zerolog.Logger {
return l.With().
Int64(logs.FieldChain, chain.ChainId).
Str(logs.FieldChainNetwork, chain.Network.String()).
Logger()
}

log := withLogFields(logger.Std)
complianceLog := withLogFields(logger.Compliance)

return ObserverLogger{
Chain: log,
Inbound: log.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(),
Outbound: log.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(),
GasPrice: log.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(),
Headers: log.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(),
Compliance: complianceLog,
}
}
Loading
Loading