Skip to content

Commit

Permalink
chore: refactor routines and add order to routine address pull
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Jul 23, 2023
1 parent 0a5c338 commit 01bbce6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 87 deletions.
17 changes: 17 additions & 0 deletions src/crud/crud_base_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,20 @@ func (m *Crud[Model, ModelOrm]) SelectManyContractCreations(

return transactions, db.Error
}

func (m *Crud[Model, ModelOrm]) SelectBatchOrder(
limit int,
skip int,
order string,
) (*[]Model, error) {
db := m.db
db = db.Model(&m.Model)
db = db.Limit(limit)
if skip != 0 {
db = db.Offset(skip)
}
db = db.Order(order)
output := &[]Model{}
db = db.Find(output)
return output, db.Error
}
10 changes: 7 additions & 3 deletions src/routines/count_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ func countAddressesToRedisRoutine() {
return
}

redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_count", countAll)
redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_contract_count", countContracts)
err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_count", countAll)
err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"address_contract_count", countContracts)

for address, count := range countTokenAddresses {
redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"token_address_count_by_token_contract_"+address, count)
err = redis.GetRedisClient().SetCount(config.Config.RedisKeyPrefix+"token_address_count_by_token_contract_"+address, count)
if err != nil {
// Try again
zap.S().Warn("Routine=AddressCount Redis - ERROR: ", err.Error())
}
}

zap.S().Info("Routine=AddressCount - Completed routine...")
Expand Down
41 changes: 41 additions & 0 deletions src/routines/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package routines

import (
"github.com/sudoblockio/icon-transformer/config"
"go.uber.org/zap"
"time"
)

var cronRoutines = []func(){
addressIsPrep,
tokenAddressCountRoutine, // Isn't used - RM?
countAddressesToRedisRoutine,
}

func CronStart() {

zap.S().Warn("Init cron...")
// Init - Jobs that run once on startup
if config.Config.NetworkName == "mainnet" {
addressTypeRoutine()
}

// Short
go RoutinesCron(cronRoutines, config.Config.RoutinesSleepDuration)

// Long
// TODO: These were snubbed because they stress DB and should be run with something to slow them down
//go AddressRoutinesCron(addressRoutines, 6*time.Hour)
}

