Skip to content

Commit

Permalink
pg types and full changeset foundation
Browse files Browse the repository at this point in the history
This reworks the pg type system, allowing appropriately typed variables at higher
levels (outside pg package).

This also enables FULL replication identity for all tables, even those with a primary key,
which is to allow full changeset collection to support the upcoming network migrations
system.

---------

Co-authored-by: Jonathan Chappelow <[email protected]>

* avoid sentry table seq updates for non-2pc txns

* txapp: remove contextual params from mempool struct
  • Loading branch information
brennanjl authored Jul 15, 2024
1 parent c674ee9 commit c983b22
Show file tree
Hide file tree
Showing 50 changed files with 2,911 additions and 690 deletions.
61 changes: 40 additions & 21 deletions cmd/kwil-admin/cmds/snapshot/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/big"
Expand All @@ -16,6 +17,7 @@ import (
"strconv"
"strings"

"github.com/kwilteam/kwil-db/cmd/common/display"
"github.com/kwilteam/kwil-db/common/chain"

"github.com/spf13/cobra"
Expand All @@ -34,8 +36,6 @@ kwil-admin snapshot create --dbname kwildb --user user1 --password pass1 --host
# Snapshot and genesis files will be created in the snapshot directory
ls /path/to/snapshot/dir
genesis.json kwildb-snapshot.sql.gz`

createDatabase = `CREATE DATABASE `
)

/*
Expand All @@ -59,10 +59,16 @@ func createCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
snapshotDir, err := expandPath(snapshotDir)
if err != nil {
return fmt.Errorf("failed to expand snapshot directory path: %w", err)
return display.PrintErr(cmd, fmt.Errorf("failed to expand snapshot directory path: %v", err))
}

return pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, maxRowSize, snapshotDir)
logs, err := pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, maxRowSize, snapshotDir)
if err != nil {
return display.PrintErr(cmd, fmt.Errorf("failed to create database snapshot: %v", err))
}

r := &createSnapshotRes{Logs: logs}
return display.PrintCmd(cmd, r)
},
}

Expand All @@ -77,6 +83,18 @@ func createCmd() *cobra.Command {
return cmd
}

type createSnapshotRes struct {
Logs []string `json:"logs"`
}

func (c *createSnapshotRes) MarshalJSON() ([]byte, error) {
return json.Marshal(c)
}

func (c *createSnapshotRes) MarshalText() (text []byte, err error) {
return []byte(fmt.Sprintf("Snapshot created successfully\n%s", strings.Join(c.Logs, "\n"))), nil
}

func expandPath(path string) (string, error) {
if strings.HasPrefix(path, "~") {
home, err := os.UserHomeDir()
Expand All @@ -88,17 +106,19 @@ func expandPath(path string) (string, error) {
return filepath.Abs(path)
}

func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, maxRowSize int, snapshotDir string) (err error) {
// pgDump uses pg_dump to create a snapshot of the database.
// It returns messages to log and an error if any.
func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, maxRowSize int, snapshotDir string) (logs []string, err error) {
// Check if the snapshot directory exists, if not create it
err = os.MkdirAll(snapshotDir, 0755)
if err != nil {
return fmt.Errorf("failed to create snapshot directory: %w", err)
return nil, fmt.Errorf("failed to create snapshot directory: %w", err)
}

dumpFile := filepath.Join(snapshotDir, "kwildb-snapshot.sql.gz")
outputFile, err := os.Create(dumpFile)
if err != nil {
return fmt.Errorf("failed to create dump file: %w", err)
return nil, fmt.Errorf("failed to create dump file: %w", err)
}
// delete the dump file if an error occurs anywhere during the snapshot process
defer func() {
Expand Down Expand Up @@ -154,12 +174,12 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
var stderr bytes.Buffer
pgDumpOutput, err := pgDumpCmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to get stdout pipe: %w", err)
return nil, fmt.Errorf("failed to get stdout pipe: %w", err)
}
pgDumpCmd.Stderr = &stderr

if err := pgDumpCmd.Start(); err != nil {
return fmt.Errorf("failed to start pg_dump command: %w", err)
return nil, fmt.Errorf("failed to start pg_dump command: %w", err)
}
defer pgDumpOutput.Close()

Expand Down Expand Up @@ -187,24 +207,24 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
inVotersBlock = false
n, err := multiWriter.Write([]byte(line + "\n"))
if err != nil {
return fmt.Errorf("failed to write to gzip writer: %w", err)
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
}
totalBytes += int64(n)
continue
}

strs := strings.Split(line, "\t")
if len(strs) != 3 {
return fmt.Errorf("invalid voter line: %s", line)
return nil, fmt.Errorf("invalid voter line: %s", line)
}
voterID, err := hex.DecodeString(strs[1][3:]) // Remove the leading \\x
if err != nil {
return fmt.Errorf("failed to decode voter ID: %w", err)
return nil, fmt.Errorf("failed to decode voter ID: %w", err)
}

power, err := strconv.ParseInt(strs[2], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse power: %w", err)
return nil, fmt.Errorf("failed to parse power: %w", err)
}

genCfg.Validators = append(genCfg.Validators, &chain.GenesisValidator{
Expand All @@ -221,7 +241,7 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
} else if strings.HasPrefix(line, "SET") || strings.HasPrefix(line, "SELECT") || strings.HasPrefix(line[1:], "connect") {
// Skip SET and SELECT and connect statements
continue
} else if strings.HasPrefix(line, createDatabase) {
} else if strings.HasPrefix(line, `CREATE DATABASE `) {
// Skip CREATE DATABASE statement
} else {
if strings.HasPrefix(line, "COPY kwild_voting.voters") && strings.Contains(line, "FROM stdin;") {
Expand All @@ -231,20 +251,20 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
// Write the sanitized line to the gzip writer
n, err := multiWriter.Write([]byte(line + "\n"))
if err != nil {
return fmt.Errorf("failed to write to gzip writer: %w", err)
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
}
totalBytes += int64(n)
}
}
}

if err := scanner.Err(); err != nil {
return fmt.Errorf("failed to scan pg_dump output: %w", err)
return nil, fmt.Errorf("failed to scan pg_dump output: %w", err)
}

// Close the writer when pg_dump completes to signal EOF to sed
if err := pgDumpCmd.Wait(); err != nil {
return fmt.Errorf(stderr.String())
return nil, fmt.Errorf(stderr.String())
}

gzipWriter.Flush()
Expand All @@ -254,10 +274,9 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
// Write the genesis config to a file
genesisFile := filepath.Join(snapshotDir, "genesis.json")
if err := genCfg.SaveAs(genesisFile); err != nil {
return fmt.Errorf("failed to save genesis config: %w", err)
return nil, fmt.Errorf("failed to save genesis config: %w", err)
}

fmt.Println("Snapshot created at: ", dumpFile, " Total bytes written: ", totalBytes)
fmt.Println("Genesis config created at: ", genesisFile, " Genesis hash: ", fmt.Sprintf("%x", hash))
return nil
return []string{fmt.Sprintf("Snapshot created at: %s, Total bytes written: %d", dumpFile, totalBytes),
fmt.Sprintf("Genesis config created at: %s, Genesis hash: %s", genesisFile, fmt.Sprintf("%x", hash))}, nil
}
12 changes: 6 additions & 6 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
rpcSvcLogger := increaseLogLevel("user-json-svc", &d.log, d.cfg.Logging.RPCLevel)
rpcServerLogger := increaseLogLevel("user-jsonrpc-server", &d.log, d.cfg.Logging.RPCLevel)

jsonRPCTxSvc := usersvc.NewService(db, e, wrappedCmtClient, txApp,
jsonRPCTxSvc := usersvc.NewService(db, e, wrappedCmtClient, txApp, abciApp,
*rpcSvcLogger, usersvc.WithReadTxTimeout(time.Duration(d.cfg.AppCfg.ReadTxTimeout)))
jsonRPCServer, err := rpcserver.NewServer(d.cfg.AppCfg.JSONRPCListenAddress,
*rpcServerLogger, rpcserver.WithTimeout(time.Duration(d.cfg.AppCfg.RPCTimeout)),
Expand All @@ -228,15 +228,15 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {

// admin service and server
signer := buildSigner(d)
jsonAdminSvc := adminsvc.NewService(db, wrappedCmtClient, txApp, signer, d.cfg,
jsonAdminSvc := adminsvc.NewService(db, wrappedCmtClient, txApp, abciApp, signer, d.cfg,
d.genesisCfg.ChainID, *d.log.Named("admin-json-svc"))
jsonRPCAdminServer := buildJRPCAdminServer(d)
jsonRPCAdminServer.RegisterSvc(jsonAdminSvc)
jsonRPCAdminServer.RegisterSvc(jsonRPCTxSvc)
jsonRPCAdminServer.RegisterSvc(&funcsvc.Service{})

// legacy tx service and grpc server
txsvc := buildTxSvc(d, db, e, wrappedCmtClient, txApp)
txsvc := buildTxSvc(d, db, e, wrappedCmtClient, txApp, abciApp)
grpcServer := buildGrpcServer(d, txsvc)

return &Server{
Expand Down Expand Up @@ -377,7 +377,7 @@ func buildAbci(d *coreDependencies, db *pg.DB, txApp abci.TxApp, snapshotter *st
}

func buildEventBroadcaster(d *coreDependencies, ev broadcast.EventStore, b broadcast.Broadcaster, txapp *txapp.TxApp) *broadcast.EventBroadcaster {
return broadcast.NewEventBroadcaster(ev, b, txapp, buildSigner(d), d.genesisCfg.ChainID, d.genesisCfg.ConsensusParams.Votes.MaxVotesPerTx)
return broadcast.NewEventBroadcaster(ev, b, txapp, buildSigner(d), d.genesisCfg.ChainID, d.genesisCfg.ConsensusParams.Votes.MaxVotesPerTx, *d.log.Named("event-broadcaster"))
}

func buildEventStore(d *coreDependencies, closers *closeFuncs) *voting.EventStore {
Expand All @@ -396,8 +396,8 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *voting.EventStor
return e
}

func buildTxSvc(d *coreDependencies, db *pg.DB, txsvc txSvc.EngineReader, cometBftClient txSvc.BlockchainTransactor, nodeApp txSvc.NodeApplication) *txSvc.Service {
return txSvc.NewService(db, txsvc, cometBftClient, nodeApp,
func buildTxSvc(d *coreDependencies, db *pg.DB, txsvc txSvc.EngineReader, cometBftClient txSvc.BlockchainTransactor, nodeApp txSvc.NodeApplication, pricer txSvc.Pricer) *txSvc.Service {
return txSvc.NewService(db, txsvc, cometBftClient, nodeApp, pricer,
txSvc.WithLogger(*d.log.Named("tx-service")),
txSvc.WithReadTxTimeout(time.Duration(d.cfg.AppCfg.ReadTxTimeout)),
)
Expand Down
19 changes: 9 additions & 10 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,13 @@ type NetworkParameters struct {
VoteExpiry int64
// DisabledGasCosts dictates whether gas costs are disabled.
DisabledGasCosts bool
}

// Copy returns a deep copy of the network parameters.
func (n *NetworkParameters) Copy() *NetworkParameters {
return &NetworkParameters{
MaxBlockSize: n.MaxBlockSize,
JoinExpiry: n.JoinExpiry,
VoteExpiry: n.VoteExpiry,
DisabledGasCosts: n.DisabledGasCosts,
}
// InMigration is true if the network is being migrated to a new network.
// Once this is set to true, it can never be set to false. If true,
// new databases cannot be created, old databases cannot be deleted,
// balances cannot be transferred
// and the vote store is paused.
InMigration bool
// MaxVotesPerTx is the maximum number of votes that can be included in a
// single transaction.
MaxVotesPerTx int64
}
31 changes: 18 additions & 13 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sql
import (
"context"
"errors"
"io"
)

var (
Expand Down Expand Up @@ -58,15 +59,16 @@ type Tx interface {
// create transactions, which may be closed or create additional nested
// transactions.
//
// Some implementations may also be an OuterTxMaker and/or a ReadTxMaker. Embed
// Some implementations may also be an PreparedTxMaker and/or a ReadTxMaker. Embed
// with those interfaces to compose the minimal interface required.
type DB interface {
Executor
TxMaker
}

// ReadTxMaker can make read-only transactions.
// Many read-only transactions can be made at once.
// ReadTxMaker can make read-only transactions. This is necessarily an outermost
// transaction since nested transactions inherit their access mode from their
// parent. Many read-only transactions can be made at once.
type ReadTxMaker interface {
BeginReadTx(ctx context.Context) (Tx, error)
}
Expand All @@ -78,22 +80,25 @@ type DelayedReadTxMaker interface {
BeginDelayedReadTx() Tx
}

// OuterTx is the outermost database transaction.
// PreparedTx is an outermost database transaction that uses two-phase commit
// with the Precommit method.
//
// NOTE: An OuterTx may be used where only a Tx or DB is required since those
// interfaces are a subset of the OuterTx method set.
type OuterTx interface {
// NOTE: A PreparedTx may be used where only a Tx or DB is required since those
// interfaces are a subset of the PreparedTx method set.
// It takes a writer to write the full changeset to.
// If the writer is nil, the changeset will not be written.
type PreparedTx interface {
Tx
Precommit(ctx context.Context) ([]byte, error)
Precommit(ctx context.Context, writer io.Writer) ([]byte, error)
}

// OuterTxMaker is the special kind of transaction that creates a transaction
// that has a Precommit method (see OuterTx), which supports obtaining a commit
// PreparedTxMaker is the special kind of transaction that creates a transaction
// that has a Precommit method (see PreparedTx), which supports obtaining a commit
// ID using a (two-phase) prepared transaction prior to Commit. This is a
// different method name so that an implementation may satisfy both OuterTxMaker
// different method name so that an implementation may satisfy both PreparedTxMaker
// and TxMaker.
type OuterTxMaker interface {
BeginOuterTx(ctx context.Context) (OuterTx, error)
type PreparedTxMaker interface {
BeginPreparedTx(ctx context.Context) (PreparedTx, error)
}

// SnapshotTxMaker is an interface that creates a transaction for taking a
Expand Down
24 changes: 19 additions & 5 deletions core/types/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,21 +1254,35 @@ var (
IntType = &DataType{
Name: intStr,
}
TextType = &DataType{
IntArrayType = ArrayType(IntType)
TextType = &DataType{
Name: textStr,
}
BoolType = &DataType{
TextArrayType = ArrayType(TextType)
BoolType = &DataType{
Name: boolStr,
}
BlobType = &DataType{
BoolArrayType = ArrayType(BoolType)
BlobType = &DataType{
Name: blobStr,
}
UUIDType = &DataType{
BlobArrayType = ArrayType(BlobType)
UUIDType = &DataType{
Name: uuidStr,
}
Uint256Type = &DataType{
UUIDArrayType = ArrayType(UUIDType)
// DecimalType contains 1,0 metadata.
// For type detection, users should prefer compare a datatype
// name with the DecimalStr constant.
DecimalType = &DataType{
Name: DecimalStr,
Metadata: [2]uint16{1, 0}, // the minimum precision and scale
}
DecimalArrayType = ArrayType(DecimalType)
Uint256Type = &DataType{
Name: uint256Str,
}
Uint256ArrayType = ArrayType(Uint256Type)
// NullType is a special type used internally
NullType = &DataType{
Name: nullStr,
Expand Down
14 changes: 14 additions & 0 deletions core/types/transactions/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
CodeDatasetMissing TxCode = 110
CodeDatasetExists TxCode = 120

CodeNetworkInMigration TxCode = 200

CodeUnknownError TxCode = math.MaxUint32
)

Expand Down Expand Up @@ -71,6 +73,18 @@ func (c TxCode) String() string {
return "insufficient fee"
case CodeInvalidAmount:
return "invalid amount"
case CodeInvalidSender:
return "invalid sender"
case CodeInvalidSchema:
return "invalid schema"
case CodeDatasetMissing:
return "dataset missing"
case CodeDatasetExists:
return "dataset exists"
case CodeNetworkInMigration:
return "network in migration"
case CodeUnknownError:
return "unknown error"
default:
return "unknown tx error"
}
Expand Down
Loading

0 comments on commit c983b22

Please sign in to comment.