Validator and resolution processing fixes
charithabandi committed Jan 13, 2025
1 parent 5fae5ef commit e4dca65
Showing 20 changed files with 253 additions and 157 deletions.
5 changes: 3 additions & 2 deletions core/types/payloads.go
@@ -9,6 +9,7 @@ import (
"reflect"
"strconv"

"github.com/kwilteam/kwil-db/core/crypto"
"github.com/kwilteam/kwil-db/core/types/decimal"
"github.com/kwilteam/kwil-db/core/types/serialize"
)
@@ -607,7 +608,7 @@ func (v *ValidatorJoin) UnmarshalBinary(b []byte) error {
// ValidatorApprove is used to vote for a validator's approval to join the network
type ValidatorApprove struct {
Candidate []byte
KeyType string
KeyType crypto.KeyType
}

func (v *ValidatorApprove) Type() PayloadType {
@@ -662,7 +663,7 @@ func (v *ValidatorApprove) UnmarshalBinary(b []byte) error {
// ValidatorRemove is used to vote for a validator's removal from the network
type ValidatorRemove struct {
Validator []byte
KeyType string
KeyType crypto.KeyType
}

func (v *ValidatorRemove) Type() PayloadType {
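Switching the payload field from a plain string to crypto.KeyType removes a parse step from every consumer. A minimal, self-contained sketch of the pattern; the KeyType stand-in below is hypothetical, not the actual crypto.KeyType definition:

```go
package main

import "fmt"

// KeyType is a stand-in for crypto.KeyType: a small named type rather
// than a free-form string, so payloads carry an already-typed value.
type KeyType string

const (
	KeyTypeSecp256k1 KeyType = "secp256k1"
	KeyTypeEd25519   KeyType = "ed25519"
)

// ValidatorApprove mirrors the shape of the payload after this change:
// routes can copy KeyType directly instead of calling ParseKeyType on a
// string and handling a parse error.
type ValidatorApprove struct {
	Candidate []byte
	KeyType   KeyType
}

func main() {
	tx := ValidatorApprove{Candidate: []byte{0x01, 0x02}, KeyType: KeyTypeEd25519}
	fmt.Printf("candidate=%x keyType=%s\n", tx.Candidate, tx.KeyType)
}
```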
55 changes: 3 additions & 52 deletions node/block_processor/processor.go
@@ -389,58 +389,6 @@ func (bp *BlockProcessor) InitChain(ctx context.Context) (int64, []byte, error)
return genCfg.InitialHeight, genCfg.StateHash, nil
}

// LoadFromDBState loads the chain state from the database. This is called when the node is
bootstrapped using statesync and the chain state starts from a non-genesis height.
// In this scenario, the InitChain method is not called during the bootstrap process.
func (bp *BlockProcessor) LoadFromDBState(ctx context.Context) error {
bp.mtx.Lock()
defer bp.mtx.Unlock()

tx, err := bp.db.BeginReadTx(ctx)
if err != nil {
return fmt.Errorf("failed to begin read transaction: %w", err)
}
defer tx.Rollback(ctx)

height, appHash, dirty, err := meta.GetChainState(ctx, tx)
if err != nil {
return fmt.Errorf("failed to get chain state: %w", err)
}

if dirty {
return fmt.Errorf("chain state is dirty, should have been cleanedup in the blockprocesser constructor")
}

// set the block processor back to its state loaded from the database after statesync
bp.height.Store(height)
copy(bp.appHash[:], appHash)

// persist last changeset height in the migrator if it is in migration
bp.migrator.PersistLastChangesetHeight(ctx, tx, height)

networkParams, err := meta.LoadParams(ctx, tx)
if err != nil {
return fmt.Errorf("failed to load the network parameters: %w", err)
}
bp.chainCtx.NetworkParameters = networkParams

// Also ensure that all the modules are initialized with the correct validatorset from the statesync.
// ce, bp, votestore, listenerMgr etc.
// This is a painful side effect of statesync: we have to update the in-memory state of all the
// modules to match the state from the snapshot.

// Votestore should be loaded from the database
err = bp.validators.LoadValidatorSet(ctx, tx)
if err != nil {
return fmt.Errorf("failed to load the validator set: %w", err)
}

// notify the validator updates to all the subscribers
bp.announceValidators()

return nil
}

func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExecRequest) (blkResult *ktypes.BlockExecResult, err error) {
bp.mtx.Lock()
defer bp.mtx.Unlock()
@@ -478,6 +426,8 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
inMigration := blockCtx.ChainContext.NetworkParameters.MigrationStatus == ktypes.MigrationInProgress
haltNetwork := blockCtx.ChainContext.NetworkParameters.MigrationStatus == ktypes.MigrationCompleted

bp.log.Info("Migration status: ", "status", blockCtx.ChainContext.NetworkParameters.MigrationStatus)

txResults := make([]ktypes.TxResult, len(req.Block.Txns))

txHashes := bp.initBlockExecutionStatus(req.Block)
@@ -553,6 +503,7 @@ func (bp *BlockProcessor) ExecuteBlock(ctx context.Context, req *ktypes.BlockExe
return nil, fmt.Errorf("failed to notify the migrator about the block height: %w", err)
}

bp.log.Info("Migration Status after block processor: ", "status", bp.chainCtx.NetworkParameters.MigrationStatus)
networkParams.MigrationStatus = bp.chainCtx.NetworkParameters.MigrationStatus

if err := meta.SetChainState(ctx, bp.consensusTx, req.Height, bp.appHash[:], true); err != nil {
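For context, the inMigration and haltNetwork flags in ExecuteBlock are pure functions of the migration status that the new log lines print. A condensed sketch with stand-in names (the real constants live in core/types):

```go
package main

import "fmt"

// MigrationStatus is a stand-in for the ktypes status values referenced above.
type MigrationStatus string

const (
	NoActiveMigration   MigrationStatus = "NoActiveMigration"
	MigrationInProgress MigrationStatus = "MigrationInProgress"
	MigrationCompleted  MigrationStatus = "MigrationCompleted"
)

// migrationFlags mirrors the two checks in ExecuteBlock: changesets are
// recorded while a migration is in progress, and the network halts new
// transactions once it has completed.
func migrationFlags(status MigrationStatus) (inMigration, haltNetwork bool) {
	return status == MigrationInProgress, status == MigrationCompleted
}

func main() {
	for _, s := range []MigrationStatus{NoActiveMigration, MigrationInProgress, MigrationCompleted} {
		in, halt := migrationFlags(s)
		fmt.Printf("%-20s inMigration=%v haltNetwork=%v\n", s, in, halt)
	}
}
```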
12 changes: 11 additions & 1 deletion node/migrations/migrator.go
@@ -142,6 +142,8 @@ func (m *Migrator) NotifyHeight(ctx context.Context, block *common.BlockContext,
m.mu.Lock()
defer m.mu.Unlock()

m.Logger.Info("Notify Height: ", "height", block.Height, " Migration Status: ", block.ChainContext.NetworkParameters.MigrationStatus, " migration: ", m.activeMigration)

if block.ChainContext.NetworkParameters.MigrationStatus == types.ActivationPeriod && m.activeMigration == nil {
// if the network is in activation period, but there is no active migration, then
// this is the block at which the migration is approved by the network.
@@ -151,25 +153,30 @@ func (m *Migrator) NotifyHeight(ctx context.Context, block *common.BlockContext,
}

m.activeMigration = activeM
m.Logger.Info("Migration approved by the network: ", "migrationInfo", m.activeMigration)
}

// if there is no active migration, there is nothing to do
if m.activeMigration == nil {
m.Logger.Info("No active migration found")
return nil
}

// if not in a migration, we can return early
if block.Height < m.activeMigration.StartHeight-1 {
m.Logger.Info("Not in migration")
return nil
}

if block.Height > m.activeMigration.EndHeight {
m.Logger.Info("Migration already completed")
return nil
}

if block.Height == m.activeMigration.StartHeight-1 {
// set the migration in progress, so that we record the changesets starting from the next block
block.ChainContext.NetworkParameters.MigrationStatus = types.MigrationInProgress
m.Logger.Info("Migration Status shld be in migration: ", "status", block.ChainContext.NetworkParameters.MigrationStatus)
return nil
}

@@ -188,6 +195,9 @@ func (m *Migrator) NotifyHeight(ctx context.Context, block *common.BlockContext,
// and will instead need to be recorded as the first changeset of the migration.

if block.Height == m.activeMigration.StartHeight {
m.Logger.Info("Starting migration at height: ", "height", block.Height)
block.ChainContext.NetworkParameters.MigrationStatus = types.MigrationInProgress

tx, snapshotId, err := db.BeginSnapshotTx(ctx)
if err != nil {
return err
@@ -222,7 +232,7 @@ func (m *Migrator) NotifyHeight(ctx context.Context, block *common.BlockContext,
if block.Height == m.activeMigration.EndHeight {
// starting from here, no more transactions of any kind will be accepted or mined.
block.ChainContext.NetworkParameters.MigrationStatus = types.MigrationCompleted
m.Logger.Info("migration to chain completed, no new transactions will be accepted")
m.Logger.Info("migration to chain completed, no new transactions will be accepted", "status", block.ChainContext.NetworkParameters.MigrationStatus)
}

return nil
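The height checks in NotifyHeight amount to a small state machine keyed on StartHeight and EndHeight. A hedged sketch of just that decision, with the heights as plain parameters and the actions reduced to labels:

```go
package main

import "fmt"

// notifyAction condenses the height checks in NotifyHeight: nothing to do
// before StartHeight-1, mark the migration in progress at StartHeight-1,
// take the snapshot at StartHeight, record changesets in between, and mark
// the migration completed at EndHeight.
func notifyAction(height, startHeight, endHeight int64) string {
	switch {
	case height < startHeight-1:
		return "not in migration yet"
	case height == startHeight-1:
		return "set MigrationInProgress (changesets start next block)"
	case height == startHeight:
		return "begin snapshot tx for the migration genesis state"
	case height < endHeight:
		return "record changesets"
	case height == endHeight:
		return "set MigrationCompleted (no new transactions accepted)"
	default:
		return "migration already completed"
	}
}

func main() {
	for _, h := range []int64{97, 99, 100, 150, 200, 201} {
		fmt.Printf("height %3d: %s\n", h, notifyAction(h, 100, 200))
	}
}
```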
2 changes: 0 additions & 2 deletions node/node_test.go
@@ -245,8 +245,6 @@ func (bp *dummyBP) SubscribeValidators() <-chan []*ktypes.Validator {
}
func (bp *dummyBP) LoadFromDBState(ctx context.Context) error { return nil }

func (bp *dummyBP) LoadFromDBState(ctx context.Context) error { return nil }

// func hupStreamHandler(s network.Stream) { s.Close() }

var _ ConsensusEngine = &dummyCE{}
4 changes: 2 additions & 2 deletions node/services/jsonrpc/adminsvc/service.go
@@ -357,7 +357,7 @@ func (svc *Service) sendTx(ctx context.Context, payload ktypes.Payload) (*userjs
func (svc *Service) Approve(ctx context.Context, req *adminjson.ApproveRequest) (*userjson.BroadcastResponse, *jsonrpc.Error) {
return svc.sendTx(ctx, &ktypes.ValidatorApprove{
Candidate: req.PubKey,
KeyType: req.PubKeyType.String(),
KeyType: req.PubKeyType,
})
}

@@ -370,7 +370,7 @@ func (svc *Service) Join(ctx context.Context, req *adminjson.JoinRequest) (*user
func (svc *Service) Remove(ctx context.Context, req *adminjson.RemoveRequest) (*userjson.BroadcastResponse, *jsonrpc.Error) {
return svc.sendTx(ctx, &ktypes.ValidatorRemove{
Validator: req.PubKey,
KeyType: req.PubKeyType.String(),
KeyType: req.PubKeyType,
})
}

10 changes: 2 additions & 8 deletions node/txapp/routes.go
@@ -509,10 +509,7 @@ func (d *validatorApproveRoute) PreTx(ctx *common.TxContext, svc *common.Service
}

d.candidate = approve.Candidate
d.keyType, err = crypto.ParseKeyType(approve.KeyType)
if err != nil {
return types.CodeUnknownError, fmt.Errorf("failed to parse key type: %w", err)
}
d.keyType = approve.KeyType

return 0, nil
}
@@ -582,10 +579,7 @@ func (d *validatorRemoveRoute) PreTx(ctx *common.TxContext, svc *common.Service,
}

d.target = remove.Validator
d.keyType, err = crypto.ParseKeyType(remove.KeyType)
if err != nil {
return types.CodeUnknownError, fmt.Errorf("failed to parse key type: %w", err)
}
d.keyType = remove.KeyType

return 0, nil
}
5 changes: 4 additions & 1 deletion node/txapp/txapp.go
@@ -294,6 +294,7 @@ func (r *TxApp) processVotes(ctx context.Context, db sql.DB, block *common.Block
return err
}

fmt.Println("expired resolutions", expired)
expiredIDs := make([]*types.UUID, 0, len(expired))
requiredPowerMap := make(map[string]int64) // map of resolution type to required power

@@ -319,7 +320,7 @@ func (r *TxApp) processVotes(ctx context.Context, db sql.DB, block *common.Block
credits.applyResolution(resolution)
}

r.service.Logger.Debug("expiring resolution", "type", resolution.Type, "id", resolution.ID.String(), "refunded", refunded)
r.service.Logger.Info("expiring resolution", "type", resolution.Type, "id", resolution.ID.String(), "refunded", refunded)
}

allIDs := append(finalizedIDs, expiredIDs...)
@@ -372,6 +373,8 @@ type creditMap map[string]*big.Int
func (c creditMap) applyResolution(res *resolutions.Resolution) {
// reward voters.
// this will include the proposer, even if they did not submit a vote id
fmt.Println("crediting the resolution", res)

for _, voter := range res.Voters {
// if the voter is the proposer, then we will reward them below,
// since extra logic is required if they did not submit a vote id
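The creditMap that applyResolution fills is just an accumulator of per-voter rewards held as big.Int values. A minimal sketch of that accumulation pattern; the voter keys and amounts here are illustrative only:

```go
package main

import (
	"fmt"
	"math/big"
)

// creditMap accumulates pending rewards per voter identifier before the
// actual credits are issued.
type creditMap map[string]*big.Int

func (c creditMap) add(voter string, amt *big.Int) {
	cur, ok := c[voter]
	if !ok {
		cur = new(big.Int)
		c[voter] = cur
	}
	cur.Add(cur, amt)
}

func main() {
	credits := make(creditMap)
	credits.add("voterA", big.NewInt(100))
	credits.add("voterB", big.NewInt(100))
	credits.add("voterA", big.NewInt(50)) // e.g. an extra proposer reward

	for voter, amt := range credits {
		fmt.Printf("%s: %s\n", voter, amt)
	}
}
```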
5 changes: 4 additions & 1 deletion node/voting/voting.go
@@ -242,6 +242,7 @@ func DeleteResolution(ctx context.Context, db sql.TxMaker, id *types.UUID) error
// It expects there to be 7 columns in the row, in the following order:
// id, body, type, expiration, approved_power, voters, voteBodyProposer
func fromRow(ctx context.Context, db sql.Executor, row []any) (*resolutions.Resolution, error) {
fmt.Println("fromROW: ", row)
if len(row) != 7 {
return nil, fmt.Errorf("expected 7 columns, got %d", len(row))
}
@@ -310,7 +311,7 @@ func fromRow(ctx context.Context, db sql.Executor, row []any) (*resolutions.Reso
// retrieve the power of the proposer if it's issued by the Validator
proposer, err := getValidator(ctx, db, pubKey, keyType)
if err != nil {
return nil, fmt.Errorf("failed to get proposer: %w", err)
return nil, fmt.Errorf("failed to get proposer validator: %w", err)
}
if proposer != nil {
v.Proposer.Power = proposer.Power
@@ -385,6 +386,7 @@ func GetExpired(ctx context.Context, db sql.Executor, blockHeight int64) ([]*res

ids := make([]*resolutions.Resolution, len(res.Rows))
for i, row := range res.Rows {
fmt.Println("Expired ROW: ", row)
ids[i], err = fromRow(ctx, db, row)
if err != nil {
return nil, fmt.Errorf("internal bug: %w", err)
@@ -730,6 +732,7 @@ func getValidator(ctx context.Context, db sql.Executor, pubKey []byte, keyType c
}

row := res.Rows[0]
fmt.Println("Voter Row: ", row)
voterBts, ok := row[0].([]byte)
if !ok {
return nil, errors.New("invalid type for voter")
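fromRow and getValidator both follow the same defensive pattern when decoding SQL rows returned as []any: check the column count, then type-assert each column. A reduced sketch of that pattern (two columns instead of seven):

```go
package main

import (
	"errors"
	"fmt"
)

// decodeVoterRow shows the shape of the row decoding used above: the row
// arrives as []any, so it is length-checked and each column type-asserted.
func decodeVoterRow(row []any) (voter []byte, power int64, err error) {
	if len(row) != 2 {
		return nil, 0, fmt.Errorf("expected 2 columns, got %d", len(row))
	}
	voter, ok := row[0].([]byte)
	if !ok {
		return nil, 0, errors.New("invalid type for voter")
	}
	power, ok = row[1].(int64)
	if !ok {
		return nil, 0, errors.New("invalid type for power")
	}
	return voter, power, nil
}

func main() {
	v, p, err := decodeVoterRow([]any{[]byte{0xab, 0xcd}, int64(10)})
	fmt.Println(v, p, err)
}
```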
File renamed without changes.
130 changes: 89 additions & 41 deletions test/integration/kwild_test.go
@@ -29,10 +29,14 @@ func TestKwildDatabaseIntegration(t *testing.T) {

ctx := context.Background()

ping, err := p.Nodes[0].JSONRPCClient(t, ctx, false).Ping(ctx)
clt := p.Nodes[0].JSONRPCClient(t, ctx, false)

ping, err := clt.Ping(ctx)
require.NoError(t, err)

require.Equal(t, "pong", ping)

// specifications.CreateNamespaceSpecification(ctx, t, clt)
}

// TestKwildValidatorUpdates is to test the functionality of
@@ -51,9 +55,9 @@ func TestKwildValidatorUpdates(t *testing.T) {
nc.Validator = false
}),
},
ConfigureGenesis: func(genDoc *config.GenesisConfig) {
genDoc.JoinExpiry = 5 // 5 sec at 1block/sec
},
// ConfigureGenesis: func(genDoc *config.GenesisConfig) {
// genDoc.JoinExpiry = 5 // 5 sec at 1block/sec
// },
DBOwner: "0xabc",
},
})
@@ -77,9 +81,12 @@ func TestKwildValidatorUpdates(t *testing.T) {
// Reject leave requests from a non-validator
specifications.NonValidatorLeaveSpecification(ctx, t, n3Admin)

// Reject leave requests from the leader
specifications.InvalidLeaveSpecification(ctx, t, n0Admin)

time.Sleep(2 * time.Second)

// Node1 and 2 are Validators and Node3 will issue a join request and requires approval from both validators
// Node0 and 1 are Validators and Node2 will issue a join request and requires approval from both validators
specifications.ValidatorNodeJoinSpecification(ctx, t, n2Admin, p.Nodes[2].PrivateKey(), 2)

// Nodes can't self approve join requests
@@ -91,16 +98,16 @@ func TestKwildValidatorUpdates(t *testing.T) {
// node0 approves the join request
specifications.ValidatorNodeApproveSpecification(ctx, t, n0Admin, p.Nodes[2].PrivateKey(), 2, 2, false)

// node1 approves the join request and the node3 becomes a validator
// node1 approves the join request and the node2 becomes a validator
specifications.ValidatorNodeApproveSpecification(ctx, t, n1Admin, p.Nodes[2].PrivateKey(), 2, 3, true)

// Ensure that the network has 3 validators
specifications.CurrentValidatorsSpecification(ctx, t, n3Admin, 3)
specifications.CurrentValidatorsSpecification(ctx, t, n0Admin, 3)
specifications.CurrentValidatorsSpecification(ctx, t, n3Admin, 3)

/*
Leave Process:
- node3 issues a leave request -> removes it from the validator list
- node2 issues a leave request -> removes it from the validator list
- Validator set count should be reduced by 1
*/
specifications.ValidatorNodeLeaveSpecification(ctx, t, n2Admin)
@@ -110,6 +117,7 @@ func TestKwildValidatorUpdates(t *testing.T) {
time.Sleep(2 * time.Second)
specifications.ValidatorNodeApproveSpecification(ctx, t, n0Admin, p.Nodes[2].PrivateKey(), 2, 2, false)
specifications.ValidatorNodeApproveSpecification(ctx, t, n1Admin, p.Nodes[2].PrivateKey(), 2, 3, true)
time.Sleep(2 * time.Second)
specifications.CurrentValidatorsSpecification(ctx, t, n3Admin, 3)
}

@@ -125,8 +133,8 @@ func TestValidatorJoinExpirySpecification(t *testing.T) {
},
ConfigureGenesis: func(genDoc *config.GenesisConfig) {
genDoc.JoinExpiry = 5 // 5 sec at 1block/sec
genDoc.DBOwner = "0xabc"
},
DBOwner: "0xabc",
},
})

@@ -139,7 +147,7 @@ func TestValidatorJoinExpirySpecification(t *testing.T) {
n1Admin := p.Nodes[1].AdminClient(t, ctx)

// Ensure that the network has 2 validators
specifications.CurrentValidatorsSpecification(ctx, t, n0Admin, 2)
specifications.CurrentValidatorsSpecification(ctx, t, n0Admin, 1)

// Reject join requests from an existing validator
specifications.ValidatorJoinExpirySpecification(ctx, t, n1Admin, p.Nodes[1].PrivateKey(), 8*time.Second)
@@ -155,9 +163,6 @@ func TestKwildValidatorRemoveSpecification(t *testing.T) {
setup.DefaultNodeConfig(),
setup.DefaultNodeConfig(),
},
ConfigureGenesis: func(genDoc *config.GenesisConfig) {
genDoc.JoinExpiry = 5 // 5 sec at 1block/sec
},
DBOwner: "0xabc",
},
})
@@ -172,11 +177,13 @@ func TestKwildValidatorRemoveSpecification(t *testing.T) {
n2Admin := p.Nodes[2].AdminClient(t, ctx)

// 4 validators, remove one validator, requires approval from 2 validators
// TODO: (also ensure that the remove is not done on the leader? )
specifications.ValidatorNodeRemoveSpecificationV4R1(ctx, t, n0Admin, n1Admin, n2Admin, p.Nodes[3].PrivateKey())

// Node3 is no longer a validator
specifications.CurrentValidatorsSpecification(ctx, t, n0Admin, 3)

// leader can't be removed from the validator set
specifications.InvalidRemovalSpecification(ctx, t, n1Admin, p.Nodes[0].PrivateKey())
}

func TestKwildBlockSync(t *testing.T) {
@@ -305,12 +312,27 @@ func TestLongRunningNetworkMigrations(t *testing.T) {

var listenAddresses []string
for _, node := range net1.Nodes {
addr, err := node.JSONRPCEndpoint(t, ctx)
_, addr, err := node.JSONRPCEndpoint(t, ctx)
require.NoError(t, err)

listenAddresses = append(listenAddresses, addr)
}

n0Admin := net1.Nodes[0].AdminClient(t, ctx)
n1Admin := net1.Nodes[1].AdminClient(t, ctx)
// n2Admin := net1.Nodes[2].AdminClient(t, ctx)
n3Admin := net1.Nodes[3].AdminClient(t, ctx)

specifications.SubmitMigrationProposal(ctx, t, n0Admin)

// node0, the proposer, approves the migration again and verifies that the migration is still pending
specifications.ApproveMigration(ctx, t, n0Admin, true)

// non validator can't approve the migration
specifications.NonValidatorApproveMigration(ctx, t, n3Admin)

// node1 approves the migration and verifies that the migration is no longer pending
specifications.ApproveMigration(ctx, t, n1Admin, false)

// Setup a new network with the same keys and enter the activation phase
net2 := setup.SetupTests(t, &setup.TestConfig{
ClientDriver: setup.CLI,
@@ -323,7 +345,7 @@ func TestLongRunningNetworkMigrations(t *testing.T) {
conf.Migrations.MigrateFrom = listenAddresses[0]

conf.Snapshots.Enable = true
conf.Snapshots.RecurringHeight = 50
conf.Snapshots.RecurringHeight = 25
}
}),
setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
@@ -333,7 +355,7 @@ func TestLongRunningNetworkMigrations(t *testing.T) {
conf.Migrations.MigrateFrom = listenAddresses[1]

conf.Snapshots.Enable = true
conf.Snapshots.RecurringHeight = 50
conf.Snapshots.RecurringHeight = 25
}
}),
setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
@@ -357,33 +379,22 @@ func TestLongRunningNetworkMigrations(t *testing.T) {
},
DBOwner: "0xabc",
},
InitialServices: []string{"new-node0", "new-node1", "new-node2", "new-pg0", "new-pg1", "new-pg2"}, // should bringup only node 0,1,2
ServicesPrefix: "new-",
DockerNetwork: net1.NetworkName(),
InitialServices: []string{"new-node0", "new-node1", "new-node2", "new-pg0", "new-pg1", "new-pg2"}, // should bringup only node 0,1,2
ServicesPrefix: "new-",
PortOffset: 100,
DockerNetwork: net1.NetworkName(),
ContainerStartTimeout: 2 * time.Minute, // increase the timeout for downloading the genesis state and starting migration
})

n0Admin := net1.Nodes[0].AdminClient(t, ctx)
n1Admin := net1.Nodes[1].AdminClient(t, ctx)
n2Admin := net1.Nodes[2].AdminClient(t, ctx)
n3Admin := net1.Nodes[3].AdminClient(t, ctx)
// Verify the existence of some data

specifications.SubmitMigrationProposal(ctx, t, n0Admin)

// node1 approves the migration and verifies that the migration is still pending
specifications.ApproveMigration(ctx, t, n1Admin, true)

// node1 approves again and verifies that the migration is still pending as duplicate approvals are not allowed
specifications.ApproveMigration(ctx, t, n1Admin, true)

// non validator can't approve the migration
specifications.NonValidatorApproveMigration(ctx, t, n3Admin)

// node2 approves the migration and verifies that the migration is no longer pending
specifications.ApproveMigration(ctx, t, n2Admin, false)

// Wait for the migration to start
time.Sleep(10 * time.Second)
// time for node to do blocksync and catchup
// net2.RunServices(t, ctx, []*setup.ServiceDefinition{
// setup.KwildServiceDefinition("new-node2"),
// setup.KwildServiceDefinition("new-pg2"),
// })

// time for node to do statesync and catchup
net2.RunServices(t, ctx, []*setup.ServiceDefinition{
setup.KwildServiceDefinition("new-node3"),
setup.KwildServiceDefinition("new-pg3"),
@@ -397,3 +408,40 @@ func TestLongRunningNetworkMigrations(t *testing.T) {
require.NoError(t, err)
require.Greater(t, info.BlockHeight, uint64(50)) // TODO: height > 50 + migration height
}

func TestSingleNodeKwildEthDepositsOracleIntegration(t *testing.T) {
// ctx := context.Background()

// p := setup.SetupTests(t, &setup.TestConfig{
// ClientDriver: setup.CLI,
// Network: &setup.NetworkConfig{
// ExtraServices: []*setup.CustomService{
// {
// ServiceName: "hardhat",
// DockerImage: "kwildb/hardhat:latest",
// ExposedPort: "8545",
// InternalPort: "8545",
// },
// },
// Nodes: []*setup.NodeConfig{
// setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
// nc.Configure = func(conf *config.Config) {
// conf.Extensions = make(map[string]map[string]string)
// conf.Extensions["eth_deposits"] = make(map[string]string)
// conf.Extensions["eth_deposits"]["oracle"] = "http://hardhat:8545"

// }
// }),
// },
// DBOwner: "0xabc",
// },
// InitialServices: []string{"hardhat"},
// })

// Deploy the contract once the hardhat service is up

// clt := p.Nodes[0].JSONRPCClient(t, ctx, false)

// specifications.CreateNamespaceSpecification(ctx, t, clt)

}
6 changes: 3 additions & 3 deletions test/setup/admin_client.go
@@ -164,7 +164,7 @@ func (a *AdminClient) TxSuccess(ctx context.Context, txHash types.Hash) error {

func (a *AdminClient) SubmitMigrationProposal(ctx context.Context, activationHeight *big.Int, migrationDuration *big.Int) (types.Hash, error) {
var res display.TxHashResponse
err := exec(a, ctx, &res, "migrations", "propose", "-a", activationHeight.String(), "-d", migrationDuration.String())
err := exec(a, ctx, &res, "migrate", "propose", "-a", activationHeight.String(), "-d", migrationDuration.String())
if err != nil {
return types.Hash{}, err
}
@@ -173,7 +173,7 @@ func (a *AdminClient) SubmitMigrationProposal(ctx context.Context, activationHei

func (a *AdminClient) ApproveMigration(ctx context.Context, migrationResolutionID *types.UUID) (types.Hash, error) {
var res display.TxHashResponse
err := exec(a, ctx, &res, "migrations", "approve", migrationResolutionID.String())
err := exec(a, ctx, &res, "migrate", "approve", migrationResolutionID.String())
if err != nil {
return types.Hash{}, err
}
@@ -182,7 +182,7 @@ func (a *AdminClient) ApproveMigration(ctx context.Context, migrationResolutionI

func (a *AdminClient) ListMigrations(ctx context.Context) ([]*types.Migration, error) {
var res []*types.Migration
err := exec(a, ctx, &res, "migrations", "list")
err := exec(a, ctx, &res, "migrate", "proposals")
if err != nil {
return nil, err
}
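The test client's exec helper ultimately shells out to the admin CLI, whose migration subcommands were renamed here (migrations list becomes migrate proposals, and so on). A rough sketch of invoking the renamed commands via os/exec; the binary name is a placeholder assumption, not taken from this repository:

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

// runAdmin shells out to the admin CLI, mirroring what the test's exec
// helper does. adminBin is a placeholder binary name.
func runAdmin(ctx context.Context, adminBin string, args ...string) (string, error) {
	out, err := exec.CommandContext(ctx, adminBin, args...).CombinedOutput()
	return string(out), err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	adminBin := "kwild" // placeholder; substitute the actual admin binary

	// Renamed subcommands exercised by the test client:
	//   migrate propose -a <activation height> -d <duration>
	//   migrate approve <resolution id>
	//   migrate proposals
	out, err := runAdmin(ctx, adminBin, "migrate", "proposals")
	fmt.Println(out, err)
}
```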
6 changes: 3 additions & 3 deletions test/setup/compose.go
@@ -68,7 +68,7 @@ type headerTemplate struct {
// generateCompose generates a full docker-compose.yml file for a given number of nodes.
// It takes a network name, docker image, and node count.
// Optionally, it can also be given a user and group, which if set, will be used to run the nodes as.
func generateCompose(dockerNetwork string, testnetDir string, nodeConfs []*NodeConfig, otherSvcs []*CustomService, userAndGroupIDs *[2]string, networkPrefix string,
func generateCompose(dockerNetwork string, testnetDir string, nodeConfs []*NodeConfig, otherSvcs []*CustomService, userAndGroupIDs *[2]string, networkPrefix string, portsOffset int,
) (composeFilepath string, nodeGeneratedInfo []*generatedNodeInfo, err error) {
var res bytes.Buffer
err = headerComposeTemplate.Execute(&res, &headerTemplate{Network: dockerNetwork})
@@ -87,8 +87,8 @@ func generateCompose(dockerNetwork string, testnetDir string, nodeConfs []*NodeC
NodeServicePrefix: nodePrefix,
PGServicePrefix: pgPrefix,
TestnetDir: testnetDir,
ExposedJSONRPCPort: 8484 + i,
ExposedP2PPort: 6600 + i,
ExposedJSONRPCPort: 8484 + i + portsOffset,
ExposedP2PPort: 6600 + i + portsOffset,
DockerImage: nodeConf.DockerImage,
}
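The new portsOffset argument simply shifts every host-mapped port, so a second network started on the same Docker host (as in the migration test above, which passes PortOffset: 100) does not collide with the first. A quick sketch of the arithmetic:

```go
package main

import "fmt"

// hostPorts mirrors the compose template inputs: node i of a network with
// a given port offset gets these host-mapped JSON-RPC and P2P ports.
func hostPorts(i, portOffset int) (jsonRPC, p2p int) {
	return 8484 + i + portOffset, 6600 + i + portOffset
}

func main() {
	for _, offset := range []int{0, 100} {
		for i := 0; i < 3; i++ {
			rpc, p2p := hostPorts(i, offset)
			fmt.Printf("offset=%3d node%d: jsonrpc=%d p2p=%d\n", offset, i, rpc, p2p)
		}
	}
}
```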

7 changes: 4 additions & 3 deletions test/setup/network.go
@@ -72,7 +72,8 @@ func getEndpoints(ctr *testcontainers.DockerContainer, ctx context.Context,
return
}

func kwildJSONRPCEndpoints(ctr *testcontainers.DockerContainer, ctx context.Context) (string, error) {
exposed, _, err := getEndpoints(ctr, ctx, nat.Port(fmt.Sprint(jsonRPCPort)), "http")
return exposed, err
func kwildJSONRPCEndpoints(ctr *testcontainers.DockerContainer, ctx context.Context) (string, string, error) {
exposed, unexposed, err := getEndpoints(ctr, ctx, nat.Port(fmt.Sprint(jsonRPCPort)), "http")
fmt.Printf("kwild JSONRPC exposed at %s, unexposed at %s\n", exposed, unexposed)
return exposed, unexposed, err
}
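kwildJSONRPCEndpoints now returns both addresses because they serve different callers: host-side test clients dial the exposed (port-mapped) endpoint, while services inside the Docker network, such as a migrating node's Migrations.MigrateFrom, need the unexposed one. A small illustration with hypothetical addresses:

```go
package main

import "fmt"

// chooseEndpoint captures why both values are returned: pick the container
// address for in-network callers and the mapped address for the host.
func chooseEndpoint(exposed, unexposed string, insideDockerNetwork bool) string {
	if insideDockerNetwork {
		return unexposed
	}
	return exposed
}

func main() {
	exposed := "http://127.0.0.1:8485"   // hypothetical host-mapped port
	unexposed := "http://new-node0:8484" // hypothetical in-network service address

	fmt.Println("MigrateFrom:", chooseEndpoint(exposed, unexposed, true))
	fmt.Println("test client:", chooseEndpoint(exposed, unexposed, false))
}
```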
125 changes: 100 additions & 25 deletions test/setup/node.go
@@ -16,12 +16,18 @@ import (
"github.com/kwilteam/kwil-db/core/crypto"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node"
ethdeployer "github.com/kwilteam/kwil-db/test/integration/eth-deployer"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)

var (
UserPubKey1 = "f1aa5a7966c3863ccde3047f6a1e266cdc0c76b399e256b8fede92b1c69e4f4e"
UserPubKey2 = "43f149de89d64bf9a9099be19e1b1f7a4db784af8fa07caf6f08dc86ba65636b"
)

// TestConfig is the configuration for the test
type TestConfig struct {
// REQUIRED: ClientDriver is the driver to use for the client
@@ -38,6 +44,8 @@ type TestConfig struct {
DockerNetwork string
// ServicesPrefix is the prefix to use for the kwild and pg services
ServicesPrefix string
// PortOffset is the offset to use for the kwild and pg service ports
PortOffset int
}

func (c *TestConfig) ensureDefaults(t *testing.T) {
@@ -71,7 +79,8 @@ type NetworkConfig struct {
// Automatically runs kwild and Postgres, but this allows for geth, kgw,
// etc. to run as well.
ExtraServices []*CustomService // TODO: we need more in this service definition struct. Will come back when I am farther along
// TODO: we will probably need to add StateHash and MigrationParams when we add ZDT migration tests

DeployEth bool
}

func (n *NetworkConfig) ensureDefaults(t *testing.T) {
@@ -130,6 +139,7 @@ func CustomNodeConfig(f func(*NodeConfig)) *NodeConfig {
type Testnet struct {
Nodes []KwilNode
testCtx *testingContext
EthNode *EthNode
}

// ExtraServiceEndpoint gets the endpoint for an extra service that was configured in the testnet
@@ -139,10 +149,60 @@ func (t *Testnet) ExtraServiceEndpoint(ctx context.Context, serviceName string,
return "", fmt.Errorf("container not found")
}

exposed, _, err := getEndpoints(ct, ctx, nat.Port(port), protocol)
exposed, unexposed, err := getEndpoints(ct, ctx, nat.Port(port), protocol)
fmt.Printf("exposed: %s, unexposed: %s\n", exposed, unexposed)
return exposed, err
}

func CreateDockerNetwork(ctx context.Context, t *testing.T) (*testcontainers.DockerNetwork, error) {
dockerNetwork, err := ensureNetworkExist(ctx, t.Name())
require.NoError(t, err)

// the network will be removed by the testSetup that created it
t.Cleanup(func() {
if !t.Failed() {
t.Logf("teardown docker network %s from %s", dockerNetwork.Name, t.Name())
err := dockerNetwork.Remove(ctx)
require.NoErrorf(t, err, "failed to teardown network %s", dockerNetwork.Name)
}
})

return dockerNetwork, nil
}

func DeployETHDevNet(ctx context.Context, t *testing.T, testCtx *testingContext, composePath string) *EthNode {
hardHatService := NewServiceDefinition("hardhat", "")
runDockerCompose(ctx, t, testCtx, composePath, []*ServiceDefinition{hardHatService})

// check if the hardhat service is running
ctr, ok := testCtx.containers["hardhat"]
require.True(t, ok, "hardhat service not found")

// get the endpoint for the hardhat service
exposedChainRPC, unexposedChainRPC, err := getEndpoints(ctr, ctx, "8545", "ws")
require.NoError(t, err, "failed to get endpoints for hardhat service")

var deployers []*ethdeployer.Deployer

for i := 0; i < 2; i++ {
// Deploy contracts
ethDeployer, err := ethdeployer.NewDeployer(exposedChainRPC, UserPubKey1, 5)
require.NoError(t, err, "failed to get eth deployer")

// Deploy Token and Escrow contracts
err = ethDeployer.Deploy()
require.NoError(t, err, "failed to deploy contracts")

deployers = append(deployers, ethDeployer)
}

return &EthNode{
ExposedChainRPC: exposedChainRPC,
UnexposedChainRPC: unexposedChainRPC,
Deployers: deployers,
}
}

func SetupTests(t *testing.T, testConfig *TestConfig) *Testnet {
testConfig.ensureDefaults(t)

@@ -164,31 +224,35 @@ func SetupTests(t *testing.T, testConfig *TestConfig) *Testnet {

var dockerNetworkName string
if testConfig.DockerNetwork == "" {
dockerNetwork, err := ensureNetworkExist(ctx, t.Name())
dockerNetwork, err := CreateDockerNetwork(ctx, t)
require.NoError(t, err)
dockerNetworkName = dockerNetwork.Name

// the network will be removed by the testSetup that created it
t.Cleanup(func() {
if !t.Failed() {
t.Logf("teardown docker network %s from %s", dockerNetwork.Name, t.Name())
err := dockerNetwork.Remove(ctx)
require.NoErrorf(t, err, "failed to teardown network %s", dockerNetwork.Name)
}
})

} else {
dockerNetworkName = testConfig.DockerNetwork
}

composePath, nodeInfo, err := generateCompose(dockerNetworkName, tmpDir, testConfig.Network.Nodes, testConfig.Network.ExtraServices, nil, testConfig.ServicesPrefix) //TODO: need user id and groups
composePath, nodeInfo, err := generateCompose(dockerNetworkName, tmpDir, testConfig.Network.Nodes, testConfig.Network.ExtraServices, nil, testConfig.ServicesPrefix, testConfig.PortOffset) //TODO: need user id and groups
require.NoError(t, err)

require.Equal(t, len(testConfig.Network.Nodes), len(nodeInfo)) // ensure that the number of nodes is the same as the number of node info
if len(nodeInfo) == 0 {
t.Fatal("at least one node is required")
}

testCtx := &testingContext{
config: testConfig,
containers: make(map[string]*testcontainers.DockerContainer),
composePath: composePath, // used if we need to add more services later
generatedConfig: nil,
networkName: dockerNetworkName,
}

// deploy the hardhat service and the contracts first
var ethNode *EthNode
if testConfig.Network.DeployEth {
ethNode = DeployETHDevNet(ctx, t, testCtx, composePath)
}

genesisConfig := config.DefaultGenesisConfig()
genesisConfig.DBOwner = testConfig.Network.DBOwner
testConfig.Network.ConfigureGenesis(genesisConfig)
@@ -202,7 +266,7 @@ func SetupTests(t *testing.T, testConfig *TestConfig) *Testnet {
for _, node := range testConfig.InitialServices {
serviceFilter[node] = struct{}{}
}
filterServices := len(serviceFilter) > 0
filterServices := len(serviceFilter) > 0 || len(testConfig.Network.ExtraServices) > 0

generatedConfig := &generatedNodeConfig{
nodeConfigs: make(map[string]*config.Config),
@@ -293,19 +357,15 @@ func SetupTests(t *testing.T, testConfig *TestConfig) *Testnet {
err = setup.GenerateTestnetDir(tmpDir, genesisConfig, testnetNodeConfigs)
require.NoError(t, err)

testCtx := &testingContext{
config: testConfig,
containers: make(map[string]*testcontainers.DockerContainer),
composePath: composePath, // used if we need to add more services later
generatedConfig: generatedConfig,
networkName: dockerNetworkName,
}
testCtx.generatedConfig = generatedConfig

runDockerCompose(ctx, t, testCtx, composePath, servicesToRun)

tp := &Testnet{
testCtx: testCtx,
EthNode: ethNode,
}

for _, node := range generatedNodes {
node.testCtx = testCtx
tp.Nodes = append(tp.Nodes, node)
@@ -314,6 +374,14 @@ func SetupTests(t *testing.T, testConfig *TestConfig) *Testnet {
return tp
}

func (tt *Testnet) ServiceContainer(t *testing.T, serviceName string) (*testcontainers.DockerContainer, error) {
ct, ok := tt.testCtx.containers[serviceName]
if !ok {
return nil, fmt.Errorf("container %s not found", serviceName)
}
return ct, nil
}

func (t *Testnet) NetworkName() string {
return t.testCtx.networkName
}
@@ -497,6 +565,13 @@ type kwilNode struct {
client JSONRPCClient
}

type EthNode struct {
ExposedChainRPC string
UnexposedChainRPC string

Deployers []*ethdeployer.Deployer
}

func (k *kwilNode) PrivateKey() *crypto.Secp256k1PrivateKey {
return k.nodeTestConfig.PrivateKey
}
@@ -523,7 +598,7 @@ func (k *kwilNode) JSONRPCClient(t *testing.T, ctx context.Context, usingGateway
t.Fatalf("container %s not found", k.generatedInfo.KwilNodeServiceName)
}

endpoint, err := kwildJSONRPCEndpoints(container, ctx)
endpoint, _, err := kwildJSONRPCEndpoints(container, ctx)
require.NoError(t, err)

client, err := getNewClientFn(k.testCtx.config.ClientDriver)(ctx, endpoint, usingGateway, t.Logf)
@@ -544,7 +619,7 @@ func (k *kwilNode) AdminClient(t *testing.T, ctx context.Context) *AdminClient {
}
}

func (k *kwilNode) JSONRPCEndpoint(t *testing.T, ctx context.Context) (string, error) {
func (k *kwilNode) JSONRPCEndpoint(t *testing.T, ctx context.Context) (string, string, error) {
container, ok := k.testCtx.containers[k.generatedInfo.KwilNodeServiceName]
if !ok {
t.Fatalf("container %s not found", k.generatedInfo.KwilNodeServiceName)
@@ -558,7 +633,7 @@ type KwilNode interface {
PublicKey() *crypto.Secp256k1PublicKey
IsValidator() bool
Config() *config.Config
JSONRPCEndpoint(t *testing.T, ctx context.Context) (string, error)
JSONRPCEndpoint(t *testing.T, ctx context.Context) (string, string, error)
JSONRPCClient(t *testing.T, ctx context.Context, usingKGW bool) JSONRPCClient
AdminClient(t *testing.T, ctx context.Context) *AdminClient
}
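The new DeployEth flag and Testnet.EthNode field let a test bring up a hardhat devnet and reach the deployed contracts. A sketch of how it might be wired; the exact service wiring (listing hardhat under ExtraServices) is an assumption based on the commented-out oracle test earlier in this commit:

```go
package integration_test

import (
	"testing"

	"github.com/kwilteam/kwil-db/test/setup"
)

// Sketch only: field names follow the diffs above; whether hardhat must
// also be declared under ExtraServices when DeployEth is set is assumed.
func TestEthDevnetSketch(t *testing.T) {
	p := setup.SetupTests(t, &setup.TestConfig{
		ClientDriver: setup.CLI,
		Network: &setup.NetworkConfig{
			ExtraServices: []*setup.CustomService{{
				ServiceName:  "hardhat",
				DockerImage:  "kwildb/hardhat:latest",
				ExposedPort:  "8545",
				InternalPort: "8545",
			}},
			Nodes:     []*setup.NodeConfig{setup.DefaultNodeConfig()},
			DBOwner:   "0xabc",
			DeployEth: true, // DeployETHDevNet runs and deploys the token/escrow contracts
		},
	})

	// Endpoints and contract deployers for the devnet are exposed on the testnet.
	t.Log(p.EthNode.ExposedChainRPC, p.EthNode.UnexposedChainRPC, len(p.EthNode.Deployers))
}
```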
24 changes: 24 additions & 0 deletions test/specifications/validator_ops.go
@@ -173,6 +173,30 @@ func ValidatorNodeRemoveSpecificationV4R1(ctx context.Context, t *testing.T, n0,
}
}

// InvalidRemovalSpecification tests the case where a remove request is issued
// on a non-validator node or on a leader node
func InvalidRemovalSpecification(ctx context.Context, t *testing.T, netops ValidatorRemoveDsl, targetKey crypto.PrivateKey) {
t.Log("Executing validator leader removal specification")
// node issues a remove request on the leader
rec, err := netops.ValidatorNodeRemove(ctx, targetKey.Bytes(), targetKey.Type())
require.NoError(t, err)

// Ensure that the Validator Remove Tx fails.
expectTxFail(t, netops, ctx, rec, defaultTxQueryTimeout)()
}

// InvalidLeaveSpecification tests the case where a leave request is issued
// by either the leader or a non-validator node
func InvalidLeaveSpecification(ctx context.Context, t *testing.T, netops ValidatorOpsDsl) {
t.Log("Executing validator leader leave specification")
// the node (leader or non-validator) issues a leave request
rec, err := netops.ValidatorNodeLeave(ctx)
require.NoError(t, err)

// Ensure that the Validator Leave Tx fails.
expectTxFail(t, netops, ctx, rec, defaultTxQueryTimeout)()
}

func ValidatorNodeApproveSpecification(ctx context.Context, t *testing.T, netops ValidatorOpsDsl, joinerKey crypto.PrivateKey, preCnt int, postCnt int, approved bool) {
t.Log("Executing network node approve specification")

14 changes: 1 addition & 13 deletions test/specifications/vars.go
@@ -5,17 +5,5 @@ import (
)

var (
defaultTxQueryTimeout = 10 * time.Second
defaultTxQueryTimeout = 15 * time.Second
)

// var (
// SchemaLoader DatabaseSchemaLoader = &FileDatabaseSchemaLoader{
// Modifier: func(db *types.Schema) {
// // NOTE: this is a hack to make sure the db name is temporary unique
// db.Name = fmt.Sprintf("%s_%s", db.Name, time.Now().Format("20060102"))
// }}
// )

// func SetSchemaLoader(loader DatabaseSchemaLoader) {
// SchemaLoader = loader
// }
