diff --git a/cmd/index.go b/cmd/index.go
index 0aeb26e..d869bcb 100644
--- a/cmd/index.go
+++ b/cmd/index.go
@@ -1,7 +1,6 @@
 package cmd
 
 import (
-	"context"
 	"fmt"
 	"log"
 	"math"
@@ -21,7 +20,6 @@ import (
 	"github.com/DefiantLabs/cosmos-indexer/rpc"
 	"github.com/DefiantLabs/cosmos-indexer/tasks"
 	"github.com/spf13/cobra"
-	ctypes "github.com/tendermint/tendermint/rpc/core/types"
 	"gorm.io/gorm"
 )
 
@@ -515,8 +513,7 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor
 
 	config.Log.Infof("Indexing block events from block: %v to %v", startHeight, endHeight)
 
-	// TODO: Strip this out of the Osmosis module and make it generalized
-	rpcClient := osmosis.URIClient{
+	rpcClient := rpc.URIClient{
 		Address: idxr.cl.Config.RPCAddr,
 		Client:  &http.Client{},
 	}
@@ -524,7 +521,7 @@ func (idxr *Indexer) indexBlockEvents(wg *sync.WaitGroup, failedBlockHandler cor
 
 	currentHeight := startHeight
 	for endHeight == -1 || currentHeight <= endHeight {
-		bresults, err := getBlockResult(rpcClient, currentHeight)
+		bresults, err := rpc.GetBlockResultWithRetry(rpcClient, currentHeight, idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
 		if err != nil {
 			config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", currentHeight), err)
 			failedBlockHandler(currentHeight, core.FailedBlockEventHandling, err)
@@ -619,7 +616,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
 
 	config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber)
 
-	rpcClient := osmosis.URIClient{
+	rpcClient := rpc.URIClient{
 		Address: idxr.cl.Config.RPCAddr,
 		Client:  &http.Client{},
 	}
@@ -627,7 +624,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
 
 	for _, epoch := range epochsBetween {
 		config.Log.Infof("Indexing epoch events for epoch %v at height %d", epoch.EpochNumber, epoch.StartHeight)
-		bresults, err := getBlockResult(rpcClient, int64(epoch.StartHeight))
+		bresults, err := rpc.GetBlockResultWithRetry(rpcClient, int64(epoch.StartHeight), idxr.cfg.Base.RPCRetryAttempts, idxr.cfg.Base.RPCRetryMaxWait)
 		if err != nil {
 			config.Log.Error(fmt.Sprintf("Error receiving block result for block %d", epoch.StartHeight), err)
 			failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
@@ -689,18 +686,6 @@ func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifi
 	return epochsBetween, dbResp.Error
 }
 
-func getBlockResult(client osmosis.URIClient, height int64) (*ctypes.ResultBlockResults, error) {
-	brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
-	defer cancel()
-
-	bresults, err := client.DoBlockResults(brctx, &height)
-	if err != nil {
-		return nil, err
-	}
-
-	return bresults, nil
-}
-
 // doDBUpdates will read the data out of the db data chan that had been processed by the workers
 // if this is a dry run, we will simply empty the channel and track progress
 // otherwise we will index the data in the DB.
diff --git a/cmd/root.go b/cmd/root.go
index 43729ac..0cd2f62 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -68,6 +68,8 @@ func init() {
 	rootCmd.PersistentFlags().Int64Var(&conf.Base.WaitForChainDelay, "base.wait-for-chain-delay", 10, "seconds to wait between each check for node to catch up to the chain")
 	rootCmd.PersistentFlags().Int64Var(&conf.Base.BlockTimer, "base.block-timer", 10000, "print out how long it takes to process this many blocks")
 	rootCmd.PersistentFlags().BoolVar(&conf.Base.ExitWhenCaughtUp, "base.exit-when-caught-up", true, "mainly used for Osmosis rewards indexing")
+	rootCmd.PersistentFlags().Int64Var(&conf.Base.RPCRetryAttempts, "base.rpc-retry-attempts", 0, "number of RPC query retries to make")
+	rootCmd.PersistentFlags().Uint64Var(&conf.Base.RPCRetryMaxWait, "base.rpc-retry-max-wait", 30, "max retry incremental backoff wait time in seconds")
 
 	// Lens
 	rootCmd.PersistentFlags().StringVar(&conf.Lens.RPC, "lens.rpc", "", "node rpc endpoint")
diff --git a/config.toml.example b/config.toml.example
index 80615bf..9892474 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -26,6 +26,8 @@ epoch-events-end-epoch=752
 dry = false # if true, indexing will occur but data will not be written to the database.
 api = "" # node api endpoint
 rpc-workers = 1
+rpc-retry-attempts=0 # RPC queries are retried if they fail. This value sets how many retries to attempt before giving up (-1 for indefinite retries).
+rpc-retry-max-wait=30 # max backoff wait time in seconds between RPC query retries
 
 #Lens config options
 [lens]
diff --git a/config/app_config.go b/config/app_config.go
index 9e9d9f7..e9b7152 100644
--- a/config/app_config.go
+++ b/config/app_config.go
@@ -169,6 +169,8 @@ type base struct {
 	EpochIndexingIdentifier string `mapstructure:"epoch-indexing-identifier"`
 	EpochEventsStartEpoch   int64  `mapstructure:"epoch-events-start-epoch"`
 	EpochEventsEndEpoch     int64  `mapstructure:"epoch-events-end-epoch"`
+	RPCRetryAttempts        int64  `mapstructure:"rpc-retry-attempts"`
+	RPCRetryMaxWait         uint64 `mapstructure:"rpc-retry-max-wait"`
 }
 
 type log struct {
diff --git a/osmosis/helpers.go b/osmosis/helpers.go
index 2d2e8b3..110ffdf 100644
--- a/osmosis/helpers.go
+++ b/osmosis/helpers.go
@@ -1,180 +1 @@
 package osmosis
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"net/http"
-	"net/url"
-	"reflect"
-
-	tmjson "github.com/tendermint/tendermint/libs/json"
-	ctypes "github.com/tendermint/tendermint/rpc/core/types"
-	jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client"
-	types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
-)
-
-func DoHTTPReq(url string, authHeader string) (*http.Response, error) {
-	// Send req using http Client
-	client := &http.Client{}
-	req, _ := http.NewRequest("GET", url, nil)
-	req.Header.Add("Authorization", authHeader)
-	return client.Do(req)
-}
-
-func argsToURLValues(args map[string]interface{}) (url.Values, error) {
-	values := make(url.Values)
-	if len(args) == 0 {
-		return values, nil
-	}
-
-	err := argsToJSON(args)
-	if err != nil {
-		return nil, err
-	}
-
-	for key, val := range args {
-		values.Set(key, val.(string))
-	}
-
-	return values, nil
-}
-
-func argsToJSON(args map[string]interface{}) error {
-	for k, v := range args {
-		rt := reflect.TypeOf(v)
-		isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8
-		if isByteSlice {
-			bytes := reflect.ValueOf(v).Bytes()
-			args[k] = fmt.Sprintf("0x%X", bytes)
-			continue
-		}
-
-		data, err := tmjson.Marshal(v)
-		if err != nil {
-			return err
-		}
-		args[k] = string(data)
-	}
-	return nil
-}
-
-// Call issues a POST form HTTP request.
-func (c *URIClient) DoHTTPGet(ctx context.Context, method string, params map[string]interface{}, result interface{}) (interface{}, error) {
-	values, err := argsToURLValues(params)
-	if err != nil {
-		return nil, fmt.Errorf("failed to encode params: %w", err)
-	}
-
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Address+"/"+method, nil)
-	if err != nil {
-		return nil, fmt.Errorf("error creating new request: %w", err)
-	}
-
-	req.URL.RawQuery = values.Encode()
-	// fmt.Printf("Query string: %s\n", values.Encode())
-
-	// req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-	if c.AuthHeader != "" {
-		req.Header.Add("Authorization", c.AuthHeader)
-	}
-
-	resp, err := c.Client.Do(req)
-	if err != nil {
-		return nil, fmt.Errorf("get: %w", err)
-	}
-	defer resp.Body.Close()
-
-	responseBytes, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return nil, fmt.Errorf("read response body: %w", err)
-	}
-
-	return unmarshalResponseBytes(responseBytes, jsonrpc.URIClientRequestID, result)
-}
-
-// From the JSON-RPC 2.0 spec:
-// id: It MUST be the same as the value of the id member in the Request Object.
-func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error {
-	if err := validateResponseID(res.ID); err != nil {
-		return err
-	}
-	if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type
-		return fmt.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID)
-	}
-	return nil
-}
-
-func validateResponseID(id interface{}) error {
-	if id == nil {
-		return errors.New("no ID")
-	}
-	_, ok := id.(types.JSONRPCIntID)
-	if !ok {
-		return fmt.Errorf("expected JSONRPCIntID, but got: %T", id)
-	}
-	return nil
-}
-
-func unmarshalResponseBytes(responseBytes []byte, expectedID types.JSONRPCIntID, result interface{}) (interface{}, error) {
-	// Read response. If rpc/core/types is imported, the result will unmarshal
-	// into the correct type.
-	response := &types.RPCResponse{}
-	if err := json.Unmarshal(responseBytes, response); err != nil {
-		return nil, fmt.Errorf("error unmarshalling: %w", err)
-	}
-
-	if response.Error != nil {
-		return nil, response.Error
-	}
-
-	if err := validateAndVerifyID(response, expectedID); err != nil {
-		return nil, fmt.Errorf("wrong ID: %w", err)
-	}
-
-	// Unmarshal the RawMessage into the result.
-	if err := tmjson.Unmarshal(response.Result, result); err != nil {
-		return nil, fmt.Errorf("error unmarshalling result: %w", err)
-	}
-
-	return result, nil
-}
-
-func (c *URIClient) DoBlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) {
-	result := new(ctypes.ResultBlockSearch)
-	params := map[string]interface{}{
-		"query":    query,
-		"order_by": orderBy,
-	}
-
-	if page != nil {
-		params["page"] = page
-	}
-	if perPage != nil {
-		params["per_page"] = perPage
-	}
-
-	_, err := c.DoHTTPGet(ctx, "block_search", params, result)
-	if err != nil {
-		return nil, err
-	}
-
-	return result, nil
-}
-
-func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
-	result := new(ctypes.ResultBlockResults)
-	params := make(map[string]interface{})
-	if height != nil {
-		params["height"] = height
-	}
-
-	_, err := c.DoHTTPGet(ctx, "block_results", params, result)
-	if err != nil {
-		return nil, err
-	}
-
-	return result, nil
-}
diff --git a/osmosis/types.go b/osmosis/types.go
index dc287df..52414ef 100644
--- a/osmosis/types.go
+++ b/osmosis/types.go
@@ -1,41 +1,6 @@
 package osmosis
 
-import (
-	"net/http"
-	"time"
-)
-
 const (
 	ChainID = "osmosis-1"
 	Name    = "osmosis"
 )
-
-type Result struct {
-	Data Data
-}
-
-type Data struct {
-	Value EventDataNewBlockHeader
-}
-
-type EventDataNewBlockHeader struct {
-	Header Header `json:"header"`
-	NumTxs string `json:"num_txs"` // Number of txs in a block
-}
-
-type Header struct {
-	// basic block info
-	ChainID string    `json:"chain_id"`
-	Height  string    `json:"height"`
-	Time    time.Time `json:"time"`
-}
-
-type TendermintNewBlockHeader struct {
-	Result Result
-}
-
-type URIClient struct {
-	Address    string
-	Client     *http.Client
-	AuthHeader string
-}
diff --git a/rpc/blocks.go b/rpc/blocks.go
new file mode 100644
index 0000000..d5cf591
--- /dev/null
+++ b/rpc/blocks.go
@@ -0,0 +1,243 @@
+package rpc
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"net/url"
+	"reflect"
+	"time"
+
+	"github.com/DefiantLabs/cosmos-indexer/config"
+	tmjson "github.com/tendermint/tendermint/libs/json"
+	ctypes "github.com/tendermint/tendermint/rpc/core/types"
+	jsonrpc "github.com/tendermint/tendermint/rpc/jsonrpc/client"
+	types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
+)
+
+func argsToURLValues(args map[string]interface{}) (url.Values, error) {
+	values := make(url.Values)
+	if len(args) == 0 {
+		return values, nil
+	}
+
+	err := argsToJSON(args)
+	if err != nil {
+		return nil, err
+	}
+
+	for key, val := range args {
+		values.Set(key, val.(string))
+	}
+
+	return values, nil
+}
+
+func argsToJSON(args map[string]interface{}) error {
+	for k, v := range args {
+		rt := reflect.TypeOf(v)
+		isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8
+		if isByteSlice {
+			bytes := reflect.ValueOf(v).Bytes()
+			args[k] = fmt.Sprintf("0x%X", bytes)
+			continue
+		}
+
+		data, err := tmjson.Marshal(v)
+		if err != nil {
+			return err
+		}
+		args[k] = string(data)
+	}
+	return nil
+}
+
+func (c *URIClient) DoHTTPGet(ctx context.Context, method string, params map[string]interface{}, result interface{}) (interface{}, error) {
+	values, err := argsToURLValues(params)
+	if err != nil {
+		return nil, fmt.Errorf("failed to encode params: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.Address+"/"+method, nil)
+	if err != nil {
+		return nil, fmt.Errorf("error creating new request: %w", err)
+	}
+
+	req.URL.RawQuery = values.Encode()
+	// fmt.Printf("Query string: %s\n", values.Encode())
+
+	// req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	if c.AuthHeader != "" {
+		req.Header.Add("Authorization", c.AuthHeader)
+	}
+
+	resp, err := c.Client.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("get: %w", err)
+	}
+	defer resp.Body.Close()
+
+	responseBytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read response body: %w", err)
+	}
+
+	return unmarshalResponseBytes(responseBytes, jsonrpc.URIClientRequestID, result)
+}
+
+type URIClient struct {
+	Address    string
+	Client     *http.Client
+	AuthHeader string
+}
+
+func unmarshalResponseBytes(responseBytes []byte, expectedID types.JSONRPCIntID, result interface{}) (interface{}, error) {
+	// Read response. If rpc/core/types is imported, the result will unmarshal
+	// into the correct type.
+	response := &types.RPCResponse{}
+	if err := json.Unmarshal(responseBytes, response); err != nil {
+		return nil, fmt.Errorf("error unmarshalling: %w", err)
+	}
+
+	if response.Error != nil {
+		return nil, response.Error
+	}
+
+	if err := validateAndVerifyID(response, expectedID); err != nil {
+		return nil, fmt.Errorf("wrong ID: %w", err)
+	}
+
+	// Unmarshal the RawMessage into the result.
+	if err := tmjson.Unmarshal(response.Result, result); err != nil {
+		return nil, fmt.Errorf("error unmarshalling result: %w", err)
+	}
+
+	return result, nil
+}
+
+func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error {
+	if err := validateResponseID(res.ID); err != nil {
+		return err
+	}
+	if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type
+		return fmt.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID)
+	}
+	return nil
+}
+
+func validateResponseID(id interface{}) error {
+	if id == nil {
+		return errors.New("no ID")
+	}
+	_, ok := id.(types.JSONRPCIntID)
+	if !ok {
+		return fmt.Errorf("expected JSONRPCIntID, but got: %T", id)
+	}
+	return nil
+}
+
+func (c *URIClient) DoBlockSearch(ctx context.Context, query string, page, perPage *int, orderBy string) (*ctypes.ResultBlockSearch, error) {
+	result := new(ctypes.ResultBlockSearch)
+	params := map[string]interface{}{
+		"query":    query,
+		"order_by": orderBy,
+	}
+
+	if page != nil {
+		params["page"] = page
+	}
+	if perPage != nil {
+		params["per_page"] = perPage
+	}
+
+	_, err := c.DoHTTPGet(ctx, "block_search", params, result)
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+func (c *URIClient) DoBlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
+	result := new(ctypes.ResultBlockResults)
+	params := make(map[string]interface{})
+	if height != nil {
+		params["height"] = height
+	}
+
+	_, err := c.DoHTTPGet(ctx, "block_results", params, result)
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+func GetBlockResult(client URIClient, height int64) (*ctypes.ResultBlockResults, error) {
+	brctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+	defer cancel()
+
+	bresults, err := client.DoBlockResults(brctx, &height)
+	if err != nil {
+		return nil, err
+	}
+
+	return bresults, nil
+}
+
+func GetBlockResultWithRetry(client URIClient, height int64, retryMaxAttempts int64, retryMaxWaitSeconds uint64) (*ctypes.ResultBlockResults, error) {
+	if retryMaxAttempts == 0 {
+		return GetBlockResult(client, height)
+	}
+
+	if retryMaxWaitSeconds < 2 {
+		retryMaxWaitSeconds = 2
+	}
+
+	var attempts int64
+	maxRetryTime := time.Duration(retryMaxWaitSeconds) * time.Second
+	if maxRetryTime < 0 {
+		config.Log.Warn("Detected maxRetryTime overflow, setting time to sane maximum of 30s")
+		maxRetryTime = 30 * time.Second
+	}
+
+	currentBackoffDuration, maxReached := getBackoffDurationForAttempts(attempts, maxRetryTime)
+
+	for {
+		resp, err := GetBlockResult(client, height)
+		attempts++
+		if err != nil && (retryMaxAttempts < 0 || (attempts <= retryMaxAttempts)) {
+			config.Log.Error("Error getting RPC response, backing off and trying again", err)
+			config.Log.Debugf("Attempt %d with wait time %+v", attempts, currentBackoffDuration)
+			time.Sleep(currentBackoffDuration)
+
+			// guard against overflow
+			if !maxReached {
+				currentBackoffDuration, maxReached = getBackoffDurationForAttempts(attempts, maxRetryTime)
+			}
+
+		} else {
+			if err != nil {
+				config.Log.Error("Error getting RPC response, reached max retry attempts")
+			}
+			return resp, err
+		}
+	}
+}
+
+func getBackoffDurationForAttempts(numAttempts int64, maxRetryTime time.Duration) (time.Duration, bool) {
+	backoffBase := 1.5
+	backoffDuration := time.Duration(math.Pow(backoffBase, float64(numAttempts)) * float64(time.Second))
+
+	maxReached := false
+	if backoffDuration > maxRetryTime || backoffDuration < 0 {
+		maxReached = true
+		backoffDuration = maxRetryTime
+	}
+
+	return backoffDuration, maxReached
+}
diff --git a/tools/osmosis-rewards-validation/.env.template b/tools/osmosis-rewards-validation/.env.template
new file mode 100644
index 0000000..958ef16
--- /dev/null
+++ b/tools/osmosis-rewards-validation/.env.template
@@ -0,0 +1,5 @@
+export DB_HOST=""
+export DB_PORT=""
+export DB_NAME=""
+export DB_USER=""
+export DB_PASSWORD=""
diff --git a/tools/osmosis-rewards-validation/.gitignore b/tools/osmosis-rewards-validation/.gitignore
new file mode 100644
index 0000000..3f9a892
--- /dev/null
+++ b/tools/osmosis-rewards-validation/.gitignore
@@ -0,0 +1,3 @@
+.env
+venv
+output/*
diff --git a/tools/osmosis-rewards-validation/main.py b/tools/osmosis-rewards-validation/main.py
new file mode 100644
index 0000000..9824911
--- /dev/null
+++ b/tools/osmosis-rewards-validation/main.py
@@ -0,0 +1,92 @@
+import psycopg2
+import os
+import json
+import traceback
+
+def get_env():
+    ret = {
+        "host": os.environ.get("DB_HOST", ""),
+        "password": os.environ.get("DB_PASSWORD", ""),
+        "user": os.environ.get("DB_USER", ""),
+        "port": os.environ.get("DB_PORT", ""),
+        "db_name": os.environ.get("DB_NAME", "")
+    }
+
+    if any([ret[x] == "" for x in ret]):
+        raise Exception("Must provide env vars")
+
+    return ret
+
+EPOCHS_DAY_ALL_QUERY = "SELECT epoch_number, start_height FROM epoches WHERE identifier='day' ORDER BY start_height ASC;"
+BLOCKS_BY_HEIGHT_QUERY = "SELECT id, height FROM blocks WHERE height=%s;"
+EVENTS_BY_HEIGHT_AND_SOURCE_QUERY = "SELECT COUNT(*) FROM taxable_event WHERE block_id=%s AND source=0;"
+
+# A naive/unoptimized implementation to find counts for epoch reward events.
+# Does the following:
+# 1. Finds all epochs
+# 2. Finds the block ID for each epoch height
+# 3. Counts the reward events for each block ID
+# 4. Reassociates the block reward counts back with their epochs
+# 5. Outputs the counts for each epoch
+if __name__ == "__main__":
+    env = get_env()
+
+    conn = psycopg2.connect(f"dbname={env['db_name']} user={env['user']} host={env['host']} password={env['password']} port={env['port']}")
+    try:
+        epochs = []
+        with conn.cursor() as cur:
+            cur.execute(EPOCHS_DAY_ALL_QUERY)
+            for record in cur.fetchall():
+                epochs.append({
+                    "epoch_number": record[0],
+                    "start_height": record[1]
+                })
+
+        json.dump(epochs, open("output/epochs_tracked.json", 'w'), indent=4)
+        blocks = []
+        heights = [int(epoch["start_height"]) for epoch in epochs]
+        with conn.cursor() as cur:
+            for height in heights:
+                cur.execute(BLOCKS_BY_HEIGHT_QUERY, (height,))
+                rec = cur.fetchone()
+                if rec is None:
+                    print(height)
+                    print(BLOCKS_BY_HEIGHT_QUERY % height)
+                    continue
+                blocks.append({
+                    "id": rec[0],
+                    "height": rec[1]
+                })
+
+        json.dump(blocks, open("output/blocks_tracked.json", 'w'), indent=4)
+        events_by_block_height_counts = {}
+        for block in blocks:
+            with conn.cursor() as cur:
+                cur.execute(EVENTS_BY_HEIGHT_AND_SOURCE_QUERY, (block["id"],))
+                rec = cur.fetchone()
+                events_by_block_height_counts[block["height"]] = rec[0]
+        json.dump(events_by_block_height_counts, open("output/events_by_block_id_tracked.json", 'w'), indent=4)
+
+        epoch_number_counts = []
+        available_heights = events_by_block_height_counts.keys()
+
+        for epoch in epochs:
+            if epoch["start_height"] in available_heights:
+                epoch_number_counts.append({
+                    "epoch_number": epoch["epoch_number"],
+                    "count": events_by_block_height_counts[epoch["start_height"]]
+                })
+            else:
+                epoch_number_counts.append({
+                    "epoch_number": epoch["epoch_number"],
+                    "count": 0
+                })
+
+        json.dump(epoch_number_counts, open("output/epoch_counts.json", 'w'), indent=4)
+
+
+    except Exception as err:
+        print(err)
+        traceback.print_exc()
+    finally:
+        conn.close()
diff --git a/tools/osmosis-rewards-validation/requirements.txt b/tools/osmosis-rewards-validation/requirements.txt
new file mode 100644
index 0000000..9edde47
--- /dev/null
+++ b/tools/osmosis-rewards-validation/requirements.txt
@@ -0,0 +1 @@
+psycopg2==2.9.7