From 642b1214348a7720602548ea534b04b1af1b7501 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Thu, 16 Jan 2025 02:04:02 +0200 Subject: [PATCH] use inserts instead of lightweight deletes to remove reorged data --- internal/common/utils.go | 2 + internal/storage/clickhouse.go | 379 ++++++++++--------------- internal/storage/connector.go | 4 +- internal/storage/memory.go | 24 +- test/mocks/MockIMainStorage.go | 34 +-- test/mocks/MockIOrchestratorStorage.go | 2 +- test/mocks/MockIRPCClient.go | 14 +- test/mocks/MockIStagingStorage.go | 2 +- 8 files changed, 196 insertions(+), 265 deletions(-) diff --git a/internal/common/utils.go b/internal/common/utils.go index efdbaff..baa9b52 100644 --- a/internal/common/utils.go +++ b/internal/common/utils.go @@ -173,6 +173,8 @@ var allowedFunctions = map[string]struct{}{ "toStartOfDay": {}, "toDate": {}, "concat": {}, + "in": {}, + "IN": {}, } var disallowedPatterns = []string{ diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 52cb7cc..db76b97 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -64,9 +64,8 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { Settings: func() clickhouse.Settings { if cfg.AsyncInsert { return clickhouse.Settings{ - "async_insert": "1", - "wait_for_async_insert": "1", - "lightweight_deletes_sync": "0", + "async_insert": "1", + "wait_for_async_insert": "1", } } return clickhouse.Settings{} @@ -78,13 +77,13 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { return conn, nil } -func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error { +func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block, asDeleted bool) error { query := ` INSERT INTO ` + c.cfg.Database + `.blocks ( chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce, mix_hash, miner, state_root, transactions_root, receipts_root, size, logs_bloom, extra_data, difficulty, total_difficulty, transaction_count, gas_limit, - gas_used, withdrawals_root, base_fee_per_gas + gas_used, withdrawals_root, base_fee_per_gas, is_deleted ) ` for i := 0; i < len(*blocks); i += c.cfg.MaxRowsPerInsert { @@ -122,6 +121,12 @@ func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error { block.GasUsed, block.WithdrawalsRoot, block.BaseFeePerGas, + func() int8 { + if asDeleted { + return 1 + } + return 0 + }(), ) if err != nil { return err @@ -134,12 +139,13 @@ func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error { return nil } -func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error { +func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction, asDeleted bool) error { query := ` INSERT INTO ` + c.cfg.Database + `.transactions ( chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index, from_address, to_address, value, gas, gas_price, data, function_selector, max_fee_per_gas, max_priority_fee_per_gas, - transaction_type, r, s, v, access_list, contract_address, gas_used, cumulative_gas_used, effective_gas_price, blob_gas_used, blob_gas_price, logs_bloom, status + transaction_type, r, s, v, access_list, contract_address, gas_used, cumulative_gas_used, effective_gas_price, blob_gas_used, + blob_gas_price, logs_bloom, status, is_deleted ) ` for i := 0; i < len(*txs); i += c.cfg.MaxRowsPerInsert { @@ -184,6 +190,12 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro tx.BlobGasPrice, tx.LogsBloom, tx.Status, + func() int8 { + 
if asDeleted { + return 1 + } + return 0 + }(), ) if err != nil { return err @@ -198,11 +210,11 @@ func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) erro return nil } -func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error { +func (c *ClickHouseConnector) insertLogs(logs *[]common.Log, asDeleted bool) error { query := ` INSERT INTO ` + c.cfg.Database + `.logs ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, - log_index, address, data, topic_0, topic_1, topic_2, topic_3 + log_index, address, data, topic_0, topic_1, topic_2, topic_3, is_deleted ) ` for i := 0; i < len(*logs); i += c.cfg.MaxRowsPerInsert { @@ -251,6 +263,12 @@ func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error { } return "" }(), + func() int8 { + if asDeleted { + return 1 + } + return 0 + }(), ) if err != nil { return err @@ -291,55 +309,9 @@ func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) return batch.Send() } -func (c *ClickHouseConnector) GetBlocks(qf QueryFilter) (blocks []common.Block, err error) { +func (c *ClickHouseConnector) GetBlocks(qf QueryFilter) (blocks QueryResult[common.Block], err error) { columns := "chain_id, number, hash, parent_hash, timestamp, nonce, sha3_uncles, logs_bloom, receipts_root, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, transaction_count, base_fee_per_gas, withdrawals_root" - query := fmt.Sprintf("SELECT %s FROM %s.blocks WHERE number IN (%s) AND is_deleted = 0", - columns, c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) - - if qf.ChainId.Sign() > 0 { - query += fmt.Sprintf(" AND chain_id = %s", qf.ChainId.String()) - } - - query += getLimitClause(int(qf.Limit)) - - if err := common.ValidateQuery(query); err != nil { - return nil, err - } - rows, err := c.conn.Query(context.Background(), query) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var block common.Block - err := rows.Scan( - &block.ChainId, - &block.Number, - &block.Hash, - &block.ParentHash, - &block.Timestamp, - &block.Nonce, - &block.Sha3Uncles, - &block.LogsBloom, - &block.ReceiptsRoot, - &block.Difficulty, - &block.TotalDifficulty, - &block.Size, - &block.ExtraData, - &block.GasLimit, - &block.GasUsed, - &block.TransactionCount, - &block.BaseFeePerGas, - &block.WithdrawalsRoot, - ) - if err != nil { - zLog.Error().Err(err).Msg("Error scanning block") - return nil, err - } - blocks = append(blocks, block) - } - return blocks, nil + return executeQuery[common.Block](c, "blocks", columns, qf, scanBlock) } func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[common.Transaction], error) { @@ -352,6 +324,11 @@ func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], return executeQuery[common.Log](c, "logs", columns, qf, scanLog) } +func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (QueryResult[common.Trace], error) { + columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, subtraces, trace_address, type, call_type, error, from_address, to_address, gas, gas_used, input, output, value, author, reward_type, refund_address" + return executeQuery[common.Trace](c, "traces", columns, qf, scanTrace) +} + func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) { // Build the SELECT clause with aggregates selectColumns := strings.Join(append(qf.GroupBy, qf.Aggregates...), ", ") @@ -459,6 
+436,7 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter) query = addFilterParams("chain_id", qf.ChainId.String(), query) } query = addContractAddress(table, query, qf.ContractAddress) + query = addBlockNumbersFilter(table, query, qf.BlockNumbers) // Add signature clause if qf.Signature != "" { @@ -526,6 +504,17 @@ func addContractAddress(table, query string, contractAddress string) string { return query } +func addBlockNumbersFilter(table string, query string, blockNumbers []*big.Int) string { + if len(blockNumbers) > 0 { + columnName := "block_number" + if table == "blocks" { + columnName = "number" + } + query += fmt.Sprintf(" AND %s IN (%s)", columnName, getBlockNumbersStringArray(blockNumbers)) + } + return query +} + func addSignatureClause(table, query, signature string) string { if table == "logs" { query += fmt.Sprintf(" AND topic_0 = '%s'", signature) @@ -548,6 +537,34 @@ func getTopicValueFormat(topic string) string { return result } +func scanBlock(rows driver.Rows) (common.Block, error) { + var block common.Block + err := rows.Scan( + &block.ChainId, + &block.Number, + &block.Hash, + &block.ParentHash, + &block.Timestamp, + &block.Nonce, + &block.Sha3Uncles, + &block.LogsBloom, + &block.ReceiptsRoot, + &block.Difficulty, + &block.TotalDifficulty, + &block.Size, + &block.ExtraData, + &block.GasLimit, + &block.GasUsed, + &block.TransactionCount, + &block.BaseFeePerGas, + &block.WithdrawalsRoot, + ) + if err != nil { + return common.Block{}, fmt.Errorf("error scanning block: %w", err) + } + return block, nil +} + func scanTransaction(rows driver.Rows) (common.Transaction, error) { var tx common.Transaction err := rows.Scan( @@ -608,6 +625,37 @@ func scanLog(rows driver.Rows) (common.Log, error) { return log, nil } +func scanTrace(rows driver.Rows) (common.Trace, error) { + var trace common.Trace + err := rows.Scan( + &trace.ChainID, + &trace.BlockNumber, + &trace.BlockHash, + &trace.BlockTimestamp, + &trace.TransactionHash, + &trace.TransactionIndex, + &trace.Subtraces, + &trace.TraceAddress, + &trace.TraceType, + &trace.CallType, + &trace.Error, + &trace.FromAddress, + &trace.ToAddress, + &trace.Gas, + &trace.GasUsed, + &trace.Input, + &trace.Output, + &trace.Value, + &trace.Author, + &trace.RewardType, + &trace.RefundAddress, + ) + if err != nil { + return common.Trace{}, fmt.Errorf("error scanning trace: %w", err) + } + return trace, nil +} + func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { @@ -799,12 +847,12 @@ func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error return batch.Send() } -func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error { +func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace, asDeleted bool) error { query := ` INSERT INTO ` + c.cfg.Database + `.traces ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, subtraces, trace_address, type, call_type, error, from_address, to_address, - gas, gas_used, input, output, value, author, reward_type, refund_address + gas, gas_used, input, output, value, author, reward_type, refund_address, is_deleted ) ` for i := 0; i < len(*traces); i += c.cfg.MaxRowsPerInsert { @@ -841,6 +889,12 @@ func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error { trace.Author, trace.RewardType, trace.RefundAddress, + func() int8 { + 
if asDeleted { + return 1 + } + return 0 + }(), ) if err != nil { return err @@ -855,60 +909,6 @@ func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error { return nil } -func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, err error) { - columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, subtraces, trace_address, type, call_type, error, from_address, to_address, gas, gas_used, input, output, value, author, reward_type, refund_address" - query := fmt.Sprintf("SELECT %s FROM %s.traces WHERE block_number IN (%s) AND is_deleted = 0", - columns, c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) - - if qf.ChainId.Sign() > 0 { - query += fmt.Sprintf(" AND chain_id = %s", qf.ChainId.String()) - } - - query += getLimitClause(int(qf.Limit)) - - if err := common.ValidateQuery(query); err != nil { - return nil, err - } - rows, err := c.conn.Query(context.Background(), query) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var trace common.Trace - err := rows.Scan( - &trace.ChainID, - &trace.BlockNumber, - &trace.BlockHash, - &trace.BlockTimestamp, - &trace.TransactionHash, - &trace.TransactionIndex, - &trace.Subtraces, - &trace.TraceAddress, - &trace.TraceType, - &trace.CallType, - &trace.Error, - &trace.FromAddress, - &trace.ToAddress, - &trace.Gas, - &trace.GasUsed, - &trace.Input, - &trace.Output, - &trace.Value, - &trace.Author, - &trace.RewardType, - &trace.RefundAddress, - ) - if err != nil { - zLog.Error().Err(err).Msg("Error scanning transaction") - return nil, err - } - traces = append(traces, trace) - } - return traces, nil -} - func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { @@ -962,7 +962,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []* go func() { defer wg.Done() - if err := c.deleteBlocksByNumbers(chainId, blockNumbers); err != nil { + if err := c.deleteBlocks(chainId, blockNumbers); err != nil { deleteErrMutex.Lock() deleteErr = fmt.Errorf("error deleting blocks: %v", err) deleteErrMutex.Unlock() @@ -971,7 +971,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []* go func() { defer wg.Done() - if err := c.deleteLogsByNumbers(chainId, blockNumbers); err != nil { + if err := c.deleteLogs(chainId, blockNumbers); err != nil { deleteErrMutex.Lock() deleteErr = fmt.Errorf("error deleting logs: %v", err) deleteErrMutex.Unlock() @@ -980,7 +980,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []* go func() { defer wg.Done() - if err := c.deleteTransactionsByNumbers(chainId, blockNumbers); err != nil { + if err := c.deleteTransactions(chainId, blockNumbers); err != nil { deleteErrMutex.Lock() deleteErr = fmt.Errorf("error deleting transactions: %v", err) deleteErrMutex.Unlock() @@ -989,7 +989,7 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []* go func() { defer wg.Done() - if err := c.deleteTracesByNumbers(chainId, blockNumbers); err != nil { + if err := c.deleteTraces(chainId, blockNumbers); err != nil { deleteErrMutex.Lock() deleteErr = fmt.Errorf("error deleting traces: %v", err) deleteErrMutex.Unlock() @@ -1004,135 +1004,68 @@ func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []* return nil } -func (c 
*ClickHouseConnector) deleteBlocksByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { - query := fmt.Sprintf("DELETE FROM %s.blocks WHERE _partition_value.1 = ? AND chain_id = ? AND number IN (?)", c.cfg.Database) - - blockNumbersStr := make([]string, len(blockNumbers)) - for i, bn := range blockNumbers { - blockNumbersStr[i] = bn.String() - } - err := c.conn.Exec(context.Background(), query, chainId, chainId, blockNumbersStr) +func (c *ClickHouseConnector) deleteBlocks(chainId *big.Int, blockNumbers []*big.Int) error { + blocksQueryResult, err := c.GetBlocks(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + }) if err != nil { - return fmt.Errorf("error deleting blocks: %w", err) + return err } - return nil -} -func (c *ClickHouseConnector) deleteLogsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { - blockNumbersStr := make([]string, len(blockNumbers)) - for i, bn := range blockNumbers { - blockNumbersStr[i] = bn.String() + if len(blocksQueryResult.Data) == 0 { + return nil // No blocks to delete } - getQuery := fmt.Sprintf("SELECT block_number, transaction_hash, log_index FROM %s.logs WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) - rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) - if getErr != nil { - return getErr - } - defer rows.Close() + return c.insertBlocks(&blocksQueryResult.Data, true) +} - blockNumbersToDelete := common.NewSet[string]() - txHashesToDelete := common.NewSet[string]() - logIndexesToDelete := common.NewSet[uint64]() - for rows.Next() { - var logToDelete common.Log - err := rows.ScanStruct(&logToDelete) - if err != nil { - return err - } - blockNumbersToDelete.Add(logToDelete.BlockNumber.String()) - txHashesToDelete.Add(logToDelete.TransactionHash) - logIndexesToDelete.Add(logToDelete.LogIndex) +func (c *ClickHouseConnector) deleteLogs(chainId *big.Int, blockNumbers []*big.Int) error { + logsQueryResult, err := c.GetLogs(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + }) + if err != nil { + return err } - if txHashesToDelete.Size() == 0 { + if len(logsQueryResult.Data) == 0 { return nil // No logs to delete } - deleteQuery := fmt.Sprintf("DELETE FROM %s.logs WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND transaction_hash IN (?) AND log_index IN (?)", c.cfg.Database) + return c.insertLogs(&logsQueryResult.Data, true) +} - err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List(), logIndexesToDelete.List()) +func (c *ClickHouseConnector) deleteTransactions(chainId *big.Int, blockNumbers []*big.Int) error { + txsQueryResult, err := c.GetTransactions(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + }) if err != nil { return err } - return nil -} -func (c *ClickHouseConnector) deleteTransactionsByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { - blockNumbersStr := make([]string, len(blockNumbers)) - for i, bn := range blockNumbers { - blockNumbersStr[i] = bn.String() - } - getQuery := fmt.Sprintf("SELECT block_number, hash FROM %s.transactions WHERE chain_id = %s AND block_number IN (?) 
AND is_deleted = 0", c.cfg.Database, chainId.String()) - - rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) - if getErr != nil { - return getErr - } - defer rows.Close() - - blockNumbersToDelete := common.NewSet[string]() - hashesToDelete := common.NewSet[string]() - - for rows.Next() { - var txToDelete common.Transaction - err := rows.ScanStruct(&txToDelete) - if err != nil { - return err - } - blockNumbersToDelete.Add(txToDelete.BlockNumber.String()) - hashesToDelete.Add(txToDelete.Hash) - } - - if hashesToDelete.Size() == 0 { + if len(txsQueryResult.Data) == 0 { return nil // No transactions to delete } - deleteQuery := fmt.Sprintf("DELETE FROM %s.transactions WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) AND hash IN (?)", c.cfg.Database) + return c.insertTransactions(&txsQueryResult.Data, true) +} - err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), hashesToDelete.List()) +func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big.Int) error { + tracesQueryResult, err := c.GetTraces(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + }) if err != nil { return err } - return nil -} -func (c *ClickHouseConnector) deleteTracesByNumbers(chainId *big.Int, blockNumbers []*big.Int) error { - blockNumbersStr := make([]string, len(blockNumbers)) - for i, bn := range blockNumbers { - blockNumbersStr[i] = bn.String() - } - getQuery := fmt.Sprintf("SELECT block_number, transaction_hash FROM %s.traces WHERE chain_id = %s AND block_number IN (?) AND is_deleted = 0", c.cfg.Database, chainId.String()) - - rows, getErr := c.conn.Query(context.Background(), getQuery, blockNumbersStr) - if getErr != nil { - return getErr - } - defer rows.Close() - - blockNumbersToDelete := common.NewSet[string]() - txHashesToDelete := common.NewSet[string]() - for rows.Next() { - var traceToDelete common.Trace - err := rows.ScanStruct(&traceToDelete) - if err != nil { - return err - } - blockNumbersToDelete.Add(traceToDelete.BlockNumber.String()) - txHashesToDelete.Add(traceToDelete.TransactionHash) - } - - if txHashesToDelete.Size() == 0 { + if len(tracesQueryResult.Data) == 0 { return nil // No traces to delete } - deleteQuery := fmt.Sprintf("DELETE FROM %s.traces WHERE _partition_value.1 = ? AND chain_id = ? AND block_number IN (?) 
AND transaction_hash IN (?)", c.cfg.Database) - - err := c.conn.Exec(context.Background(), deleteQuery, chainId, chainId, blockNumbersToDelete.List(), txHashesToDelete.List()) - if err != nil { - return err - } - return nil + return c.insertTraces(&tracesQueryResult.Data, true) } // TODO make this atomic @@ -1157,7 +1090,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { wg.Add(1) go func() { defer wg.Done() - if err := c.insertBlocks(&blocks); err != nil { + if err := c.insertBlocks(&blocks, false); err != nil { saveErrMutex.Lock() saveErr = fmt.Errorf("error inserting blocks: %v", err) saveErrMutex.Unlock() @@ -1169,7 +1102,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { wg.Add(1) go func() { defer wg.Done() - if err := c.insertLogs(&logs); err != nil { + if err := c.insertLogs(&logs, false); err != nil { saveErrMutex.Lock() saveErr = fmt.Errorf("error inserting logs: %v", err) saveErrMutex.Unlock() @@ -1181,7 +1114,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { wg.Add(1) go func() { defer wg.Done() - if err := c.insertTransactions(&transactions); err != nil { + if err := c.insertTransactions(&transactions, false); err != nil { saveErrMutex.Lock() saveErr = fmt.Errorf("error inserting transactions: %v", err) saveErrMutex.Unlock() @@ -1193,7 +1126,7 @@ func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { wg.Add(1) go func() { defer wg.Done() - if err := c.insertTraces(&traces); err != nil { + if err := c.insertTraces(&traces, false); err != nil { saveErrMutex.Lock() saveErr = fmt.Errorf("error inserting traces: %v", err) saveErrMutex.Unlock() diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 86cddbf..032b126 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -53,11 +53,11 @@ type IStagingStorage interface { type IMainStorage interface { InsertBlockData(data *[]common.BlockData) error - GetBlocks(qf QueryFilter) (blocks []common.Block, err error) + GetBlocks(qf QueryFilter) (blocks QueryResult[common.Block], err error) GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error) GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) - GetTraces(qf QueryFilter) (traces []common.Trace, err error) + GetTraces(qf QueryFilter) (traces QueryResult[common.Trace], err error) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) /** * Get block headers ordered from latest to oldest. 
diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 99568ab..d92ff3e 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -85,7 +85,7 @@ func (m *MemoryConnector) insertBlocks(blocks *[]common.Block) error { return nil } -func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) { +func (m *MemoryConnector) GetBlocks(qf QueryFilter) (QueryResult[common.Block], error) { blocks := []common.Block{} limit := getLimit(qf) blockNumbersToCheck := getBlockNumbersToCheck(qf) @@ -100,13 +100,13 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) { block := common.Block{} err := json.Unmarshal([]byte(value), &block) if err != nil { - return nil, err + return QueryResult[common.Block]{}, err } blocks = append(blocks, block) } } } - return blocks, nil + return QueryResult[common.Block]{Data: blocks}, nil } func (m *MemoryConnector) insertTransactions(txs *[]common.Transaction) error { @@ -120,7 +120,7 @@ func (m *MemoryConnector) insertTransactions(txs *[]common.Transaction) error { return nil } -func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction, error) { +func (m *MemoryConnector) GetTransactions(qf QueryFilter) (QueryResult[common.Transaction], error) { txs := []common.Transaction{} limit := getLimit(qf) blockNumbersToCheck := getBlockNumbersToCheck(qf) @@ -134,13 +134,13 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction, tx := common.Transaction{} err := json.Unmarshal([]byte(value), &tx) if err != nil { - return nil, err + return QueryResult[common.Transaction]{}, err } txs = append(txs, tx) } } } - return txs, nil + return QueryResult[common.Transaction]{Data: txs}, nil } func (m *MemoryConnector) insertLogs(logs *[]common.Log) error { @@ -154,7 +154,7 @@ func (m *MemoryConnector) insertLogs(logs *[]common.Log) error { return nil } -func (m *MemoryConnector) GetLogs(qf QueryFilter) ([]common.Log, error) { +func (m *MemoryConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) { logs := []common.Log{} limit := getLimit(qf) blockNumbersToCheck := getBlockNumbersToCheck(qf) @@ -168,13 +168,13 @@ func (m *MemoryConnector) GetLogs(qf QueryFilter) ([]common.Log, error) { log := common.Log{} err := json.Unmarshal([]byte(value), &log) if err != nil { - return nil, err + return QueryResult[common.Log]{}, err } logs = append(logs, log) } } } - return logs, nil + return QueryResult[common.Log]{Data: logs}, nil } func (m *MemoryConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error) { @@ -314,7 +314,7 @@ func (m *MemoryConnector) insertTraces(traces *[]common.Trace) error { return nil } -func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) { +func (m *MemoryConnector) GetTraces(qf QueryFilter) (QueryResult[common.Trace], error) { traces := []common.Trace{} limit := getLimit(qf) blockNumbersToCheck := getBlockNumbersToCheck(qf) @@ -328,13 +328,13 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) { trace := common.Trace{} err := json.Unmarshal([]byte(value), &trace) if err != nil { - return nil, err + return QueryResult[common.Trace]{}, err } traces = append(traces, trace) } } } - return traces, nil + return QueryResult[common.Trace]{Data: traces}, nil } func traceAddressToString(traceAddress []uint64) string { diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index 5423841..7b4500d 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go 
@@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production @@ -131,24 +131,22 @@ func (_c *MockIMainStorage_GetAggregations_Call) RunAndReturn(run func(string, s } // GetBlocks provides a mock function with given fields: qf -func (_m *MockIMainStorage) GetBlocks(qf storage.QueryFilter) ([]common.Block, error) { +func (_m *MockIMainStorage) GetBlocks(qf storage.QueryFilter) (storage.QueryResult[common.Block], error) { ret := _m.Called(qf) if len(ret) == 0 { panic("no return value specified for GetBlocks") } - var r0 []common.Block + var r0 storage.QueryResult[common.Block] var r1 error - if rf, ok := ret.Get(0).(func(storage.QueryFilter) ([]common.Block, error)); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[common.Block], error)); ok { return rf(qf) } - if rf, ok := ret.Get(0).(func(storage.QueryFilter) []common.Block); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[common.Block]); ok { r0 = rf(qf) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.Block) - } + r0 = ret.Get(0).(storage.QueryResult[common.Block]) } if rf, ok := ret.Get(1).(func(storage.QueryFilter) error); ok { @@ -178,12 +176,12 @@ func (_c *MockIMainStorage_GetBlocks_Call) Run(run func(qf storage.QueryFilter)) return _c } -func (_c *MockIMainStorage_GetBlocks_Call) Return(blocks []common.Block, err error) *MockIMainStorage_GetBlocks_Call { +func (_c *MockIMainStorage_GetBlocks_Call) Return(blocks storage.QueryResult[common.Block], err error) *MockIMainStorage_GetBlocks_Call { _c.Call.Return(blocks, err) return _c } -func (_c *MockIMainStorage_GetBlocks_Call) RunAndReturn(run func(storage.QueryFilter) ([]common.Block, error)) *MockIMainStorage_GetBlocks_Call { +func (_c *MockIMainStorage_GetBlocks_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[common.Block], error)) *MockIMainStorage_GetBlocks_Call { _c.Call.Return(run) return _c } @@ -303,24 +301,22 @@ func (_c *MockIMainStorage_GetMaxBlockNumber_Call) RunAndReturn(run func(*big.In } // GetTraces provides a mock function with given fields: qf -func (_m *MockIMainStorage) GetTraces(qf storage.QueryFilter) ([]common.Trace, error) { +func (_m *MockIMainStorage) GetTraces(qf storage.QueryFilter) (storage.QueryResult[common.Trace], error) { ret := _m.Called(qf) if len(ret) == 0 { panic("no return value specified for GetTraces") } - var r0 []common.Trace + var r0 storage.QueryResult[common.Trace] var r1 error - if rf, ok := ret.Get(0).(func(storage.QueryFilter) ([]common.Trace, error)); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) (storage.QueryResult[common.Trace], error)); ok { return rf(qf) } - if rf, ok := ret.Get(0).(func(storage.QueryFilter) []common.Trace); ok { + if rf, ok := ret.Get(0).(func(storage.QueryFilter) storage.QueryResult[common.Trace]); ok { r0 = rf(qf) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.Trace) - } + r0 = ret.Get(0).(storage.QueryResult[common.Trace]) } if rf, ok := ret.Get(1).(func(storage.QueryFilter) error); ok { @@ -350,12 +346,12 @@ func (_c *MockIMainStorage_GetTraces_Call) Run(run func(qf storage.QueryFilter)) return _c } -func (_c *MockIMainStorage_GetTraces_Call) Return(traces []common.Trace, err error) *MockIMainStorage_GetTraces_Call { +func (_c *MockIMainStorage_GetTraces_Call) Return(traces storage.QueryResult[common.Trace], err error) *MockIMainStorage_GetTraces_Call { _c.Call.Return(traces, err) return _c } -func (_c 
*MockIMainStorage_GetTraces_Call) RunAndReturn(run func(storage.QueryFilter) ([]common.Trace, error)) *MockIMainStorage_GetTraces_Call { +func (_c *MockIMainStorage_GetTraces_Call) RunAndReturn(run func(storage.QueryFilter) (storage.QueryResult[common.Trace], error)) *MockIMainStorage_GetTraces_Call { _c.Call.Return(run) return _c } diff --git a/test/mocks/MockIOrchestratorStorage.go b/test/mocks/MockIOrchestratorStorage.go index 84b9003..fe382f0 100644 --- a/test/mocks/MockIOrchestratorStorage.go +++ b/test/mocks/MockIOrchestratorStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production diff --git a/test/mocks/MockIRPCClient.go b/test/mocks/MockIRPCClient.go index 4b062a7..855d41e 100644 --- a/test/mocks/MockIRPCClient.go +++ b/test/mocks/MockIRPCClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production @@ -72,7 +72,7 @@ func (_c *MockIRPCClient_GetBlocks_Call) RunAndReturn(run func([]*big.Int) []rpc return _c } -// GetBlocksPerRequest provides a mock function with given fields: +// GetBlocksPerRequest provides a mock function with no fields func (_m *MockIRPCClient) GetBlocksPerRequest() rpc.BlocksPerRequestConfig { ret := _m.Called() @@ -117,7 +117,7 @@ func (_c *MockIRPCClient_GetBlocksPerRequest_Call) RunAndReturn(run func() rpc.B return _c } -// GetChainID provides a mock function with given fields: +// GetChainID provides a mock function with no fields func (_m *MockIRPCClient) GetChainID() *big.Int { ret := _m.Called() @@ -212,7 +212,7 @@ func (_c *MockIRPCClient_GetFullBlocks_Call) RunAndReturn(run func([]*big.Int) [ return _c } -// GetLatestBlockNumber provides a mock function with given fields: +// GetLatestBlockNumber provides a mock function with no fields func (_m *MockIRPCClient) GetLatestBlockNumber() (*big.Int, error) { ret := _m.Called() @@ -269,7 +269,7 @@ func (_c *MockIRPCClient_GetLatestBlockNumber_Call) RunAndReturn(run func() (*bi return _c } -// GetURL provides a mock function with given fields: +// GetURL provides a mock function with no fields func (_m *MockIRPCClient) GetURL() string { ret := _m.Called() @@ -314,7 +314,7 @@ func (_c *MockIRPCClient_GetURL_Call) RunAndReturn(run func() string) *MockIRPCC return _c } -// IsWebsocket provides a mock function with given fields: +// IsWebsocket provides a mock function with no fields func (_m *MockIRPCClient) IsWebsocket() bool { ret := _m.Called() @@ -359,7 +359,7 @@ func (_c *MockIRPCClient_IsWebsocket_Call) RunAndReturn(run func() bool) *MockIR return _c } -// SupportsTraceBlock provides a mock function with given fields: +// SupportsTraceBlock provides a mock function with no fields func (_m *MockIRPCClient) SupportsTraceBlock() bool { ret := _m.Called() diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index dc0a55b..dc4b958 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production
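
Note on the storage model this change relies on (the table definitions themselves are not part of this patch): instead of issuing lightweight DELETE statements, reorged rows are fetched and re-inserted with is_deleted = 1, and read paths filter them out with is_deleted = 0 (see GetMaxBlockNumber and the query builder above). This keeps removal on the same insert path and fits the async_insert settings kept in connectDB. Below is a minimal sketch of the kind of ClickHouse table this assumes — the database name, column types, ordering key, and engine choice are assumptions for illustration, not taken from this repository:

    CREATE TABLE <database>.blocks
    (
        chain_id   UInt256,
        number     UInt256,
        -- ... remaining block columns matching the INSERT column list above ...
        is_deleted UInt8 DEFAULT 0
    )
    ENGINE = ReplacingMergeTree  -- assumed; any MergeTree variant with an is_deleted flag column supports the WHERE is_deleted = 0 filter
    ORDER BY (chain_id, number);

The same shape is assumed for the transactions, logs, and traces tables, each of which gains an is_deleted value in its INSERT column list in this patch.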