Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #196 from vulcanize/pool_optimization
Browse files Browse the repository at this point in the history
Pool optimization
  • Loading branch information
i-norden authored May 12, 2020
2 parents fbd4a5c + e2ccd3f commit bb14c01
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 60 deletions.
2 changes: 1 addition & 1 deletion db/migrations/00020_create_btc_header_cids_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE TABLE btc.header_cids (
cid TEXT NOT NULL,
timestamp NUMERIC NOT NULL,
bits BIGINT NOT NULL,
node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE,
node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE,
UNIQUE (block_number, block_hash)
);

Expand Down
14 changes: 8 additions & 6 deletions environments/superNodeBTC.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD

[ipfs]
path = "~/.ipfs" # $IPFS_PATH
[database.sync]
maxIdle = 1
[database.backFill]
maxIdle = 5

[resync]
chain = "bitcoin" # $RESYNC_CHAIN
type = "full" # $RESYNC_TYPE
start = 0 # $RESYNC_START
stop = 0 # $RESYNC_STOP
batchSize = 1 # $RESYNC_BATCH_SIZE
batchNumber = 50 # $RESYNC_BATCH_NUMBER
batchSize = 5 # $RESYNC_BATCH_SIZE
batchNumber = 5 # $RESYNC_BATCH_NUMBER
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION

Expand All @@ -28,8 +30,8 @@
workers = 1 # $SUPERNODE_WORKERS
backFill = true # $SUPERNODE_BACKFILL
frequency = 45 # $SUPERNODE_FREQUENCY
batchSize = 1 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
batchSize = 5 # $SUPERNODE_BATCH_SIZE
batchNumber = 5 # $SUPERNODE_BATCH_NUMBER
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL

[bitcoin]
Expand Down
12 changes: 7 additions & 5 deletions environments/superNodeETH.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD

[ipfs]
path = "~/.ipfs" # $IPFS_PATH
[database.sync]
maxIdle = 1
[database.backFill]
maxIdle = 5

[resync]
chain = "ethereum" # $RESYNC_CHAIN
type = "state" # $RESYNC_TYPE
start = 0 # $RESYNC_START
stop = 0 # $RESYNC_STOP
batchSize = 5 # $RESYNC_BATCH_SIZE
batchNumber = 50 # $RESYNC_BATCH_NUMBER
batchNumber = 5 # $RESYNC_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION

[superNode]
Expand All @@ -30,7 +32,7 @@
backFill = true # $SUPERNODE_BACKFILL
frequency = 15 # $SUPERNODE_FREQUENCY
batchSize = 5 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
batchNumber = 5 # $SUPERNODE_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL

Expand Down
22 changes: 0 additions & 22 deletions pkg/config/config.go

This file was deleted.

