Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Pool optimization #196

Merged
merged 3 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/migrations/00020_create_btc_header_cids_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE TABLE btc.header_cids (
cid TEXT NOT NULL,
timestamp NUMERIC NOT NULL,
bits BIGINT NOT NULL,
node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE,
node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE,
UNIQUE (block_number, block_hash)
);

Expand Down
14 changes: 8 additions & 6 deletions environments/superNodeBTC.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD

[ipfs]
path = "~/.ipfs" # $IPFS_PATH
[database.sync]
maxIdle = 1
[database.backFill]
maxIdle = 5

[resync]
chain = "bitcoin" # $RESYNC_CHAIN
type = "full" # $RESYNC_TYPE
start = 0 # $RESYNC_START
stop = 0 # $RESYNC_STOP
batchSize = 1 # $RESYNC_BATCH_SIZE
batchNumber = 50 # $RESYNC_BATCH_NUMBER
batchSize = 5 # $RESYNC_BATCH_SIZE
batchNumber = 5 # $RESYNC_BATCH_NUMBER
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION

Expand All @@ -28,8 +30,8 @@
workers = 1 # $SUPERNODE_WORKERS
backFill = true # $SUPERNODE_BACKFILL
frequency = 45 # $SUPERNODE_FREQUENCY
batchSize = 1 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
batchSize = 5 # $SUPERNODE_BATCH_SIZE
batchNumber = 5 # $SUPERNODE_BATCH_NUMBER
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL

[bitcoin]
Expand Down
12 changes: 7 additions & 5 deletions environments/superNodeETH.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
user = "vdbm" # $DATABASE_USER
password = "" # $DATABASE_PASSWORD

[ipfs]
path = "~/.ipfs" # $IPFS_PATH
[database.sync]
maxIdle = 1
[database.backFill]
maxIdle = 5

[resync]
chain = "ethereum" # $RESYNC_CHAIN
type = "state" # $RESYNC_TYPE
start = 0 # $RESYNC_START
stop = 0 # $RESYNC_STOP
batchSize = 5 # $RESYNC_BATCH_SIZE
batchNumber = 50 # $RESYNC_BATCH_NUMBER
batchNumber = 5 # $RESYNC_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE
clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE
resetValidation = true # $RESYNC_RESET_VALIDATION

[superNode]
Expand All @@ -30,7 +32,7 @@
backFill = true # $SUPERNODE_BACKFILL
frequency = 15 # $SUPERNODE_FREQUENCY
batchSize = 5 # $SUPERNODE_BATCH_SIZE
batchNumber = 50 # $SUPERNODE_BATCH_NUMBER
batchNumber = 5 # $SUPERNODE_BATCH_NUMBER
timeout = 300 # $HTTP_TIMEOUT
validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL

Expand Down
22 changes: 0 additions & 22 deletions pkg/config/config.go

This file was deleted.

