Skip to content

Commit

Permalink
skip CI on some file changes (#712)
Browse files Browse the repository at this point in the history
* skip running CI on files
* disable http driver for CI workflow, enable it for main workflow
* CI: backport condition change
  • Loading branch information
Yaiba authored and charithabandi committed May 10, 2024
1 parent f2850cf commit 8d19d44
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 30 deletions.
5 changes: 4 additions & 1 deletion cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
cometBftNode := buildCometNode(d, closers, abciApp)

cometBftClient := buildCometBftClient(cometBftNode)
wrappedCmtClient := &wrappedCometBFTClient{cometBftClient}
wrappedCmtClient := &wrappedCometBFTClient{
cl: cometBftClient,
cache: abciApp,
}
txApp.SetReplayStatusChecker(cometBftNode.IsCatchup)

eventBroadcaster := buildEventBroadcaster(d, ev, wrappedCmtClient, txApp)
Expand Down
33 changes: 13 additions & 20 deletions cmd/kwild/server/utils.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package server

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"strings"

types "github.com/kwilteam/kwil-db/core/types/admin"
Expand Down Expand Up @@ -55,7 +53,12 @@ func getExtensions(ctx context.Context, urls []string) (map[string]precompiles.I
// wrappedCometBFTClient satisfies the generic txsvc.BlockchainBroadcaster and
// admsvc.Node interfaces, hiding the details of cometBFT.
type wrappedCometBFTClient struct {
cl *cmtlocal.Local
cl *cmtlocal.Local
cache mempoolCache
}

type mempoolCache interface {
TxInMempool([]byte) bool
}

func convertNodeInfo(ni *p2p.DefaultNodeInfo) *types.NodeInfo {
Expand Down Expand Up @@ -168,23 +171,13 @@ func (wc *wrappedCometBFTClient) TxQuery(ctx context.Context, hash []byte, prove
return res, nil
}

// The transaction could be in mempool.
limit := math.MaxInt // cmt is bugged, -1 doesn't actually work (see rpc/core.validatePerPage and how it goes with 30 instead of no limit)
unconf, err := wc.cl.UnconfirmedTxs(ctx, &limit) // SLOW quite often!
if err != nil {
return nil, err
}
for _, tx := range unconf.Txs {
if bytes.Equal(tx.Hash(), hash) {
// Found it. Shoe-horn into a ResultTx with -1 height, and the zero
// values for ResponseDeliverTx and TxProof (it's checked and
// accepted to mempool, but not delivered in a block yet).
return &cmtCoreTypes.ResultTx{
Hash: hash,
Height: -1,
Tx: tx,
}, nil
}
// The transaction could be in the mempool, Check with ABCI directly if it heard of the transaction.
if wc.cache.TxInMempool(hash) {
return &cmtCoreTypes.ResultTx{
Hash: hash,
Height: -1,
Tx: nil, // The transaction is still in the mempool, so not indexed yet. Returning nil to avoid hash computations on all the transactions in the mempool (potential DoS attack vector).
}, nil
}
return nil, abci.ErrTxNotFound
}
Expand Down
8 changes: 7 additions & 1 deletion core/rpc/client/user/jsonrpc/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,16 @@ func (cl *Client) TxQuery(ctx context.Context, txHash []byte) (*transactions.TcT
if err != nil {
return nil, err
}

var tx transactions.Transaction
if res.Tx != nil {
tx = *res.Tx
}

return &transactions.TcTxQueryResponse{
Hash: res.Hash,
Height: res.Height,
Tx: *res.Tx,
Tx: tx,
TxResult: *res.TxResult,
}, nil
}
29 changes: 25 additions & 4 deletions internal/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func NewAbciApp(cfg *AbciConfig, snapshotter SnapshotModule, statesyncer StateSy
log: log,

validatorAddressToPubKey: make(map[string][]byte),

txCache: make(map[string]bool),
}

return app
Expand Down Expand Up @@ -96,6 +98,13 @@ type AbciApp struct {

// validatorAddressToPubKey is a map of validator addresses to their public keys
validatorAddressToPubKey map[string][]byte

// txCache is Mempool transactions Cache
// This is to store precomputed hashes of all the transactions currently in the mempool
// to avoid recomputing the hash for all mempool transactions on every
// TxQuery request (to mitigate Potential DDOS attack vector).
// https://github.com/kwilteam/kwil-db/issues/714
txCache map[string]bool
}

func (a *AbciApp) ChainID() string {
Expand Down Expand Up @@ -150,12 +159,10 @@ func (a *AbciApp) CheckTx(ctx context.Context, incoming *abciTypes.RequestCheckT
return &abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}, nil // return error now or is it still all about code?
}

txHash := cmtTypes.Tx(incoming.Tx).Hash()
logger.Debug("",
zap.String("sender", hex.EncodeToString(tx.Sender)),
zap.String("PayloadType", tx.Body.PayloadType.String()),
zap.Uint64("nonce", tx.Body.Nonce),
zap.String("hash", hex.EncodeToString(txHash)))
zap.Uint64("nonce", tx.Body.Nonce))

// For a new transaction (not re-check), before looking at execution cost or
// checking nonce validity, ensure the payload is recognized and signature is valid.
Expand All @@ -176,7 +183,7 @@ func (a *AbciApp) CheckTx(ctx context.Context, incoming *abciTypes.RequestCheckT
return &abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}, nil
}
} else {
logger.Info("Recheck", zap.String("hash", hex.EncodeToString(txHash)), zap.Uint64("nonce", tx.Body.Nonce), zap.String("payloadType", tx.Body.PayloadType.String()), zap.String("sender", hex.EncodeToString(tx.Sender)))
logger.Info("Recheck", zap.String("sender", hex.EncodeToString(tx.Sender)), zap.Uint64("nonce", tx.Body.Nonce), zap.String("payloadType", tx.Body.PayloadType.String()))
}