33 changes: 23 additions & 10 deletions pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ import (

// Env variables
const (
DATABASE_NAME = "DATABASE_NAME"
DATABASE_HOSTNAME = "DATABASE_HOSTNAME"
DATABASE_PORT = "DATABASE_PORT"
DATABASE_USER = "DATABASE_USER"
DATABASE_PASSWORD = "DATABASE_PASSWORD"
DATABASE_NAME = "DATABASE_NAME"
DATABASE_HOSTNAME = "DATABASE_HOSTNAME"
DATABASE_PORT = "DATABASE_PORT"
DATABASE_USER = "DATABASE_USER"
DATABASE_PASSWORD = "DATABASE_PASSWORD"
DATABASE_MAX_IDLE_CONNECTIONS = "DATABASE_MAX_IDLE_CONNECTIONS"
DATABASE_MAX_OPEN_CONNECTIONS = "DATABASE_MAX_OPEN_CONNECTIONS"
DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME"
)

type Database struct {
Hostname string
Name string
User string
Password string
Port int
Hostname string
Name string
User string
Password string
Port int
MaxIdle int
MaxOpen int
MaxLifetime int
}

func DbConnectionString(dbConfig Database) string {
Expand All @@ -57,9 +63,16 @@ func (d *Database) Init() {
viper.BindEnv("database.port", DATABASE_PORT)
viper.BindEnv("database.user", DATABASE_USER)
viper.BindEnv("database.password", DATABASE_PASSWORD)
viper.BindEnv("database.maxIdle", DATABASE_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.maxOpen", DATABASE_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.maxLifetime", DATABASE_MAX_CONN_LIFETIME)

d.Name = viper.GetString("database.name")
d.Hostname = viper.GetString("database.hostname")
d.Port = viper.GetInt("database.port")
d.User = viper.GetString("database.user")
d.Password = viper.GetString("database.password")
d.MaxIdle = viper.GetInt("database.maxIdle")
d.MaxOpen = viper.GetInt("database.maxOpen")
d.MaxLifetime = viper.GetInt("database.maxLifetime")
}
12 changes: 12 additions & 0 deletions pkg/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package postgres

import (
"time"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver
"github.com/vulcanize/vulcanizedb/pkg/config"
Expand All @@ -35,6 +37,16 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) {
if connectErr != nil {
return &DB{}, ErrDBConnectionFailed(connectErr)
}
if databaseConfig.MaxOpen > 0 {
db.SetMaxOpenConns(databaseConfig.MaxOpen)
}
if databaseConfig.MaxIdle > 0 {
db.SetMaxIdleConns(databaseConfig.MaxIdle)
}
if databaseConfig.MaxLifetime > 0 {
lifetime := time.Duration(databaseConfig.MaxLifetime) * time.Second
db.SetConnMaxLifetime(lifetime)
}
pg := DB{DB: db, Node: node}
nodeErr := pg.CreateNode(&node)
if nodeErr != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/super_node/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@ type BackFillService struct {

// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
converter, err := NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
retriever, err := NewCIDRetriever(settings.Chain, settings.DB)
retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn)
if err != nil {
return nil, err
}
Expand Down
75 changes: 67 additions & 8 deletions pkg/super_node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ const (
SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE"
SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER"
SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL"

SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME"

BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS"
BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS"
BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME"

SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS"
SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS"
SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME"
)

// Config struct
Expand All @@ -53,21 +65,23 @@ type Config struct {
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
DB *postgres.DB
DBConfig config.Database
Quit chan bool
// Server fields
Serve bool
ServeDBConn *postgres.DB
WSEndpoint string
HTTPEndpoint string
IPCEndpoint string
// Sync params
Sync bool
Workers int
WSClient interface{}
NodeInfo core.Node
Sync bool
SyncDBConn *postgres.DB
Workers int
WSClient interface{}
NodeInfo core.Node
// Backfiller params
BackFill bool
BackFillDBConn *postgres.DB
HTTPClient interface{}
Frequency time.Duration
BatchSize uint64
Expand Down Expand Up @@ -110,6 +124,8 @@ func NewSuperNodeConfig() (*Config, error) {
}
}

c.DBConfig.Init()

c.Sync = viper.GetBool("superNode.sync")
if c.Sync {
workers := viper.GetInt("superNode.workers")
Expand All @@ -128,6 +144,9 @@ func NewSuperNodeConfig() (*Config, error) {
btcWS := viper.GetString("bitcoin.wsPath")
c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS)
}
syncDBConn := overrideDBConnConfig(c.DBConfig, Sync)
syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo)
c.SyncDBConn = &syncDB
}

c.Serve = viper.GetBool("superNode.server")
Expand All @@ -151,6 +170,9 @@ func NewSuperNodeConfig() (*Config, error) {
httpPath = "127.0.0.1:8081"
}
c.HTTPEndpoint = httpPath
serveDBConn := overrideDBConnConfig(c.DBConfig, Serve)
serveDB := utils.LoadPostgres(serveDBConn, c.NodeInfo)
c.ServeDBConn = &serveDB
}

c.BackFill = viper.GetBool("superNode.backFill")
Expand All @@ -160,9 +182,6 @@ func NewSuperNodeConfig() (*Config, error) {
}
}

c.DBConfig.Init()
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db
c.Quit = make(chan bool)

return c, nil
Expand Down Expand Up @@ -209,5 +228,45 @@ func (c *Config) BackFillFields() error {
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel")

backFillDBConn := overrideDBConnConfig(c.DBConfig, BackFill)
backFillDB := utils.LoadPostgres(backFillDBConn, c.NodeInfo)
c.BackFillDBConn = &backFillDB
return nil
}

type mode string

var (
Sync mode = "sync"
BackFill mode = "backFill"
Serve mode = "serve"
)

func overrideDBConnConfig(con config.Database, m mode) config.Database {
switch m {
case Sync:
viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.sync.maxIdle")
con.MaxOpen = viper.GetInt("database.sync.maxOpen")
con.MaxLifetime = viper.GetInt("database.sync.maxLifetime")
case BackFill:
viper.BindEnv("database.backFill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.backFill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.backFill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.backFill.maxIdle")
con.MaxOpen = viper.GetInt("database.backFill.maxOpen")
con.MaxLifetime = viper.GetInt("database.backFill.maxLifetime")
case Serve:
viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.server.maxIdle")
con.MaxOpen = viper.GetInt("database.server.maxOpen")
con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
default:
}
return con
}
10 changes: 5 additions & 5 deletions pkg/super_node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
if err != nil {
return nil, err
}
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
Expand All @@ -124,14 +124,15 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
}
// If we are serving, initialize the needed interfaces
if settings.Serve {
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.DB)
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.db = settings.ServeDBConn
}
sn.QuitChan = settings.Quit
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
Expand All @@ -140,7 +141,6 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
sn.NodeInfo = &settings.NodeInfo
sn.ipfsPath = settings.IPFSPath
sn.chain = settings.Chain
sn.db = settings.DB
return sn, nil
}

Expand Down

0 comments on commit bb14c01

Please sign in to comment.