// Wrapper for generic routines
func RoutinesCron(routines []func(), sleepDuration time.Duration) {
for {
zap.S().Warn("Starting cron...")
for _, r := range routines {
r()
}
zap.S().Info("Completed routine, sleeping...")
time.Sleep(sleepDuration)
}
}
87 changes: 4 additions & 83 deletions src/routines/routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ var addressRoutines = []func(a *models.Address){

var tokenAddressRoutines = []func(t *models.TokenAddress){
setTokenAddressBalances,
// TODO: Fix this
// TODO: Fix this - Might not need - not used it seems? This is not cached and does not hit db
// function signiture would suggest another loop anyways.
//setTokenAddressTxCounts,
}

func StartRecovery() {
zap.S().Warn("Init recovery...")

// Global count
//Global count
setTransactionCounts()

// One shot
Expand All @@ -46,40 +47,6 @@ func StartRecovery() {
os.Exit(0)
}

var cronRoutines = []func(){
addressIsPrep,
tokenAddressCountRoutine, // Isn't used - RM?
countAddressesToRedisRoutine,
}

func CronStart() {

zap.S().Warn("Init cron...")
// Init - Jobs that run once on startup
if config.Config.NetworkName == "mainnet" {
addressTypeRoutine()
}

// Short
go RoutinesCron(cronRoutines, config.Config.RoutinesSleepDuration)

// Long
// TODO: These were snubbed because they stress DB and should be run with something to slow them down
//go AddressRoutinesCron(addressRoutines, 6*time.Hour)
}

// Wrapper for generic routines
func RoutinesCron(routines []func(), sleepDuration time.Duration) {
for {
zap.S().Warn("Starting cron...")
for _, r := range routines {
r()
}
zap.S().Info("Completed routine, sleeping...")
time.Sleep(sleepDuration)
}
}

func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) {

skip := 0
Expand All @@ -88,7 +55,7 @@ func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) {
zap.S().Info("Starting worker on table=", Crud.TableName, " with skip=", skip, " with limit=", limit)
// Run loop until addresses have all been iterated over
for {
routineItems, err := Crud.SelectMany(limit, skip)
routineItems, err := Crud.SelectBatchOrder(limit, skip, "address")
if errors.Is(err, gorm.ErrRecordNotFound) {
zap.S().Warn("Ending address routine with error=", err.Error())
break
Expand All @@ -114,49 +81,3 @@ func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) {
skip += config.Config.RoutinesBatchSize * config.Config.RoutinesNumWorkers
}
}

//func LoopRoutine[M any, O any](Crud *crud.Crud[M, O], routines []func(*M)) {
// var wg sync.WaitGroup
// wg.Add(config.Config.RoutinesNumWorkers)
// for i := 0; i < config.Config.RoutinesNumWorkers; i++ {
// i := i
// go func() {
// //defer wg.Done()
// // Loop through all addresses
// skip := i * config.Config.RoutinesBatchSize
// limit := config.Config.RoutinesBatchSize
//
// zap.S().Info("Starting worker on table=", Crud.TableName, " with skip=", skip, " with workerId=", i)
// // Run loop until addresses have all been iterated over
// for {
//
// routineItems, err := Crud.SelectMany(limit, skip)
// if errors.Is(err, gorm.ErrRecordNotFound) {
// zap.S().Warn("Ending address routine with error=", err.Error())
// break
// } else if err != nil {
// zap.S().Warn("Ending address routine with error=", err.Error())
// break
// }
// if len(*routineItems) == 0 {
// zap.S().Warn("Ending address routine, no more addresses")
// break
// }
//
// zap.S().Info("Starting skip=", skip, " limit=", limit, " table=", Crud.TableName, " workerId=", i)
// for i := 0; i < len(*routineItems); i++ {
// for _, r := range routines {
// var item *M
// item = &(*routineItems)[i]
// r(item)
// }
// }
// zap.S().Info("Finished skip=", skip, " limit=", limit, " table=", Crud.TableName, " workerId=", i)
//
// skip += config.Config.RoutinesBatchSize * config.Config.RoutinesNumWorkers
// }
// wg.Done()
// }()
// }
// wg.Wait()
//}
4 changes: 3 additions & 1 deletion src/routines/token_address_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"github.com/sudoblockio/icon-transformer/config"
"github.com/sudoblockio/icon-transformer/models"
"testing"
"time"
)

func TestSetTokenAddressBalances(t *testing.T) {
config.ReadEnvironment()
tokenAddress := &models.TokenAddress{
TokenContractAddress: "cxcfe9d1f83fa871e903008471cca786662437e58d",
Address: "hxb41775a05572c421917b6d5d80fd5f31c495b7f8",
Address: "hx42c7d33652beabb87eae2a09f67c53b1927b2a96",
}
setTokenAddressBalances(tokenAddress)

assert.NotNil(t, tokenAddress.Balance)
time.Sleep(5 * time.Second)
}
42 changes: 42 additions & 0 deletions src/routines/token_address_tx_counts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package routines

import (
"github.com/sudoblockio/icon-transformer/config"
"github.com/sudoblockio/icon-transformer/models"
"github.com/sudoblockio/icon-transformer/redis"
"go.uber.org/zap"

"github.com/sudoblockio/icon-transformer/crud"
)

func GetTokenAddressTxCounts(address *models.Address) *models.Address {

// Token Transfer Count
countTokenTx, err := crud.GetTokenTransferCrud().CountByAddress(address.Address)
if err != nil {
zap.S().Warn("Routine=InternalTxCount, Address=", address.Address, " - Error: ", err.Error())
return nil
}
address.TokenTransferCount = countTokenTx
err = redis.GetRedisClient().SetCount(
config.Config.RedisKeyPrefix+"token_transfer_count_by_token_contract_"+address.Address,
countTokenTx,
)
if err != nil {
zap.S().Warn("Routine=TokenTransfer, Address=", address.Address, " - Error: ", err.Error())
return nil
}

return address
}

func setTokenAddressTxCounts(address *models.Address) {
addressNew := GetTokenAddressTxCounts(address)
if address != nil {
//crud.GetAddressRoutineCruds()["counts"].LoaderChannel <- addressNew
err := crud.GetAddressRoutineCruds()["counts"].UpsertOne(addressNew)
if err != nil {
zap.S().Fatal(err.Error())
}
}
}

0 comments on commit 01bbce6

Please sign in to comment.