Refactor/465 cmd config rework #495

Merged 14 commits on Oct 15, 2023
16 changes: 8 additions & 8 deletions client/client.go
@@ -21,11 +21,11 @@ import (

var (
DB *gorm.DB
GlobalCfg *config.Config
ClientCfg *config.ClientConfig
)

func setup() (*gorm.DB, *config.Config, int, string, error) {
argConfig, flagSet, svcPort, err := config.ParseArgs(os.Stderr, os.Args[1:])
func setup() (*gorm.DB, *config.ClientConfig, int, string, error) {
argConfig, flagSet, svcPort, err := config.ParseClientArgs(os.Stderr, os.Args[1:])
if err != nil {
if strings.Contains(err.Error(), "help requested") {
config.Log.Info("Please see valid flags above.")
@@ -45,7 +45,7 @@ func setup() (*gorm.DB, *config.Config, int, string, error) {
location = "./config.toml"
}

fileConfig, err := config.GetConfig(location)
fileConfig, err := config.GetClientConfig(location)
if err != nil {
if !strings.Contains(err.Error(), "no such file or directory") {
config.Log.Panicf("Error opening configuration file. Err: %v", err)
@@ -54,7 +54,7 @@ }
}

// merge and validate configs
cfg := config.MergeConfigs(fileConfig, argConfig)
cfg := config.MergeClientConfigs(fileConfig, argConfig)
err = cfg.ValidateClientConfig()
if err != nil {
flagSet.PrintDefaults()
@@ -96,7 +96,7 @@ func main() {
}

DB = db
GlobalCfg = cfg
ClientCfg = cfg

// Have to keep this here so that import of docs subfolder (which contains proper init()) stays
docs.SwaggerInfo.Title = "Cosmos Tax CLI"
@@ -227,7 +227,7 @@ func GetTaxableEventsCSV(c *gin.Context) {
return
}

accountRows, headers, err := csv.ParseForAddress(addresses, startDate, endDate, DB, format, *GlobalCfg)
accountRows, headers, err := csv.ParseForAddress(addresses, startDate, endDate, DB, format)
if err != nil {
// the error returned here has already been pushed to the context... I think.
config.Log.Errorf("Error getting rows for addresses: %v", addresses)
@@ -275,7 +275,7 @@ func GetTaxableEventsJSON(c *gin.Context) {
return
}

accountRows, _, err := csv.ParseForAddress(addresses, startDate, endDate, DB, format, *GlobalCfg)
accountRows, _, err := csv.ParseForAddress(addresses, startDate, endDate, DB, format)
if err != nil {
// the error returned here has already been pushed to the context... I think.
config.Log.Errorf("Error getting rows for addresses: %v", addresses)
6 changes: 3 additions & 3 deletions cmd/block_enqueue.go
@@ -29,7 +29,7 @@ func (idxr *Indexer) enqueueBlocksToProcessByMsgType(blockChan chan int64, chain
JOIN messages ON messages.tx_id = txes.id
JOIN message_types ON message_types.id = messages.message_type_id
AND message_types.message_type = ?
WHERE height > ? AND height < ? AND blockchain_id = ?::int;
WHERE height >= ? AND height <= ? AND blockchain_id = ?::int;
`, msgType, startBlock, endBlock, chainID).Rows()
if err != nil {
config.Log.Fatalf("Error checking DB for blocks to reindex. Err: %v", err)
@@ -134,7 +134,7 @@ func (idxr *Indexer) enqueueBlocksToProcessFromBlockInputFile(blockChan chan int
// enqueueBlocksToProcess will pass the blocks that need to be processed to the blockchannel
func (idxr *Indexer) enqueueBlocksToProcess(blockChan chan int64, chainID uint) {
// Unless explicitly prevented, lets attempt to enqueue any failed blocks
if !idxr.cfg.Base.PreventReattempts {
if idxr.cfg.Base.ReattemptFailedBlocks {
idxr.enqueueFailedBlocks(blockChan, chainID)
}

@@ -160,7 +160,7 @@ func (idxr *Indexer) enqueueBlocksToProcess(blockChan chan int64, chainID uint)
if len(blockChan) <= cap(blockChan)/4 {
// This is the latest block height available on the Node.
var err error
latestBlock, err = rpc.GetLatestBlockHeightWithRetry(idxr.cl, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
latestBlock, err = rpc.GetLatestBlockHeightWithRetry(idxr.cl, idxr.cfg.Base.RequestRetryAttempts, idxr.cfg.Base.RequestRetryMaxWait)
if err != nil {
config.Log.Fatal("Error getting blockchain latest height. Err: %v", err)
}
115 changes: 75 additions & 40 deletions cmd/index.go
@@ -23,10 +23,23 @@ import (
"gorm.io/gorm"
)

var reindexMsgType string
type Indexer struct {
cfg *config.IndexConfig
dryRun bool
db *gorm.DB
cl *client.ChainClient
scheduler *gocron.Scheduler
}

var indexer Indexer

func init() {
indexCmd.Flags().StringVar(&reindexMsgType, "re-index-message-type", "", "If specified, the indexer will reindex only the blocks containing the message type provided.")
indexer.cfg = &config.IndexConfig{}
config.SetupLogFlags(&indexer.cfg.Log, indexCmd)
config.SetupDatabaseFlags(&indexer.cfg.Database, indexCmd)
config.SetupLensFlags(&indexer.cfg.Lens, indexCmd)
config.SetupThrottlingFlag(&indexer.cfg.Base.Throttling, indexCmd)
config.SetupIndexSpecificFlags(indexer.cfg, indexCmd)

rootCmd.AddCommand(indexCmd)
}
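
Note on the new flag wiring: init now delegates flag registration to per-section helpers in the config package (SetupLogFlags, SetupDatabaseFlags, SetupLensFlags, SetupThrottlingFlag, SetupIndexSpecificFlags), each writing straight into a sub-struct of IndexConfig. Their bodies are not part of this diff; the following is only a rough sketch of the pattern, with hypothetical flag names and a stand-in Log struct limited to the Level, Path, and Pretty fields that setupIndex reads below.

package config

import "github.com/spf13/cobra"

// Log is a stand-in for the real config sub-struct; only the fields this
// diff touches (Level, Path, Pretty) are shown.
type Log struct {
	Level  string
	Path   string
	Pretty bool
}

// SetupLogFlags sketches the new pattern: bind a group of cobra flags
// directly onto the matching config sub-struct. Flag names and defaults
// here are assumptions, not the project's actual definitions.
func SetupLogFlags(logConf *Log, cmd *cobra.Command) {
	cmd.Flags().StringVar(&logConf.Level, "log.level", "info", "log level")
	cmd.Flags().StringVar(&logConf.Path, "log.path", "", "log file path (empty logs to stdout)")
	cmd.Flags().BoolVar(&logConf.Pretty, "log.pretty", false, "pretty-print log output")
}
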
@@ -37,78 +50,100 @@ var indexCmd = &cobra.Command{
Long: `Indexes the Cosmos-based blockchain according to the configurations found on the command line
or in the specified config file. Indexes taxable events into a database for easy querying. It is
highly recommended to keep this command running as a background service to keep your index up to date.`,
Run: index,
PreRunE: setupIndex,
Run: index,
}

// The Indexer struct is used to perform index operations
type Indexer struct {
cfg *config.Config
dryRun bool
db *gorm.DB
cl *client.ChainClient
scheduler *gocron.Scheduler
func setupIndex(cmd *cobra.Command, args []string) error {
bindFlags(cmd, viperConf)

err := indexer.cfg.Validate()
if err != nil {
return err
}

setupLogger(indexer.cfg.Log.Level, indexer.cfg.Log.Path, indexer.cfg.Log.Pretty)

// 0 is an invalid starting block, set it to 1
if indexer.cfg.Base.StartBlock == 0 {
indexer.cfg.Base.StartBlock = 1
}

db, err := connectToDBAndMigrate(indexer.cfg.Database)
if err != nil {
config.Log.Fatal("Could not establish connection to the database", err)
}

indexer.db = db

indexer.scheduler = gocron.NewScheduler(time.UTC)

// We should stop relying on the denom cache now that we are running this as a CLI tool only
dbTypes.CacheDenoms(db)
dbTypes.CacheIBCDenoms(db)

indexer.dryRun = indexer.cfg.Base.Dry

return nil
}

// The Indexer struct is used to perform index operations

func setupIndexer() *Indexer {
var idxr Indexer
var err error
idxr.cfg, idxr.dryRun, idxr.db, idxr.scheduler, err = setup(conf)
if err != nil {
log.Fatalf("Error during application setup. Err: %v", err)
}

// Setup chain specific stuff
core.SetupAddressRegex(idxr.cfg.Lens.AccountPrefix + "(valoper)?1[a-z0-9]{38}")
core.SetupAddressPrefix(idxr.cfg.Lens.AccountPrefix)
core.ChainSpecificMessageTypeHandlerBootstrap(idxr.cfg.Lens.ChainID)
core.ChainSpecificBeginBlockerEventTypeHandlerBootstrap(idxr.cfg.Lens.ChainID)
core.ChainSpecificEndBlockerEventTypeHandlerBootstrap(idxr.cfg.Lens.ChainID)
core.ChainSpecificEpochIdentifierEventTypeHandlersBootstrap(idxr.cfg.Lens.ChainID)
core.SetupAddressRegex(indexer.cfg.Lens.AccountPrefix + "(valoper)?1[a-z0-9]{38}")
core.SetupAddressPrefix(indexer.cfg.Lens.AccountPrefix)
core.ChainSpecificMessageTypeHandlerBootstrap(indexer.cfg.Lens.ChainID)
core.ChainSpecificBeginBlockerEventTypeHandlerBootstrap(indexer.cfg.Lens.ChainID)
core.ChainSpecificEndBlockerEventTypeHandlerBootstrap(indexer.cfg.Lens.ChainID)
core.ChainSpecificEpochIdentifierEventTypeHandlersBootstrap(indexer.cfg.Lens.ChainID)

config.SetChainConfig(idxr.cfg.Lens.AccountPrefix)
config.SetChainConfig(indexer.cfg.Lens.AccountPrefix)

// Setup scheduler to periodically update denoms
if idxr.cfg.Base.API != "" {
_, err = idxr.scheduler.Every(6).Hours().Do(tasks.IBCDenomUpsertTask, idxr.cfg.Base.API, idxr.db)
if indexer.cfg.Base.API != "" {
_, err = indexer.scheduler.Every(6).Hours().Do(tasks.IBCDenomUpsertTask, indexer.cfg.Base.API, indexer.db)
if err != nil {
config.Log.Error("Error scheduling ibc denom upsert task. Err: ", err)
}

idxr.scheduler.StartAsync()
indexer.scheduler.StartAsync()
}

// Some chains do not have the denom metadata URL available on chain, so we do chain specific downloads instead.
tasks.DoChainSpecificUpsertDenoms(idxr.db, idxr.cfg.Lens.ChainID, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
idxr.cl = config.GetLensClient(idxr.cfg.Lens)
tasks.DoChainSpecificUpsertDenoms(indexer.db, indexer.cfg.Lens.ChainID, indexer.cfg.Base.RequestRetryAttempts, indexer.cfg.Base.RequestRetryMaxWait)
indexer.cl = config.GetLensClient(indexer.cfg.Lens)

// Depending on the app configuration, wait for the chain to catch up
chainCatchingUp, err := rpc.IsCatchingUp(idxr.cl)
for idxr.cfg.Base.WaitForChain && chainCatchingUp && err == nil {
chainCatchingUp, err := rpc.IsCatchingUp(indexer.cl)
for indexer.cfg.Base.WaitForChain && chainCatchingUp && err == nil {
// Wait between status checks, don't spam the node with requests
config.Log.Debug("Chain is still catching up, please wait or disable check in config.")
time.Sleep(time.Second * time.Duration(idxr.cfg.Base.WaitForChainDelay))
chainCatchingUp, err = rpc.IsCatchingUp(idxr.cl)
time.Sleep(time.Second * time.Duration(indexer.cfg.Base.WaitForChainDelay))
chainCatchingUp, err = rpc.IsCatchingUp(indexer.cl)

// This EOF error pops up from time to time and is unpredictable
// It is most likely an error on the node, we would need to see any error logs on the node side
// Try one more time
if err != nil && strings.HasSuffix(err.Error(), "EOF") {
time.Sleep(time.Second * time.Duration(idxr.cfg.Base.WaitForChainDelay))
chainCatchingUp, err = rpc.IsCatchingUp(idxr.cl)
time.Sleep(time.Second * time.Duration(indexer.cfg.Base.WaitForChainDelay))
chainCatchingUp, err = rpc.IsCatchingUp(indexer.cl)
}
}
if err != nil {
config.Log.Fatal("Error querying chain status.", err)
}

if idxr.cfg.Lens.ChainID == osmosis.ChainID && idxr.cfg.Base.EpochEventIndexingEnabled {
err := osmosis.SetupOsmosisEpochIndexer(idxr.cl, idxr.cfg.Base.EpochIndexingIdentifier)
if indexer.cfg.Lens.ChainID == osmosis.ChainID && indexer.cfg.Base.EpochEventIndexingEnabled {
err := osmosis.SetupOsmosisEpochIndexer(indexer.cl, indexer.cfg.Base.EpochIndexingIdentifier)
if err != nil {
config.Log.Fatal("Error setting up Osmosis Epoch Indexer.", err)
}
}

return &idxr
return &indexer
}

func index(cmd *cobra.Command, args []string) {
@@ -194,8 +229,8 @@ func index(cmd *cobra.Command, args []string) {
// Add jobs to the queue to be processed
if idxr.cfg.Base.ChainIndexingEnabled {
switch {
case reindexMsgType != "":
idxr.enqueueBlocksToProcessByMsgType(blockChan, dbChainID, reindexMsgType)
case idxr.cfg.Base.ReindexMessageType != "":
idxr.enqueueBlocksToProcessByMsgType(blockChan, dbChainID, idxr.cfg.Base.ReindexMessageType)
case idxr.cfg.Base.BlockInputFile != "":
idxr.enqueueBlocksToProcessFromBlockInputFile(blockChan, idxr.cfg.Base.BlockInputFile)
default:
@@ -411,7 +446,7 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor
currentHeight := startHeight

for endHeight == -1 || currentHeight <= endHeight {
bresults, err := rpc.GetBlockResultWithRetry(rpcClient, currentHeight, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
bresults, err := rpc.GetBlockResultWithRetry(rpcClient, currentHeight, idxr.cfg.Base.RequestRetryAttempts, idxr.cfg.Base.RequestRetryMaxWait)
if err != nil {
config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", currentHeight), err)
failedBlockHandler(currentHeight, core.FailedBlockEventHandling, err)
@@ -532,7 +567,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
for _, epoch := range epochsBetween {
config.Log.Infof("Indexing epoch events for epoch %v at height %d", epoch.EpochNumber, epoch.StartHeight)

bresults, err := rpc.GetBlockResultWithRetry(rpcClient, int64(epoch.StartHeight), idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
bresults, err := rpc.GetBlockResultWithRetry(rpcClient, int64(epoch.StartHeight), idxr.cfg.Base.RequestRetryAttempts, idxr.cfg.Base.RequestRetryMaxWait)
if err != nil {
config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", epoch.StartHeight), err)
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
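
setupIndex calls bindFlags(cmd, viperConf) before validating the config, but the helper itself is outside this diff. It presumably follows the usual cobra/viper bridge, sketched below under that assumption; the names and package layout are illustrative only. The effect is that values viper loaded from the config file or environment fill in any flag the user did not set explicitly, so indexer.cfg already holds the merged values by the time Validate runs.

package cmd

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

// bindFlags (sketch): for every registered flag that was not set on the
// command line, fall back to the value viper resolved from the config file
// or environment, so explicit flags still win over file settings.
func bindFlags(cmd *cobra.Command, v *viper.Viper) {
	cmd.Flags().VisitAll(func(f *pflag.Flag) {
		if !f.Changed && v.IsSet(f.Name) {
			_ = cmd.Flags().Set(f.Name, fmt.Sprintf("%v", v.Get(f.Name)))
		}
	})
}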