Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle events where components cannot be fetched successfully for hash verification #87

Merged
merged 8 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewApp(cfg *config.Config) *App {
db, err := gorm.Open(mysql.Open(dbPath), &gorm.Config{})

// only for debug purpose
db = db.Debug()
//db = db.Debug()

if err != nil {
panic(fmt.Sprintf("open db error, err=%+v", err.Error()))
Expand Down
6 changes: 4 additions & 2 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
"chain_id_string": "greenfield_9000-121",
"gas_limit": 1000,
"fee_amount": "5000000000000",
"fee_denom": "BNB",
"deduplication_interval": 100
"fee_denom": "BNB"
},
"metrics_config": {
"port": 8080
},
"log_config": {
"level": "DEBUG",
Expand Down
4 changes: 2 additions & 2 deletions executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type JsonRpcClient = *jsonrpcclient.Client

type GnfdCompositeClient struct {
gnfdclient.Client
gnfdclient.IClient
client.TendermintClient
JsonRpcClient
Height int64
Expand All @@ -36,7 +36,7 @@ func NewGnfdCompositClients(rpcAddrs []string, chainId string, account *types.Ac
panic(err)
}
clients = append(clients, &GnfdCompositeClient{
Client: sdkClient,
IClient: sdkClient,
TendermintClient: client.NewTendermintClient(rpcAddrs[i]),
JsonRpcClient: jsonRpcClient,
})
Expand Down
22 changes: 11 additions & 11 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func getGreenfieldBlsPrivateKey(cfg *config.GreenfieldConfig) string {
func (e *Executor) GetBlockAndBlockResultAtHeight(height int64) (*tmtypes.Block, *ctypes.ResultBlockResults, error) {
block, err := e.clients.GetClient().TmClient.Block(context.Background(), &height)
if err != nil {
logging.Logger.Errorf("executor failed to get block at height %d, err=%+v", height, err.Error())
//logging.Logger.Errorf("executor failed to get block at height %d, err=%+v", height, err.Error())
return nil, nil, err
}
blockResults, err := e.clients.GetClient().TmClient.BlockResults(context.Background(), &height)
Expand All @@ -132,7 +132,7 @@ func (e *Executor) GetBlockAndBlockResultAtHeight(height int64) (*tmtypes.Block,
}

func (e *Executor) GetLatestBlockHeight() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
res, err := client.GetLatestBlockHeight(context.Background())
latestHeight := uint64(res)
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func (e *Executor) GetValidatorsBlsPublicKey() ([]string, error) {
}

func (e *Executor) QueryInturnAttestationSubmitter() (*challengetypes.QueryInturnAttestationSubmitterResponse, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
res, err := client.InturnAttestationSubmitter(context.Background(), &challengetypes.QueryInturnAttestationSubmitterRequest{})
if err != nil {
logging.Logger.Errorf("executor failed to get inturn attestation submitter, err=%+v", err.Error())
Expand All @@ -222,7 +222,7 @@ func (e *Executor) QueryInturnAttestationSubmitter() (*challengetypes.QueryIntur
}

func (e *Executor) AttestChallenge(submitterAddress, challengerAddress, spOperatorAddress string, challengeId uint64, objectId sdkmath.Uint, voteResult challengetypes.VoteResult, voteValidatorSet []uint64, VoteAggSignature []byte, txOption sdktypes.TxOption) (bool, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
logging.Logger.Infof("attest challenge params: submitterAddress=%s, challengerAddress=%s, spOperatorAddress=%s, challengeId=%d, objectId=%s, voteResult=%s, voteValidatorSet=%+v, VoteAggSignature=%+v, txOption=%+v", submitterAddress, challengerAddress, spOperatorAddress, challengeId, objectId.String(), voteResult.String(), voteValidatorSet, VoteAggSignature, txOption)
res, err := client.AttestChallenge(context.Background(), submitterAddress, challengerAddress, spOperatorAddress, challengeId, objectId, voteResult, voteValidatorSet, VoteAggSignature, txOption)
if err != nil {
Expand All @@ -242,7 +242,7 @@ func (e *Executor) AttestChallenge(submitterAddress, challengerAddress, spOperat
}

func (e *Executor) QueryLatestAttestedChallengeIds() ([]uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

res, err := client.LatestAttestedChallenges(context.Background(), &challengetypes.QueryLatestAttestedChallengesRequest{})
if err != nil {
Expand All @@ -259,7 +259,7 @@ func (e *Executor) QueryLatestAttestedChallengeIds() ([]uint64, error) {
}

func (e *Executor) queryChallengeHeartbeatInterval() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
q := challengetypes.QueryParamsRequest{}
res, err := client.ChallengeParams(context.Background(), &q)
if err != nil {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (e *Executor) QueryChallengeHeartbeatInterval() (uint64, error) {
}

func (e *Executor) QueryChallengeSlashCoolingOffPeriod() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
params, err := client.ChallengeParams(context.Background(), &challengetypes.QueryParamsRequest{})
if err != nil {
logging.Logger.Errorf("query challenge params failed, err=%+v", err.Error())
Expand Down Expand Up @@ -324,7 +324,7 @@ func (e *Executor) GetHeightLoop() {
}

func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
spAddr, err := sdk.AccAddressFromHexUnsafe(address)
if err != nil {
logging.Logger.Errorf("error converting addr from hex unsafe when getting sp endpoint, err=%+v", err.Error())
Expand All @@ -341,7 +341,7 @@ func (e *Executor) GetStorageProviderEndpoint(address string) (string, error) {
}

func (e *Executor) GetObjectInfoChecksums(objectId string) ([][]byte, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

res, err := client.HeadObjectByID(context.Background(), objectId)
if err != nil {
Expand All @@ -352,7 +352,7 @@ func (e *Executor) GetObjectInfoChecksums(objectId string) ([][]byte, error) {
}

func (e *Executor) GetChallengeResultFromSp(objectId, endpoint string, segmentIndex, redundancyIndex int) (*types.ChallengeResult, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()

challengeInfoOpts := types.GetChallengeInfoOptions{
Endpoint: endpoint,
Expand Down Expand Up @@ -398,7 +398,7 @@ func (e *Executor) GetAddr() string {
}

func (e *Executor) GetNonce() (uint64, error) {
client := e.clients.GetClient().Client
client := e.clients.GetClient()
account, err := client.GetAccount(context.Background(), e.GetAddr())
if err != nil {
logging.Logger.Errorf("error getting account, err=%+v", err.Error())
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
cosmossdk.io/math v1.0.1
github.com/avast/retry-go/v4 v4.3.1
github.com/aws/aws-sdk-go v1.40.45
github.com/bnb-chain/greenfield v0.2.5
github.com/bnb-chain/greenfield v0.2.6
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f
github.com/bnb-chain/greenfield-go-sdk v0.2.5
github.com/bnb-chain/greenfield-go-sdk v0.2.6
github.com/cometbft/cometbft v0.37.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/ethereum/go-ethereum v1.10.26
Expand Down Expand Up @@ -173,7 +173,7 @@ replace (
github.com/cometbft/cometbft => github.com/bnb-chain/greenfield-cometbft v0.0.3
github.com/cometbft/cometbft-db => github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6
github.com/cosmos/iavl => github.com/bnb-chain/greenfield-iavl v0.20.1
github.com/ferranbt/fastssz => github.com/prysmaticlabs/fastssz v0.0.0-20220110145812-fafb696cae88
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816 h1:41iFGWnSlI2
github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/bnb-chain/greenfield v0.2.5 h1:AmHvGfJmHm7FXB4II4mAB6i68bw98p86ZlIs9r6YeRM=
github.com/bnb-chain/greenfield v0.2.5/go.mod h1:nNsy8QGR8+R0j6Qz/duNE94NDDNuomC6wWDCzfl7jKc=
github.com/bnb-chain/greenfield v0.2.6 h1:U40wmSBQR4Wd0HiVCu/J6zqoLS4YUrvNgyuX71kgK3U=
github.com/bnb-chain/greenfield v0.2.6/go.mod h1:8kGVzKu3BEbpotk2Lmp/OzPh+nhbMsuUJueZtn0he4s=
github.com/bnb-chain/greenfield-cometbft v0.0.3 h1:tv8NMy3bzX/1urqXGQIIAZSLy83loiI+dG0VKeyh1CY=
github.com/bnb-chain/greenfield-cometbft v0.0.3/go.mod h1:f35mk/r5ab6yvzlqEWZt68LfUje68sYgMpVlt2CUYMk=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1 h1:XcWulGacHVRiSCx90Q8Y//ajOrLNBQWR/KDB89dy3cU=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1/go.mod h1:ey1CiK4bYo1RBNJLRiVbYr5CMdSxci9S/AZRINLtppI=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f h1:zJvB2wCd80DQ9Nh/ZNQiP8MrHygSpDoav7OzHyIi/pM=
github.com/bnb-chain/greenfield-common/go v0.0.0-20230830120314-a54ffd6da39f/go.mod h1:it3JJVHeG9Wp4QED2GkY/7V9Qo3BuPdoC5/4/U6ecJM=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5 h1:8zIn/ExfMHZVBbVwhILG9KYO1m3pZO8I8stpqbFgzkk=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.5/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6 h1:aPrd2nG2nYZr8nSmGgN8efus6sni4OIZ1Bdx6s+mS3A=
github.com/bnb-chain/greenfield-cosmos-sdk v0.2.6/go.mod h1:y3hDhQhil5hMIhwBTpu07RZBF30ZITkoE+GHhVZChtY=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210 h1:GHPbV2bC+gmuO6/sG0Tm8oGal3KKSRlyE+zPscDjlA8=
github.com/bnb-chain/greenfield-cosmos-sdk/api v0.0.0-20230816082903-b48770f5e210/go.mod h1:vhsZxXE9tYJeYB5JR4hPhd6Pc/uPf7j1T8IJ7p9FdeM=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210 h1:FLVOn4+OVbsKi2+YJX5kmD27/4dRu4FW7xCXFhzDO5s=
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20230816082903-b48770f5e210/go.mod h1:An0MllWJY6PxibUpnwGk8jOm+a/qIxlKmL5Zyp9NnaM=
github.com/bnb-chain/greenfield-go-sdk v0.2.5 h1:ErI7nUtNCRDvpXSVaG/XKXEBhTcmiofg/2ciKU1uBag=
github.com/bnb-chain/greenfield-go-sdk v0.2.5/go.mod h1:rWI9Xgcto7ujajNfG6x7+lW1SzbJWBkIgnOcoNjWS7U=
github.com/bnb-chain/greenfield-go-sdk v0.2.6 h1:CGmMK+Ie3ZXae86rGYJnaF7FmO+y92S4a/Qp7DzxhLU=
github.com/bnb-chain/greenfield-go-sdk v0.2.6/go.mod h1:MrxZcdoK/YC8o0wECOv8fskozBokekxApLbi1P6qeTI=
github.com/bnb-chain/greenfield-iavl v0.20.1 h1:y3L64GU99otNp27/xLVBTDbv4eroR6CzoYz0rbaVotM=
github.com/bnb-chain/greenfield-iavl v0.20.1/go.mod h1:oLksTs8dfh7DYIKBro7hbRQ+ewls7ghJ27pIXlbEXyI=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down
4 changes: 0 additions & 4 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,15 @@ func (m *Monitor) monitorChallengeEvents(block *tmtypes.Block, blockResults *cty
func (m *Monitor) calNextHeight() (uint64, error) {
latestPolledBlock, err := m.dataProvider.GetLatestBlock()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Errorf("failed to get latest block from db, error: %s", err.Error())
latestHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return 0, err
}
return latestHeight, err
}
if latestPolledBlock.Height == 0 { // a fresh database
latestHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return m.executor.GetCachedBlockHeight(), err
}
return latestHeight, nil
Expand All @@ -192,7 +189,6 @@ func (m *Monitor) calNextHeight() (uint64, error) {

latestBlockHeight, err := m.executor.GetLatestBlockHeight()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return 0, err
}
// pauses challenger for a bit since it already caught the newest block
Expand Down
58 changes: 45 additions & 13 deletions verifier/hash_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/semaphore"
"io"
"strings"
Expand All @@ -25,7 +26,7 @@ type Verifier struct {
config *config.Config
executor *executor.Executor
deduplicationInterval uint64
cachedChallengeIds map[uint64]bool
cachedChallengeIds *lru.Cache
mtx sync.RWMutex
dataProvider DataProvider
limiterSemaphore *semaphore.Weighted
Expand All @@ -37,6 +38,9 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
) *Verifier {
limiterSemaphore := semaphore.NewWeighted(20)

cacheSize := 1000
lruCache, _ := lru.New(cacheSize)

deduplicationInterval, err := executor.QueryChallengeSlashCoolingOffPeriod()
if err != nil {
logging.Logger.Errorf("verifier failed to query slash cooling off period, err=%+v", err)
Expand All @@ -46,6 +50,7 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
config: cfg,
executor: executor,
deduplicationInterval: deduplicationInterval,
cachedChallengeIds: lruCache,
mtx: sync.RWMutex{},
dataProvider: dataProvider,
limiterSemaphore: limiterSemaphore,
Expand All @@ -54,8 +59,6 @@ func NewHashVerifier(cfg *config.Config, executor *executor.Executor, dataProvid
}

func (v *Verifier) VerifyHashLoop() {
// Event lasts for 300 blocks, 2x for redundancy
v.cachedChallengeIds = make(map[uint64]bool, common.CacheSize)
for {
err := v.verifyHash()
if err != nil {
Expand Down Expand Up @@ -88,7 +91,7 @@ func (v *Verifier) verifyHash() error {

for _, event := range events {
v.mtx.Lock()
isCached := v.cachedChallengeIds[event.ChallengeId]
isCached := v.cachedChallengeIds.Contains(event.ChallengeId)
v.mtx.Unlock()

if isCached {
Expand All @@ -112,7 +115,7 @@ func (v *Verifier) verifyHash() error {
if err != nil {
if err.Error() == common.ErrEventExpired.Error() {
v.mtx.Lock()
delete(v.cachedChallengeIds, event.ChallengeId)
v.cachedChallengeIds.Remove(event.ChallengeId)
v.mtx.Unlock()
continue
}
Expand All @@ -121,7 +124,7 @@ func (v *Verifier) verifyHash() error {

if !isCached {
v.mtx.Lock()
v.cachedChallengeIds[event.ChallengeId] = true
v.cachedChallengeIds.Add(event.ChallengeId, true)
v.mtx.Unlock()
}
}
Expand All @@ -132,25 +135,54 @@ func (v *Verifier) verifyHash() error {
}

func (v *Verifier) verifyForSingleEvent(event *model.Event) error {
var err error
startTime := time.Now()
logging.Logger.Infof("verifier started for challengeId: %d %s", event.ChallengeId, time.Now().Format("15:04:05.000000"))
currentHeight := v.executor.GetCachedBlockHeight()
if err := v.preCheck(event, currentHeight); err != nil {
if err = v.preCheck(event, currentHeight); err != nil {
return err
}

endpoint, err := v.executor.GetStorageProviderEndpoint(event.SpOperatorAddress)
// Retry GetStorageProviderEndpoint and GetObjectInfoChecksums up to 5 times
var endpoint string
_ = retry.Do(
func() error {
endpoint, err = v.executor.GetStorageProviderEndpoint(event.SpOperatorAddress)
if err != nil {
logging.Logger.Errorf("verifier failed to get sp endpoint for challengeId: %s, objectId: %s, err=%+v", event.ChallengeId, event.ObjectId, err.Error())
}
return err
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)

if err != nil {
logging.Logger.Errorf("verifier failed to get sp endpoint for challengeId: %s, objectId: %s, err=%+v", err.Error(), event.ChallengeId, event.ObjectId)
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.Unknown)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeFailed()
if err != nil {
return err
}
return err
}
logging.Logger.Infof("challengeId: %d, sp endpoint: %s, objectId: %s, segmentIndex: %d, redundancyIndex: %d", event.ChallengeId, endpoint, event.ObjectId, event.SegmentIndex, event.RedundancyIndex)

// Call blockchain for object info to get original hash
checksums, err := v.executor.GetObjectInfoChecksums(event.ObjectId)
var checksums [][]byte
_ = retry.Do(
func() error {
checksums, err = v.executor.GetObjectInfoChecksums(event.ObjectId)
if err != nil {
if strings.Contains(err.Error(), "No such object") {
logging.Logger.Errorf("No such object error for challengeId: %d", event.ChallengeId)
}
logging.Logger.Errorf("hash verifier error getting object checksums for challengeId: %d, err=%s", event.ChallengeId, err.Error())
}
return err
}, retry.Context(context.Background()), common.RtyAttem, common.RtyDelay, common.RtyErr)
if err != nil {
if strings.Contains(err.Error(), "No such object") {
logging.Logger.Errorf("No such object error for challengeId: %d", event.ChallengeId)
err = v.dataProvider.UpdateEventStatusVerifyResult(event.ChallengeId, model.Verified, model.Unknown)
v.metricService.IncVerifiedChallenges()
v.metricService.IncChallengeFailed()
if err != nil {
return err
}
return err
}
Expand Down