From 01bbce6a7b74d5da220ea3873d9f0325ee578591 Mon Sep 17 00:00:00 2001 From: robcxyz Date: Mon, 24 Jul 2023 02:04:44 +0700 Subject: [PATCH] chore: refactor routines and add order to routine address pull --- src/crud/crud_base_select.go | 17 +++++ src/routines/count_addresses.go | 10 ++- src/routines/cron.go | 41 ++++++++++ src/routines/routines.go | 87 +--------------------- src/routines/token_address_balance_test.go | 4 +- src/routines/token_address_tx_counts.go | 42 +++++++++++ 6 files changed, 114 insertions(+), 87 deletions(-) create mode 100644 src/routines/cron.go create mode 100644 src/routines/token_address_tx_counts.go diff --git a/src/crud/crud_base_select.go b/src/crud/crud_base_select.go index ff8bf5a..035e4be 100644 --- a/src/crud/crud_base_select.go +++ b/src/crud/crud_base_select.go @@ -97,3 +97,20 @@ func (m *Crud[Model, ModelOrm]) SelectManyContractCreations( return transactions, db.Error } + +func (m *Crud[Model, ModelOrm]) SelectBatchOrder( + limit int, + skip int, + order string, +) (*[]Model, error) { + db := m.db + db = db.Model(&m.Model) + db = db.Limit(limit) + if skip != 0 { + db = db.Offset(skip) + } + db = db.Order(order) + output := &[]Model{} + db = db.Find(output) + return output, db.Error +} diff --git a/src/routines/count_addresses.go b/src/routines/count_addresses.go index 9f59965..4a9c6e3 100644 --- a/src/routines/count_addresses.go +++ b/src/routines/count_addresses.go @@ -36,11 +36,15 @@ func countAddressesToRedisRoutine() { return } - redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_count", countAll) - redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_contract_count", countContracts) + err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_count", countAll) + err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_contract_count", countContracts) for address, count := range countTokenAddresses { - redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"token_address_count_by_token_contract_"+address, count) + err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"token_address_count_by_token_contract_"+address, count) + if err != nil { + // Try again + zap.S().Warn("Routine=AddressCount Redis - ERROR: ", err.Error()) + } } zap.S().Info("Routine=AddressCount - Completed routine...") diff --git a/src/routines/cron.go b/src/routines/cron.go new file mode 100644 index 0000000..9ad7594 --- /dev/null +++ b/src/routines/cron.go @@ -0,0 +1,41 @@ +package routines + +import ( + "github.com/sudoblockio/icon-transformer/config" + "go.uber.org/zap" + "time" +) + +var cronRoutines = []func(){ + addressIsPrep, + tokenAddressCountRoutine, // Isn't used - RM? + countAddressesToRedisRoutine, +} + +func CronStart() { + + zap.S().Warn("Init cron...") + // Init - Jobs that run once on startup + if config.Config.NetworkName == "mainnet" { + addressTypeRoutine() + } + + // Short + go RoutinesCron(cronRoutines, config.Config.RoutinesSleepDuration) + + // Long + // TODO: These were snubbed because they stress DB and should be run with something to slow them down + //go AddressRoutinesCron(addressRoutines, 6*time.Hour) +} + +// Wrapper for generic routines +func RoutinesCron(routines []func(), sleepDuration time.Duration) { + for { + zap.S().Warn("Starting cron...") + for _, r := range routines { + r() + } + zap.S().Info("Completed routine, sleeping...") + time.Sleep(sleepDuration) + } +} diff --git a/src/routines/routines.go b/src/routines/routines.go index 88bde04..9e88f64 100644 --- a/src/routines/routines.go +++ b/src/routines/routines.go @@ -20,14 +20,15 @@ var addressRoutines = []func(a *models.Address){ var tokenAddressRoutines = []func(t *models.TokenAddress){ setTokenAddressBalances, - // TODO: Fix this + // TODO: Fix this - Might not need - not used it seems? This is not cached and does not hit db + // function signiture would suggest another loop anyways. //setTokenAddressTxCounts, } func StartRecovery() { zap.S().Warn("Init recovery...") - // Global count + //Global count setTransactionCounts() // One shot @@ -46,40 +47,6 @@ func StartRecovery() { os.Exit(0) } -var cronRoutines = []func(){ - addressIsPrep, - tokenAddressCountRoutine, // Isn't used - RM? - countAddressesToRedisRoutine, -} - -func CronStart() { - - zap.S().Warn("Init cron...") - // Init - Jobs that run once on startup - if config.Config.NetworkName == "mainnet" { - addressTypeRoutine() - } - - // Short - go RoutinesCron(cronRoutines, config.Config.RoutinesSleepDuration) - - // Long - // TODO: These were snubbed because they stress DB and should be run with something to slow them down - //go AddressRoutinesCron(addressRoutines, 6*time.Hour) -} - -// Wrapper for generic routines -func RoutinesCron(routines []func(), sleepDuration time.Duration) { - for { - zap.S().Warn("Starting cron...") - for _, r := range routines { - r() - } - zap.S().Info("Completed routine, sleeping...") - time.Sleep(sleepDuration) - } -} - func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) { skip := 0 @@ -88,7 +55,7 @@ func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) { zap.S().Info("Starting worker on table=", Crud.TableName, " with skip=", skip, " with limit=", limit) // Run loop until addresses have all been iterated over for { - routineItems, err := Crud.SelectMany(limit, skip) + routineItems, err := Crud.SelectBatchOrder(limit, skip, "address") if errors.Is(err, gorm.ErrRecordNotFound) { zap.S().Warn("Ending address routine with error=", err.Error()) break @@ -114,49 +81,3 @@ func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) { skip += config.Config.RoutinesBatchSize * config.Config.RoutinesNumWorkers } } - -//func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) { -// var wg sync.WaitGroup -// wg.Add(config.Config.RoutinesNumWorkers) -// for i := 0; i < config.Config.RoutinesNumWorkers; i++ { -// i := i -// go func() { -// //defer wg.Done() -// // Loop through all addresses -// skip := i * config.Config.RoutinesBatchSize -// limit := config.Config.RoutinesBatchSize -// -// zap.S().Info("Starting worker on table=", Crud.TableName, " with skip=", skip, " with workerId=", i) -// // Run loop until addresses have all been iterated over -// for { -// -// routineItems, err := Crud.SelectMany(limit, skip) -// if errors.Is(err, gorm.ErrRecordNotFound) { -// zap.S().Warn("Ending address routine with error=", err.Error()) -// break -// } else if err != nil { -// zap.S().Warn("Ending address routine with error=", err.Error()) -// break -// } -// if len(*routineItems) == 0 { -// zap.S().Warn("Ending address routine, no more addresses") -// break -// } -// -// zap.S().Info("Starting skip=", skip, " limit=", limit, " table=", Crud.TableName, " workerId=", i) -// for i := 0; i < len(*routineItems); i++ { -// for _, r := range routines { -// var item *M -// item = &(*routineItems)[i] -// r(item) -// } -// } -// zap.S().Info("Finished skip=", skip, " limit=", limit, " table=", Crud.TableName, " workerId=", i) -// -// skip += config.Config.RoutinesBatchSize * config.Config.RoutinesNumWorkers -// } -// wg.Done() -// }() -// } -// wg.Wait() -//} diff --git a/src/routines/token_address_balance_test.go b/src/routines/token_address_balance_test.go index 2bb4ebc..1f322e0 100644 --- a/src/routines/token_address_balance_test.go +++ b/src/routines/token_address_balance_test.go @@ -5,15 +5,17 @@ import ( "github.com/sudoblockio/icon-transformer/config" "github.com/sudoblockio/icon-transformer/models" "testing" + "time" ) func TestSetTokenAddressBalances(t *testing.T) { config.ReadEnvironment() tokenAddress := &models.TokenAddress{ TokenContractAddress: "cxcfe9d1f83fa871e903008471cca786662437e58d", - Address: "hxb41775a05572c421917b6d5d80fd5f31c495b7f8", + Address: "hx42c7d33652beabb87eae2a09f67c53b1927b2a96", } setTokenAddressBalances(tokenAddress) assert.NotNil(t, tokenAddress.Balance) + time.Sleep(5 * time.Second) } diff --git a/src/routines/token_address_tx_counts.go b/src/routines/token_address_tx_counts.go new file mode 100644 index 0000000..1eeb3b1 --- /dev/null +++ b/src/routines/token_address_tx_counts.go @@ -0,0 +1,42 @@ +package routines + +import ( + "github.com/sudoblockio/icon-transformer/config" + "github.com/sudoblockio/icon-transformer/models" + "github.com/sudoblockio/icon-transformer/redis" + "go.uber.org/zap" + + "github.com/sudoblockio/icon-transformer/crud" +) + +func GetTokenAddressTxCounts(address *models.Address) *models.Address { + + // Token Transfer Count + countTokenTx, err := crud.GetTokenTransferCrud().CountByAddress(address.Address) + if err != nil { + zap.S().Warn("Routine=InternalTxCount, Address=", address.Address, " - Error: ", err.Error()) + return nil + } + address.TokenTransferCount = countTokenTx + err = redis.GetRedisClient().SetCount( + config.Config.RedisKeyPrefix+"token_transfer_count_by_token_contract_"+address.Address, + countTokenTx, + ) + if err != nil { + zap.S().Warn("Routine=TokenTransfer, Address=", address.Address, " - Error: ", err.Error()) + return nil + } + + return address +} + +func setTokenAddressTxCounts(address *models.Address) { + addressNew := GetTokenAddressTxCounts(address) + if address != nil { + //crud.GetAddressRoutineCruds()["counts"].LoaderChannel <- addressNew + err := crud.GetAddressRoutineCruds()["counts"].UpsertOne(addressNew) + if err != nil { + zap.S().Fatal(err.Error()) + } + } +}