Skip to content

Commit

Permalink
fix: handle events where components cannot be fetched successfully fo…
Browse files Browse the repository at this point in the history
…r hash verification (#87)

* add logs

* Update hash_verifier.go

* use lru cache for verifier

* fix config

(cherry picked from commit 472ad59)

* remove logs

* update dependency

* Update go.mod

* update dependencies
  • Loading branch information
randyahx authored Sep 28, 2023
1 parent 7fbb405 commit e9937e2
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 42 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewApp(cfg *config.Config) *App {
db, err := gorm.Open(mysql.Open(dbPath), &gorm.Config{})

// only for debug purpose
db = db.Debug()
//db = db.Debug()

if err != nil {
panic(fmt.Sprintf("open db error, err=%+v", err.Error()))
Expand Down
6 changes: 4 additions & 2 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
"chain_id_string": "greenfield_9000-121",
"gas_limit": 1000,
"fee_amount": "5000000000000",
"fee_denom": "BNB",
"deduplication_interval": 100
"fee_denom": "BNB"
},
"metrics_config": {
"port": 8080
},
"log_config": {
"level": "DEBUG",
Expand Down
4 changes: 2 additions & 2 deletions executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type JsonRpcClient = *jsonrpcclient.Client

type GnfdCompositeClient struct {
gnfdclient.Client
gnfdclient.IClient
client.TendermintClient
JsonRpcClient
Height int64
Expand All @@ -36,7 +36,7 @@ func NewGnfdCompositClients(rpcAddrs []string, chainId string, account *types.Ac
panic(err)
}
clients = append(clients, &GnfdCompositeClient{
Client: sdkClient,
IClient: sdkClient,
TendermintClient: client.NewTendermintClient(rpcAddrs[i]),
JsonRpcClient: jsonRpcClient,
})
Expand Down
22 changes: 11 additions & 11 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func getGreenfieldBlsPrivateKey(cfg *config.GreenfieldConfig) string {
func (e *Executor) GetBlockAndBlockResultAtHeight(height int64) (*tmtypes.Block, *ctypes.ResultBlockResults, error) {
block, err := e.clients.GetClient().TmClient.Block(context.Background(), &height)
if err != nil {
logging.Logger.Errorf("executor failed to get block at height %d, err=%+v", height, err.Error())
//logging.Logger.Errorf("executor failed to get block at height %d, err=%+v", height, err.Error())
return nil, nil, err
}
blockResults, err := e.clients.GetClient().TmClient.BlockResults(context.Background(), &height)
Expand All @@ -132,7 +132,7 @@ func (e *Executor) GetBlockAndBlockResultAtHeight(height int64) (*tmtypes.Block,
}

func (e *Executor) GetLatestBlockHeight() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
res, err := client.GetLatestBlockHeight(context.Background())
latestHeight := uint64(res)
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func (e *Executor) GetValidatorsBlsPublicKey() ([]string, error) {
}

func (e *Executor) QueryInturnAttestationSubmitter() (*challengetypes.QueryInturnAttestationSubmitterResponse, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
res, err := client.InturnAttestationSubmitter(context.Background(), &challengetypes.QueryInturnAttestationSubmitterRequest{})
if err != nil {
logging.Logger.Errorf("executor failed to get inturn attestation submitter, err=%+v", err.Error())
Expand All @@ -222,7 +222,7 @@ func (e *Executor) QueryInturnAttestationSubmitter() (*challengetypes.QueryIntur
}

func (e *Executor) AttestChallenge(submitterAddress, challengerAddress, spOperatorAddress string, challengeId uint64, objectId sdkmath.Uint, voteResult challengetypes.VoteResult, voteValidatorSet []uint64, VoteAggSignature []byte, txOption sdktypes.TxOption) (bool, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
logging.Logger.Infof("attest challenge params: submitterAddress=%s, challengerAddress=%s, spOperatorAddress=%s, challengeId=%d, objectId=%s, voteResult=%s, voteValidatorSet=%+v, VoteAggSignature=%+v, txOption=%+v", submitterAddress, challengerAddress, spOperatorAddress, challengeId, objectId.String(), voteResult.String(), voteValidatorSet, VoteAggSignature, txOption)
res, err := client.AttestChallenge(context.Background(), submitterAddress, challengerAddress, spOperatorAddress, challengeId, objectId, voteResult, voteValidatorSet, VoteAggSignature, txOption)
if err != nil {
Expand All @@ -242,7 +242,7 @@ func (e *Executor) AttestChallenge(submitterAddress, challengerAddress, spOperat
}

func (e *Executor) QueryLatestAttestedChallengeIds() ([]uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

res, err := client.LatestAttestedChallenges(context.Background(), &challengetypes.QueryLatestAttestedChallengesRequest{})
if err != nil {
Expand All @@ -259,7 +259,7 @@ func (e *Executor) QueryLatestAttestedChallengeIds() ([]uint64, error) {
}

func (e *Executor) queryChallengeHeartbeatInterval() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
q := challengetypes.QueryParamsRequest{}
res, err := client.ChallengeParams(context.Background(), &q)
if err != nil {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (e *Executor) QueryChallengeHeartbeatInterval() (uint64, error) {
}

func (e *Executor) QueryChallengeSlashCoolingOffPeriod() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
params, err := client.ChallengeParams(context.Background(), &challengetypes.QueryParamsRequest{})
if err != nil {
logging.Logger.Errorf("query challenge params failed, err=%+v", err.Error())
Expand Down Expand Up @@ -324,7 +324,7 @@ func (e *Executor) GetHeightLoop() {
}

func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
spAddr, err := sdk.AccAddressFromHexUnsafe(address)
if err != nil {
logging.Logger.Errorf("error converting addr from hex unsafe when getting sp endpoint, err=%+v", err.Error())
Expand All @@ -341,7 +341,7 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) {
}

func (e *Executor) GetObjectInfoChecksums(objectId string) ([][]byte, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

res, err := client.HeadObjectByID(context.Background(), objectId)
if err != nil {
Expand All @@ -352,7 +352,7 @@ func (e *Executor) GetObjectInfoChecksums(objectId string) ([][]byte, error) {
}

func (e *Executor) GetChallengeResultFromSp(objectId, endpoint string, segmentIndex, redundancyIndex int) (*types.ChallengeResult, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

challengeInfoOpts := types.GetChallengeInfoOptions{
Endpoint: endpoint,
Expand Down Expand Up @@ -398,7 +398,7 @@ func (e *Executor) GetAddr() string {
}

func (e *Executor) GetNonce() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
account, err := client.GetAccount(context.Background(), e.GetAddr())
if err != nil {
logging.Logger.Errorf("error getting account, err=%+v", err.Error())
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
cosmossdk.io/math v1.0.1
github.com/avast/retry-go/v4 v4.3.1
github.com/aws/aws-sdk-go v1.40.45
github.com/bnb-chain/greenfield v0.2.5
github.com/bnb-chain/greenfield v0.2.6
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f
github.com/bnb-chain/greenfield-go-sdk v0.2.5
github.com/bnb-chain/greenfield-go-sdk v0.2.6
github.com/cometbft/cometbft v0.37.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/ethereum/go-ethereum v1.10.26
Expand Down Expand Up @@ -173,7 +173,7 @@ replace (
github.com/cometbft/cometbft => github.com/bnb-chain/greenfield-cometbft v0.0.3
github.com/cometbft/cometbft-db => github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6
github.com/cosmos/iavl => github.com/bnb-chain/greenfield-iavl v0.20.1
github.com/ferranbt/fastssz => github.com/prysmaticlabs/fastssz v0.0.0-20220110145812-fafb696cae88
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 h1:41iFGWnSlI2
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/bnb-chain/greenfield v0.2.5 h1:AmHvGfJmHm7FXB4II4mAB6i68bw98p86ZlIs9r6YeRM=
github.com/bnb-chain/greenfield v0.2.5/go.mod h1:nNsy8QGR8+R0j6Qz/duNE94NDDNuomC6wWDCzfl7jKc=
github.com/bnb-chain/greenfield v0.2.6 h1:U40wmSBQR4Wd0HiVCu/J6zqoLS4YUrvNgyuX71kgK3U=
github.com/bnb-chain/greenfield v0.2.6/go.mod h1:8kGVzKu3BEbpotk2Lmp/OzPh+nhbMsuUJueZtn0he4s=
github.com/bnb-chain/greenfield-cometbft v0.0.3 h1:tv8NMy3bzX/1urqXGQIIAZSLy83loiI+dG0VKeyh1CY=
github.com/bnb-chain/greenfield-cometbft v0.0.3/go.mod h1:f35mk/r5ab6yvzlqEWZt68LfUje68sYgMpVlt2CUYMk=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1 h1:XcWulGacHVRiSCx90Q8Y//ajOrLNBQWR/KDB89dy3cU=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1/go.mod h1:ey1CiK4bYo1RBNJLRiVbYr5CMdSxci9S/AZRINLtppI=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f h1:zJvB2wCd80DQ9Nh/ZNQiP8MrHygSpDoav7OzHyIi/pM=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f/go.mod h1:it3JJVHeG9Wp4QED2GkY/7V9Qo3BuPdoC5/4/U6ecJM=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5 h1:8zIn/ExfMHZVBbVwhILG9KYO1m3pZO8I8stpqbFgzkk=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6 h1:aPrd2nG2nYZr8nSmGgN8efus6sni4OIZ1Bdx6s+mS3A=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210 h1:GHPbV2bC+gmuO6/sG0Tm8oGal3KKSRlyE+zPscDjlA8=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210/go.mod h1:vhsZxXE9tYJeYB5JR4hPhd6Pc/uPf7j1T8IJ7p9FdeM=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210 h1:FLVOn4+OVbsKi2+YJX5kmD27/4dRu4FW7xCXFhzDO5s=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210/go.mod h1:An0MllWJY6PxibUpnwGk8jOm+a/qIxlKmL5Zyp9NnaM=
github.com/bnb-chain/greenfield-go-sdk v0.2.5 h1:ErI7nUtNCRDvpXSVaG/XKXEBhTcmiofg/2ciKU1uBag=
github.com/bnb-chain/greenfield-go-sdk v0.2.5/go.mod h1:rWI9Xgcto7ujajNfG6x7+lW1SzbJWBkIgnOcoNjWS7U=
github.com/bnb-chain/greenfield-go-sdk v0.2.6 h1:CGmMK+Ie3ZXae86rGYJnaF7FmO+y92S4a/Qp7DzxhLU=
github.com/bnb-chain/greenfield-go-sdk v0.2.6/go.mod h1:MrxZcdoK/YC8o0wECOv8fskozBokekxApLbi1P6qeTI=
github.com/bnb-chain/greenfield-iavl v0.20.1 h1:y3L64GU99otNp27/xLVBTDbv4eroR6CzoYz0rbaVotM=
github.com/bnb-chain/greenfield-iavl v0.20.1/go.mod h1:oLksTs8dfh7DYIKBro7hbRQ+ewls7ghJ27pIXlbEXyI=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down
4 changes: 0 additions & 4 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,15 @@ func (m *Monitor) monitorChallengeEvents(block *tmtypes.Block, blockResults *cty
func (m *Monitor) calNextHeight() (uint64, error) {
latestPolledBlock, err := m.dataProvider.GetLatestBlock()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Errorf("failed to get latest block from db, error: %s", err.Error())
latestHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return 0, err
}
return latestHeight, err
}
if latestPolledBlock.Height == 0 { // a fresh database
latestHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return m.executor.GetCachedBlockHeight(), err
}
return latestHeight, nil
Expand All @@ -192,7 +189,6 @@ func (m *Monitor) calNextHeight() (uint64, error) {

latestBlockHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return 0, err
}
// pauses challenger for a bit since it already caught the newest block
Expand Down
58 changes: 45 additions & 13 deletions verifier/hash_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/semaphore"
"io"
"strings"
Expand All @@ -25,7 +26,7 @@ type Verifier struct {
config *config.Config
executor *executor.Executor
deduplicationInterval uint64
cachedChallengeIds map[uint64]bool
cachedChallengeIds *lru.Cache
mtx sync.RWMutex
dataProvider DataProvider
limiterSemaphore *semaphore.Weighted
Expand All @@ -37,6 +38,9 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
) *Verifier {
limiterSemaphore := semaphore.NewWeighted(20)

cacheSize := 1000
lruCache, _ := lru.New(cacheSize)

deduplicationInterval, err := executor.QueryChallengeSlashCoolingOffPeriod()
if err != nil {
logging.Logger.Errorf("verifier failed to query slash cooling off period, err=%+v", err)
Expand All @@ -46,6 +50,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
config: cfg,
executor: executor,
deduplicationInterval: deduplicationInterval,
cachedChallengeIds: lruCache,
mtx: sync.RWMutex{},
dataProvider: dataProvider,
limiterSemaphore: limiterSemaphore,
Expand All @@ -54,8 +59,6 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
}

func (v *Verifier) VerifyHashLoop() {
// Event lasts for 300 blocks, 2x for redundancy
v.cachedChallengeIds = make(map[uint64]bool, common.CacheSize)
for {
err := v.verifyHash()
if err != nil {
Expand Down Expand Up @@ -88,7 +91,7 @@ func (v *Verifier) verifyHash() error {

for _, event := range events {
v.mtx.Lock()
isCached := v.cachedChallengeIds[event.ChallengeId]
isCached := v.cachedChallengeIds.Contains(event.ChallengeId)
v.mtx.Unlock()

if isCached {
Expand All @@ -112,7 +115,7 @@ func (v *Verifier) verifyHash() error {
if err != nil {
if err.Error() == common.ErrEventExpired.Error() {
v.mtx.Lock()
delete(v.cachedChallengeIds, event.ChallengeId)
v.cachedChallengeIds.Remove(event.ChallengeId)
v.mtx.Unlock()
continue
}
Expand All @@ -121,7 +124,7 @@ func (v *Verifier) verifyHash() error {

if !isCached {
v.mtx.Lock()
v.cachedChallengeIds[event.ChallengeId] = true
v.cachedChallengeIds.Add(event.ChallengeId, true)
v.mtx.Unlock()
}
}
Expand All @@ -132,25 +135,54 @@ func (v *Verifier) verifyHash() error {
}

func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
var err error
startTime := time.Now()
logging.Logger.Infof("verifier started for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000"))
currentHeight := v.executor.GetCachedBlockHeight()
if err := v.preCheck(event, currentHeight); err != nil {
if err = v.preCheck(event, currentHeight); err != nil {
return err
}

endpoint, err := v.executor.GetStorageProviderEndpoint(event.SpOperatorAddress)
// Retry GetStorageProviderEndpoint and GetObjectInfoChecksums up to 5 times
var endpoint string
_ = retry.Do(
func() error {
endpoint, err = v.executor.GetStorageProviderEndpoint(event.SpOperatorAddress)
if err != nil {
logging.Logger.Errorf("verifier failed to get sp endpoint for challengeId: %s, objectId: %s, err=%+v", event.ChallengeId, event.ObjectId, err.Error())
}
return err
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)

if err != nil {
logging.Logger.Errorf("verifier failed to get sp endpoint for challengeId: %s, objectId: %s, err=%+v", err.Error(), event.ChallengeId, event.ObjectId)
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.Unknown)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeFailed()
if err != nil {
return err
}
return err
}
logging.Logger.Infof("challengeId: %d, sp endpoint: %s, objectId: %s, segmentIndex: %d, redundancyIndex: %d", event.ChallengeId, endpoint, event.ObjectId, event.SegmentIndex, event.RedundancyIndex)

// Call blockchain for object info to get original hash
checksums, err := v.executor.GetObjectInfoChecksums(event.ObjectId)
var checksums [][]byte
_ = retry.Do(
func() error {
checksums, err = v.executor.GetObjectInfoChecksums(event.ObjectId)
if err != nil {
if strings.Contains(err.Error(), "No such object") {
logging.Logger.Errorf("No such object error for challengeId: %d", event.ChallengeId)
}
logging.Logger.Errorf("hash verifier error getting object checksums for challengeId: %d, err=%s", event.ChallengeId, err.Error())
}
return err
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)
if err != nil {
if strings.Contains(err.Error(), "No such object") {
logging.Logger.Errorf("No such object error for challengeId: %d", event.ChallengeId)
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.Unknown)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeFailed()
if err != nil {
return err
}
return err
}
Expand Down

0 comments on commit e9937e2

Please sign in to comment.