err = a.txApp.ApplyMempool(ctx, tx)
Expand All @@ -197,6 +204,11 @@ func (a *AbciApp) CheckTx(ctx context.Context, incoming *abciTypes.RequestCheckT
return &abciTypes.ResponseCheckTx{Code: code.Uint32(), Log: err.Error()}, nil
}

// Cache the transaction hash
if newTx {
txHash := cmtTypes.Tx(incoming.Tx).Hash()
a.txCache[string(txHash)] = true
}
return &abciTypes.ResponseCheckTx{Code: code.Uint32()}, nil
}

Expand Down Expand Up @@ -275,6 +287,10 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal
abciRes.GasUsed = txRes.Spend

res.TxResults = append(res.TxResults, abciRes)

// Remove the transaction from the cache as it has been included in a block
hash := cmtTypes.Tx(tx).Hash()
delete(a.txCache, string(hash))
}

res.ConsensusParamUpdates = &tendermintTypes.ConsensusParams{ // why are we "updating" these on every block? Should be nil for no update.
Expand Down Expand Up @@ -937,3 +953,8 @@ type EventBroadcaster func(ctx context.Context, proposer []byte) error
func (a *AbciApp) SetEventBroadcaster(fn EventBroadcaster) {
a.broadcastFn = fn
}

func (a *AbciApp) TxInMempool(txHash []byte) bool {
_, ok := a.txCache[string(txHash)]
return ok
}
12 changes: 8 additions & 4 deletions internal/services/jsonrpc/usersvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,14 @@ func (svc *Service) TxQuery(ctx context.Context, req *jsonrpc.TxQueryRequest) (*
return nil, jsonrpc.NewError(jsonrpc.ErrorNodeInternal, "failed to query transaction", nil)
}

tx := &transactions.Transaction{}
if err := tx.UnmarshalBinary(cmtResult.Tx); err != nil {
logger.Error("failed to deserialize transaction", log.Error(err))
return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to deserialize transaction", nil)
//cmtResult.Tx can be nil
var tx *transactions.Transaction
if cmtResult.Tx != nil {
tx = &transactions.Transaction{}
if err := tx.UnmarshalBinary(cmtResult.Tx); err != nil {
logger.Error("failed to deserialize transaction", log.Error(err))
return nil, jsonrpc.NewError(jsonrpc.ErrorInternal, "failed to deserialize transaction", nil)
}
}

txResult := &transactions.TransactionResult{
Expand Down

0 comments on commit 8d19d44

Please sign in to comment.