Skip to content

Commit

Permalink
Merge pull request #28 from 0xPolygon/sync-tasks-table
Browse files Browse the repository at this point in the history
Storage optimized sync tracking
  • Loading branch information
christophercampbell authored Oct 23, 2023
2 parents 57e2e31 + 8379172 commit 3488606
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 62 deletions.
39 changes: 12 additions & 27 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/0xPolygon/cdk-data-availability/offchaindata"
"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
Expand Down Expand Up @@ -76,7 +75,7 @@ func (db *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx)

// Exists checks if a key exists in offchain data table
func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1"
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1;"
var (
count uint
)
Expand All @@ -87,42 +86,28 @@ func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
return count > 0
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer
func (db *DB) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT max(block) FROM data_node.sync_info;"
// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"
var (
lastBlock uint64
)

if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL).Scan(&lastBlock); err != nil {
if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
}
return lastBlock, nil
}

// ResetLastProcessedBlock removes all sync_info for blocks greater than `block`
func (db *DB) ResetLastProcessedBlock(ctx context.Context, block uint64) (uint64, error) {
const resetLastProcessedBlock = "DELETE FROM data_node.sync_info WHERE block > $1"
var (
ct pgconn.CommandTag
err error
)
if ct, err = db.pg.Exec(ctx, resetLastProcessedBlock, block); err != nil {
return 0, err
}
return uint64(ct.RowsAffected()), nil
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer
func (db *DB) StoreLastProcessedBlock(ctx context.Context, block uint64, dbTx pgx.Tx) error {
// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *DB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx pgx.Tx) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_info (block)
VALUES ($1)
ON CONFLICT (block) DO UPDATE
SET processed = NOW();
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, block); err != nil {
if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}
return nil
Expand Down
16 changes: 16 additions & 0 deletions db/migrations/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +migrate Down
DROP TABLE IF EXISTS data_node.sync_tasks CASCADE;

-- +migrate Up
CREATE TABLE data_node.sync_tasks
(
task VARCHAR PRIMARY KEY,
block BIGINT NOT NULL,
processed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- transfer data from old table to new
INSERT INTO data_node.sync_tasks (task, block)
SELECT 'L1', MAX(block) FROM data_node.sync_info;

DROP TABLE IF EXISTS data_node.sync_info CASCADE;
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hermeznetwork/tracerr v0.3.2
github.com/invopop/jsonschema v0.7.0
github.com/jackc/pgconn v1.14.1
github.com/jackc/pgx/v4 v4.18.1
github.com/miguelmota/go-solidity-sha3 v0.1.1
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -43,6 +42,7 @@ require (
github.com/holiman/uint256 v1.2.3 // indirect
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (bs *BatchSynchronizer) handleReorgs() {
// only reset start block if necessary
continue
}
err = rewindStartBlock(bs.db, r.Number)
err = setStartBlock(bs.db, r.Number)
if err != nil {
log.Errorf("failed to store new start block to %d: %v", r.Number, err)
}
Expand Down
20 changes: 4 additions & 16 deletions synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (

const dbTimeout = 2 * time.Second

const l1SyncTask = "L1"

func getStartBlock(db *db.DB) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

start, err := db.GetLastProcessedBlock(ctx)
start, err := db.GetLastProcessedBlock(ctx, l1SyncTask)
if err != nil {
log.Errorf("error retrieving last processed block, starting from 0: %v", err)
}
Expand All @@ -37,7 +39,7 @@ func setStartBlock(db *db.DB, block uint64) error {
if dbTx, err = db.BeginStateTransaction(ctx); err != nil {
return err
}
err = db.StoreLastProcessedBlock(ctx, block, dbTx)
err = db.StoreLastProcessedBlock(ctx, l1SyncTask, block, dbTx)
if err != nil {
return err
}
Expand All @@ -47,20 +49,6 @@ func setStartBlock(db *db.DB, block uint64) error {
return nil
}

func rewindStartBlock(db *db.DB, lca uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

rewind, err := db.ResetLastProcessedBlock(ctx, lca)
if err != nil {
return err
}
if rewind > 0 {
log.Infof("rewound %d blocks", rewind)
}
return nil
}

func exists(db *db.DB, key common.Hash) bool {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()
Expand Down
21 changes: 4 additions & 17 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ run: stop ## Runs a full node for e2e
.PHONY: stop
stop: ## Stop a full data node
make stop-dacs
make stop-dac-dbs
$(STOP)

.PHONY: stop-dacs
stop-dacs:
@./stop-dacs > /dev/null 2>&1

.PHONY: test-e2e
test-e2e: run ## Runs the E2E tests
trap '$(STOP)' EXIT; MallocNanoZone=0 go test -count=1 -race -v -p 1 -timeout 600s ./e2e/...
Expand Down Expand Up @@ -54,19 +57,3 @@ run-network: ## Runs the l1 network
stop-network: ## Stops the l1 network
docker compose stop l1 && docker compose rm -f l1


### manual utility to stop dac nodes that were started by e2e, but not stopped by e2e ###
.PHONY: stop-dac-nodes
stop-dac-nodes:
for i in 0 1 2 3 4 ; do \
(docker kill cdk-data-availability-$$i || true) && (docker rm cdk-data-availability-$$i || true) ; \
done

.PHONY: stop-dac-dbs
stop-dac-dbs:
for i in 0 1 2 3 4 ; do \
(docker kill cdk-validium-data-node-db-$$i || true) && (docker rm cdk-validium-data-node-db-$$i || true); \
done

.PHONY: stop-dacs
stop-dacs: stop-dac-nodes stop-dac-dbs
9 changes: 9 additions & 0 deletions test/stop-dacs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/sh

for x in cdk-data-availability cdk-validium-data-node-db; do
echo $x
for i in 0 1 2 3 4; do
docker kill $x-$i || true
docker rm $x-$i || true
done
done

0 comments on commit 3488606

Please sign in to comment.