33 changes: 23 additions & 10 deletions pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ import (

// Env variables
const (
DATABASE_NAME = "DATABASE_NAME"
DATABASE_HOSTNAME = "DATABASE_HOSTNAME"
DATABASE_PORT = "DATABASE_PORT"
DATABASE_USER = "DATABASE_USER"
DATABASE_PASSWORD = "DATABASE_PASSWORD"
DATABASE_NAME = "DATABASE_NAME"
DATABASE_HOSTNAME = "DATABASE_HOSTNAME"
DATABASE_PORT = "DATABASE_PORT"
DATABASE_USER = "DATABASE_USER"
DATABASE_PASSWORD = "DATABASE_PASSWORD"
DATABASE_MAX_IDLE_CONNECTIONS = "DATABASE_MAX_IDLE_CONNECTIONS"
DATABASE_MAX_OPEN_CONNECTIONS = "DATABASE_MAX_OPEN_CONNECTIONS"
DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME"
)

type Database struct {
Hostname string
Name string
User string
Password string
Port int
Hostname string
Name string
User string
Password string
Port int
MaxIdle int
MaxOpen int
MaxLifetime int
}

func DbConnectionString(dbConfig Database) string {
Expand All @@ -57,9 +63,16 @@ func (d *Database) Init() {
viper.BindEnv("database.port", DATABASE_PORT)
viper.BindEnv("database.user", DATABASE_USER)
viper.BindEnv("database.password", DATABASE_PASSWORD)
viper.BindEnv("database.maxIdle", DATABASE_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.maxOpen", DATABASE_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.maxLifetime", DATABASE_MAX_CONN_LIFETIME)

d.Name = viper.GetString("database.name")
d.Hostname = viper.GetString("database.hostname")
d.Port = viper.GetInt("database.port")
d.User = viper.GetString("database.user")
d.Password = viper.GetString("database.password")
d.MaxIdle = viper.GetInt("database.maxIdle")
d.MaxOpen = viper.GetInt("database.maxOpen")
d.MaxLifetime = viper.GetInt("database.maxLifetime")
}
12 changes: 12 additions & 0 deletions pkg/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package postgres

import (
"time"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver
"github.com/vulcanize/vulcanizedb/pkg/config"
Expand All @@ -35,6 +37,16 @@ func NewDB(databaseConfig config.Database, node core.Node) (*DB, error) {
if connectErr != nil {
return &DB{}, ErrDBConnectionFailed(connectErr)
}
if databaseConfig.MaxOpen > 0 {
db.SetMaxOpenConns(databaseConfig.MaxOpen)
}
if databaseConfig.MaxIdle > 0 {
db.SetMaxIdleConns(databaseConfig.MaxIdle)
}
if databaseConfig.MaxLifetime > 0 {
lifetime := time.Duration(databaseConfig.MaxLifetime) * time.Second
db.SetConnMaxLifetime(lifetime)
}
pg := DB{DB: db, Node: node}
nodeErr := pg.CreateNode(&node)
if nodeErr != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/super_node/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@ type BackFillService struct {

// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.BackFillDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
indexer, err := NewCIDIndexer(settings.Chain, settings.BackFillDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
converter, err := NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
retriever, err := NewCIDRetriever(settings.Chain, settings.DB)
retriever, err := NewCIDRetriever(settings.Chain, settings.BackFillDBConn)
if err != nil {
return nil, err
}
Expand Down
75 changes: 67 additions & 8 deletions pkg/super_node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ const (
SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE"
SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER"
SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL"

SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME"

BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS"
BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS"
BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME"

SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS"
SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS"
SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME"
)

// Config struct
Expand All @@ -53,21 +65,23 @@ type Config struct {
Chain shared.ChainType
IPFSPath string
IPFSMode shared.IPFSMode
DB *postgres.DB
DBConfig config.Database
Quit chan bool
// Server fields
Serve bool
ServeDBConn *postgres.DB
WSEndpoint string
HTTPEndpoint string
IPCEndpoint string
// Sync params
Sync bool
Workers int
WSClient interface{}
NodeInfo core.Node
Sync bool
SyncDBConn *postgres.DB
Workers int
WSClient interface{}
NodeInfo core.Node
// Backfiller params
BackFill bool
BackFillDBConn *postgres.DB
HTTPClient interface{}
Frequency time.Duration
BatchSize uint64
Expand Down Expand Up @@ -110,6 +124,8 @@ func NewSuperNodeConfig() (*Config, error) {
}
}

c.DBConfig.Init()

c.Sync = viper.GetBool("superNode.sync")
if c.Sync {
workers := viper.GetInt("superNode.workers")
Expand All @@ -128,6 +144,9 @@ func NewSuperNodeConfig() (*Config, error) {
btcWS := viper.GetString("bitcoin.wsPath")
c.NodeInfo, c.WSClient = shared.GetBtcNodeAndClient(btcWS)
}
syncDBConn := overrideDBConnConfig(c.DBConfig, Sync)
syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo)
c.SyncDBConn = &syncDB
}

c.Serve = viper.GetBool("superNode.server")
Expand All @@ -151,6 +170,9 @@ func NewSuperNodeConfig() (*Config, error) {
httpPath = "127.0.0.1:8081"
}
c.HTTPEndpoint = httpPath
serveDBConn := overrideDBConnConfig(c.DBConfig, Serve)
serveDB := utils.LoadPostgres(serveDBConn, c.NodeInfo)
c.ServeDBConn = &serveDB
}

c.BackFill = viper.GetBool("superNode.backFill")
Expand All @@ -160,9 +182,6 @@ func NewSuperNodeConfig() (*Config, error) {
}
}

c.DBConfig.Init()
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db
c.Quit = make(chan bool)

return c, nil
Expand Down Expand Up @@ -209,5 +228,45 @@ func (c *Config) BackFillFields() error {
c.BatchSize = uint64(viper.GetInt64("superNode.batchSize"))
c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber"))
c.ValidationLevel = viper.GetInt("superNode.validationLevel")

backFillDBConn := overrideDBConnConfig(c.DBConfig, BackFill)
backFillDB := utils.LoadPostgres(backFillDBConn, c.NodeInfo)
c.BackFillDBConn = &backFillDB
return nil
}

type mode string

var (
Sync mode = "sync"
BackFill mode = "backFill"
Serve mode = "serve"
)

func overrideDBConnConfig(con config.Database, m mode) config.Database {
switch m {
case Sync:
viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.sync.maxIdle")
con.MaxOpen = viper.GetInt("database.sync.maxOpen")
con.MaxLifetime = viper.GetInt("database.sync.maxLifetime")
case BackFill:
viper.BindEnv("database.backFill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.backFill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.backFill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.backFill.maxIdle")
con.MaxOpen = viper.GetInt("database.backFill.maxOpen")
con.MaxLifetime = viper.GetInt("database.backFill.maxLifetime")
case Serve:
viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.server.maxIdle")
con.MaxOpen = viper.GetInt("database.server.maxOpen")
con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
default:
}
return con
}
10 changes: 5 additions & 5 deletions pkg/super_node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
if err != nil {
return nil, err
}
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
Expand All @@ -124,14 +124,15 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
}
// If we are serving, initialize the needed interfaces
if settings.Serve {
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.DB)
sn.Retriever, err = NewCIDRetriever(settings.Chain, settings.ServeDBConn)
if err != nil {
return nil, err
}
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode)
sn.IPLDFetcher, err = NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.db = settings.ServeDBConn
}
sn.QuitChan = settings.Quit
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
Expand All @@ -140,7 +141,6 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
sn.NodeInfo = &settings.NodeInfo
sn.ipfsPath = settings.IPFSPath
sn.chain = settings.Chain
sn.db = settings.DB
return sn, nil
}

Expand Down