diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index 32f612bf..a507c217 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -32,6 +32,8 @@ import { verifyAppReceiptData } from '../shardeum/verifyAppReceiptData' import { Cycle as DbCycle } from '../dbstore/types' import { Utils as StringUtils } from '@shardus/types' import { offloadReceipt } from '../primary-process' +import { verifyPayload } from '../types/ajv/Helpers' +import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum' export let storingAccountData = false const processedReceiptsMap: Map = new Map() @@ -276,167 +278,17 @@ const isReceiptRobust = async ( * @returns boolean */ export const validateArchiverReceipt = (receipt: Receipt.ArchiverReceipt): boolean => { - // Add type and field existence check - let err = Utils.validateTypes(receipt, { - tx: 'o', - cycle: 'n', - afterStates: 'a', - beforeStates: 'a', - signedReceipt: 'o', - appReceiptData: 'o?', - executionShardKey: 's', - globalModification: 'b', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt data', err) - return false - } - err = Utils.validateTypes(receipt.tx, { - txId: 's', - timestamp: 'n', - originalTxData: 'o', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt tx data', err) - return false - } - for (const account of receipt.beforeStates) { - err = Utils.validateTypes(account, { - hash: 's', - data: 'o', - isGlobal: 'b', - accountId: 's', - timestamp: 'n', - // cycleNumber: 'n', it is not present in the beforeStateAccounts data - }) - if (err) { - Logger.mainLogger.error('Invalid receipt beforeStateAccounts data', err) - return false - } - } - for (const account of receipt.afterStates) { - err = Utils.validateTypes(account, { - hash: 's', - data: 'o', - isGlobal: 'b', - accountId: 's', - timestamp: 'n', - // cycleNumber: 'n', it is not present in the beforeStateAccounts data - }) - if (err) { - Logger.mainLogger.error('Invalid receipt accounts data', err) - return false - } - } - if 
(receipt.globalModification) { - const signedReceipt = receipt.signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt - err = Utils.validateTypes(signedReceipt, { - tx: 'o', - signs: 'a', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt globalModification data', err) - return false - } - err = Utils.validateTypes(signedReceipt.tx, { - address: 's', - addressHash: 's', - value: 'o', - when: 'n', - source: 's', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt globalModification tx data', err) - return false - } - for (const sign of signedReceipt.signs) { - err = Utils.validateTypes(sign, { - owner: 's', - sig: 's', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt globalModification signs data', err) - return false - } - } - return true - } - // Global Modification Tx does not have appliedReceipt - const signedReceipt = receipt.signedReceipt as Receipt.SignedReceipt - const signedReceiptToValidate = { - proposal: 'o', - proposalHash: 's', - signaturePack: 'a', - voteOffsets: 'a', - } - // if (config.newPOQReceipt === false) delete appliedReceiptToValidate.confirmOrChallenge - err = Utils.validateTypes(signedReceipt, signedReceiptToValidate) - if (err) { - Logger.mainLogger.error('Invalid receipt signedReceipt data', err) - return false - } - const proposalToValidate = { - txid: 's', - applied: 'b', - accountIDs: 'a', - cant_preApply: 'b', - afterStateHashes: 'a', - beforeStateHashes: 'a', - appReceiptDataHash: 's', - } - // if (config.newPOQReceipt === false) { - // delete appliedVoteToValidate.node_id - // delete appliedVoteToValidate.sign - // } - err = Utils.validateTypes(signedReceipt.proposal, proposalToValidate) - if (err) { - Logger.mainLogger.error('Invalid receipt signedReceipt appliedVote data', err) + + let errors = verifyPayload(AJVSchemaEnum.ArchiverReceipt, receipt) + + if (errors) { + Logger.mainLogger.error( + 'Invalid Archiver Receipt', + errors, + 'where receipt was', StringUtils.safeStringify(receipt) + ); 
return false } - for (const signature of signedReceipt.signaturePack) { - err = Utils.validateTypes(signature, { - owner: 's', - sig: 's', - }) - if (err) { - Logger.mainLogger.error('Invalid receipt signedReceipt signatures data', err) - return false - } - } - for (const voteOffset of signedReceipt.voteOffsets) { - const isValid = typeof voteOffset === 'number' || !isNaN(voteOffset) - if (!isValid) { - Logger.mainLogger.error('Invalid receipt signedReceipt voteOffsets data', voteOffset) - return false - } - } - // if (config.newPOQReceipt === false) return true - // err = Utils.validateTypes(receipt.appliedReceipt.appliedVote.sign, { - // owner: 's', - // sig: 's', - // }) - // if (err) { - // Logger.mainLogger.error('Invalid receipt appliedReceipt appliedVote signature data', err) - // return false - // } - // err = Utils.validateTypes(receipt.appliedReceipt.confirmOrChallenge, { - // message: 's', - // nodeId: 's', - // appliedVote: 'o', - // sign: 'o', - // }) - // if (err) { - // Logger.mainLogger.error('Invalid receipt appliedReceipt confirmOrChallenge data', err) - // return false - // } - // err = Utils.validateTypes(receipt.appliedReceipt.confirmOrChallenge.sign, { - // owner: 's', - // sig: 's', - // }) - // if (err) { - // Logger.mainLogger.error('Invalid receipt appliedReceipt confirmOrChallenge signature data', err) - // return false - // } return true } @@ -1423,25 +1275,18 @@ interface validateResponse { } export const validateOriginalTxData = (originalTxData: OriginalTxsData.OriginalTxData): boolean => { - const err = Utils.validateTypes(originalTxData, { - txId: 's', - timestamp: 'n', - cycle: 'n', - // sign: 'o', - originalTxData: 'o', - }) - if (err) { - Logger.mainLogger.error('Invalid originalTxsData', err) - return false - } - // err = Utils.validateTypes(originalTxData.sign, { - // owner: 's', - // sig: 's', - // }) - if (err) { - Logger.mainLogger.error('Invalid originalTxsData signature', err) - return false + + const errors = 
verifyPayload(AJVSchemaEnum.OriginalTxData, originalTxData) + + if (errors) { + Logger.mainLogger.error( + 'Invalid originalTxsData', + errors, + 'where originalTxData was: ', StringUtils.safeStringify(originalTxData) + ); + return false; } + return true } diff --git a/src/dbstore/accounts.ts b/src/dbstore/accounts.ts index 13f2bbf4..41fcd2c2 100644 --- a/src/dbstore/accounts.ts +++ b/src/dbstore/accounts.ts @@ -4,6 +4,7 @@ import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' + /** Same as type AccountsCopy in the shardus core */ export type AccountsCopy = { accountId: string @@ -19,38 +20,67 @@ type DbAccountCopy = AccountsCopy & { } export async function insertAccount(account: AccountsCopy): Promise { + try { - const fields = Object.keys(account).join(', ') - const placeholders = Object.keys(account).fill('?').join(', ') - const values = db.extractValues(account) - const sql = 'INSERT OR REPLACE INTO accounts (' + fields + ') VALUES (' + placeholders + ')' - await db.run(accountDatabase, sql, values) + + // Define the table columns based on schema + const columns = ['accountId', 'data', 'timestamp', 'hash', 'cycleNumber', 'isGlobal']; + + // Construct the SQL query with placeholders + const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = `INSERT OR REPLACE INTO accounts (${columns.join(', ')}) VALUES ${placeholders}`; + + // Map the `account` object to match the columns + const values = columns.map((column) => + typeof account[column] === 'object' + ? 
SerializeToJsonString(account[column]) // Serialize objects to JSON + : account[column] + ); + + // Execute the query directly (single-row insert) + await db.run(accountDatabase, sql, values); + if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted Account', account.accountId) + Logger.mainLogger.debug('Successfully inserted Account', account.accountId); } - } catch (e) { - Logger.mainLogger.error(e) + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert Account or it is already stored in to database', + 'Unable to insert Account or it is already stored in the database', account.accountId - ) + ); } } export async function bulkInsertAccounts(accounts: AccountsCopy[]): Promise { + try { - const fields = Object.keys(accounts[0]).join(', ') - const placeholders = Object.keys(accounts[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(accounts) - let sql = 'INSERT OR REPLACE INTO accounts (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < accounts.length; i++) { - sql = sql + ', (' + placeholders + ')' + + // Define the table columns based on schema + const columns = ['accountId', 'data', 'timestamp', 'hash', 'cycleNumber', 'isGlobal']; + + // Construct the SQL query for bulk insertion with all placeholders + const placeholders = accounts.map(() => `(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = `INSERT OR REPLACE INTO accounts (${columns.join(', ')}) VALUES ${placeholders}`; + + // Flatten the `accounts` array into a single list of values + const values = accounts.flatMap((account) => + columns.map((column) => + typeof account[column] === 'object' + ? 
SerializeToJsonString(account[column]) // Serialize objects to JSON + : account[column] + ) + ); + + // Execute the single query for all accounts + await db.run(accountDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length); } - await db.run(accountDatabase, sql, values) - if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert Accounts', accounts.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert Accounts', accounts.length); } } diff --git a/src/dbstore/cycles.ts b/src/dbstore/cycles.ts index 65842694..148f013f 100644 --- a/src/dbstore/cycles.ts +++ b/src/dbstore/cycles.ts @@ -6,38 +6,72 @@ import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' import { Cycle, DbCycle } from './types' + export async function insertCycle(cycle: Cycle): Promise { + try { - const fields = Object.keys(cycle).join(', ') - const placeholders = Object.keys(cycle).fill('?').join(', ') - const values = db.extractValues(cycle) - const sql = 'INSERT OR REPLACE INTO cycles (' + fields + ') VALUES (' + placeholders + ')' - await db.run(cycleDatabase, sql, values) - Logger.mainLogger.debug('Successfully inserted Cycle', cycle.cycleRecord.counter, cycle.cycleMarker) - } catch (e) { - Logger.mainLogger.error(e) + // Define the table columns based on schema + const columns = ['cycleMarker', 'counter', 'cycleRecord']; + + // Construct the SQL query with placeholders + const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = `INSERT OR REPLACE INTO cycles (${columns.join(', ')}) VALUES ${placeholders}`; + + // Map the `cycle` object to match the columns + const values = columns.map((column) => + typeof cycle[column] === 'object' + ? 
SerializeToJsonString(cycle[column]) // Serialize objects to JSON + : cycle[column] + ); + + // Execute the query directly (single-row insert) + await db.run(cycleDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug( + 'Successfully inserted Cycle', + cycle.counter, + cycle.cycleMarker + ); + } + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert cycle or it is already stored in to database', - cycle.cycleRecord.counter, + 'Unable to insert cycle or it is already stored in the database', + cycle.counter, cycle.cycleMarker - ) + ); } } export async function bulkInsertCycles(cycles: Cycle[]): Promise { + try { - const fields = Object.keys(cycles[0]).join(', ') - const placeholders = Object.keys(cycles[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(cycles) - let sql = 'INSERT OR REPLACE INTO cycles (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < cycles.length; i++) { - sql = sql + ', (' + placeholders + ')' + // Define the table columns based on schema + const columns = ['cycleMarker', 'counter', 'cycleRecord']; + + // Construct the SQL query for bulk insertion with all placeholders + const placeholders = cycles.map(() => `(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = `INSERT OR REPLACE INTO cycles (${columns.join(', ')}) VALUES ${placeholders}`; + + // Flatten the `cycles` array into a single list of values + const values = cycles.flatMap((cycle) => + columns.map((column) => + typeof cycle[column] === 'object' + ? 
SerializeToJsonString(cycle[column]) // Serialize objects to JSON + : cycle[column] + ) + ); + + // Execute the single query for all cycles + await db.run(cycleDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted Cycles', cycles.length); } - await db.run(cycleDatabase, sql, values) - if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Cycles', cycles.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert Cycles', cycles.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert Cycles', cycles.length); } } diff --git a/src/dbstore/originalTxsData.ts b/src/dbstore/originalTxsData.ts index 4ab8029b..9dd87da1 100644 --- a/src/dbstore/originalTxsData.ts +++ b/src/dbstore/originalTxsData.ts @@ -3,7 +3,7 @@ import * as db from './sqlite3storage' import { originalTxDataDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' -import { DeSerializeFromJsonString } from '../utils/serialization' +import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' export interface OriginalTxData { txId: string @@ -27,43 +27,74 @@ type DbOriginalTxDataCount = OriginalTxDataCount & { 'COUNT(*)': number } -export async function insertOriginalTxData(OriginalTxData: OriginalTxData): Promise { +export async function insertOriginalTxData(originalTxData: OriginalTxData): Promise { + try { - const fields = Object.keys(OriginalTxData).join(', ') - const placeholders = Object.keys(OriginalTxData).fill('?').join(', ') - const values = db.extractValues(OriginalTxData) - const sql = 'INSERT OR REPLACE INTO originalTxsData (' + fields + ') VALUES (' + placeholders + ')' - await db.run(originalTxDataDatabase, sql, values) + + // Define the table columns based on schema + const columns = ['txId', 'timestamp', 'cycle', 'originalTxData']; + + // Construct the SQL query with placeholders 
+ const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = `INSERT OR REPLACE INTO originalTxsData (${columns.join(', ')}) VALUES ${placeholders}`; + + // Map the `originalTxData` object to match the columns + const values = columns.map((column) => + typeof originalTxData[column] === 'object' + ? SerializeToJsonString(originalTxData[column]) // Serialize objects to JSON + : originalTxData[column] + ); + + // Execute the query directly (single-row insert) + await db.run(originalTxDataDatabase, sql, values); + if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted OriginalTxData', OriginalTxData.txId) + Logger.mainLogger.debug('Successfully inserted OriginalTxData', originalTxData.txId); } - } catch (e) { - Logger.mainLogger.error(e) + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert OriginalTxData or it is already stored in to database', - OriginalTxData.txId - ) + 'Unable to insert OriginalTxData or it is already stored in the database', + originalTxData.txId + ); } } + export async function bulkInsertOriginalTxsData(originalTxsData: OriginalTxData[]): Promise { + try { - const fields = Object.keys(originalTxsData[0]).join(', ') - const placeholders = Object.keys(originalTxsData[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(originalTxsData) - let sql = 'INSERT OR REPLACE INTO originalTxsData (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < originalTxsData.length; i++) { - sql = sql + ', (' + placeholders + ')' + + // Define the table columns + const columns = ['txId', 'timestamp', 'cycle', 'originalTxData']; + + // Construct the SQL query for bulk insertion with all placeholders + const placeholders = originalTxsData.map(() => `(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = `INSERT OR REPLACE INTO originalTxsData (${columns.join(', ')}) VALUES ${placeholders}`; + + // Flatten the `originalTxsData` array into a single list of 
values + const values = originalTxsData.flatMap((txData) => + columns.map((column) => + typeof txData[column] === 'object' + ? SerializeToJsonString(txData[column]) // Serialize objects to JSON + : txData[column] + ) + ); + + // Execute the single query for all originalTxsData + await db.run(originalTxDataDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted OriginalTxsData', originalTxsData.length); } - await db.run(originalTxDataDatabase, sql, values) - if (config.VERBOSE) - Logger.mainLogger.debug('Successfully inserted OriginalTxsData', originalTxsData.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert OriginalTxsData', originalTxsData.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert OriginalTxsData', originalTxsData.length); } } + + export async function queryOriginalTxDataCount(startCycle?: number, endCycle?: number): Promise { let originalTxsData try { diff --git a/src/dbstore/processedTxs.ts b/src/dbstore/processedTxs.ts index 259d032d..ee913484 100644 --- a/src/dbstore/processedTxs.ts +++ b/src/dbstore/processedTxs.ts @@ -3,6 +3,7 @@ import { processedTxDatabase } from './' import * as Logger from '../Logger' import { config } from '../Config' +// const superjson = require('superjson') /** * ProcessedTransaction stores transactions which have a receipt */ @@ -14,58 +15,78 @@ export interface ProcessedTransaction { } export async function insertProcessedTx(processedTx: ProcessedTransaction): Promise { + try { - const fields = Object.keys(processedTx).join(', ') - const placeholders = Object.keys(processedTx).fill('?').join(', ') - const values = db.extractValues(processedTx) - const sql = - 'INSERT INTO processedTxs (' + - fields + - ') VALUES (' + - placeholders + - ') ON CONFLICT (txId) DO UPDATE SET ' + - 'cycle = excluded.cycle, ' + - 'txTimestamp = excluded.txTimestamp, ' + - 'applyTimestamp = 
excluded.applyTimestamp' - - await db.run(processedTxDatabase, sql, values) + + // Define the table columns based on schema + const columns = ['txId', 'cycle', 'txTimestamp', 'applyTimestamp']; + + // Construct the SQL query with placeholders + const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = ` + INSERT INTO processedTxs (${columns.join(', ')}) VALUES ${placeholders} + ON CONFLICT (txId) DO UPDATE SET + cycle = excluded.cycle, + txTimestamp = excluded.txTimestamp, + applyTimestamp = excluded.applyTimestamp + `; + + // Map the `processedTx` object to match the columns + const values = columns.map((column) => processedTx[column]); + + // Execute the query directly (single-row insert/update) + await db.run(processedTxDatabase, sql, values); + if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted ProcessedTransaction', processedTx.txId) + Logger.mainLogger.debug('Successfully inserted ProcessedTransaction', processedTx.txId); } - } catch (e) { - Logger.mainLogger.error(e) + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert ProcessedTransaction or it is already stored in to database', + 'Unable to insert ProcessedTransaction or it is already stored in the database', processedTx.txId - ) + ); } } + + export async function bulkInsertProcessedTxs(processedTxs: ProcessedTransaction[]): Promise { + try { - const fields = Object.keys(processedTxs[0]).join(', ') - const placeholders = Object.keys(processedTxs[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(processedTxs) - let sql = 'INSERT INTO processedTxs (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < processedTxs.length; i++) { - sql = sql + ', (' + placeholders + ')' + + // Define the table columns based on schema + const columns = ['txId', 'cycle', 'txTimestamp', 'applyTimestamp']; + + // Construct the SQL query for bulk insertion + const placeholders = processedTxs.map(() => 
`(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = ` + INSERT INTO processedTxs (${columns.join(', ')}) VALUES ${placeholders} + ON CONFLICT (txId) DO UPDATE SET + cycle = excluded.cycle, + txTimestamp = excluded.txTimestamp, + applyTimestamp = excluded.applyTimestamp + `; + + // Flatten the `processedTxs` array into a single list of values + const values = processedTxs.flatMap((tx) => + columns.map((column) => tx[column]) + ); + + // Execute the single query + await db.run(processedTxDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted ProcessedTransactions', processedTxs.length); } - sql = - sql + - ' ON CONFLICT (txId) DO UPDATE SET ' + - 'cycle = excluded.cycle, ' + - 'txTimestamp = excluded.txTimestamp, ' + - 'applyTimestamp = excluded.applyTimestamp' - - await db.run(processedTxDatabase, sql, values) - if (config.VERBOSE) - Logger.mainLogger.debug('Successfully inserted ProcessedTransaction', processedTxs.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert ProcessedTransaction', processedTxs.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert ProcessedTransactions', processedTxs.length); } } + + export async function queryProcessedTxByTxId(txId: string): Promise { try { const sql = `SELECT * FROM processedTxs WHERE txId=?` diff --git a/src/dbstore/receipts.ts b/src/dbstore/receipts.ts index 95c5ec5a..65820fb2 100644 --- a/src/dbstore/receipts.ts +++ b/src/dbstore/receipts.ts @@ -4,9 +4,10 @@ import * as db from './sqlite3storage' import { receiptDatabase } from '.' 
import * as Logger from '../Logger' import { config } from '../Config' -import { DeSerializeFromJsonString } from '../utils/serialization' +import { DeSerializeFromJsonString , SerializeToJsonString} from '../utils/serialization' import { AccountsCopy } from '../dbstore/accounts' +// const superjson = require('superjson') export type Proposal = { applied: boolean cant_preApply: boolean @@ -108,40 +109,94 @@ type DbReceiptCount = ReceiptCount & { export async function insertReceipt(receipt: Receipt): Promise { try { - const fields = Object.keys(receipt).join(', ') - const placeholders = Object.keys(receipt).fill('?').join(', ') - const values = db.extractValues(receipt) - const sql = 'INSERT OR REPLACE INTO receipts (' + fields + ') VALUES (' + placeholders + ')' - await db.run(receiptDatabase, sql, values) + // Define the columns to match the database schema + const columns = [ + 'receiptId', + 'tx', + 'cycle', + 'applyTimestamp', + 'timestamp', + 'signedReceipt', + 'afterStates', + 'beforeStates', + 'appReceiptData', + 'executionShardKey', + 'globalModification', + ]; + + // Create placeholders for the values + const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = `INSERT OR REPLACE INTO receipts (${columns.join(', ')}) VALUES ${placeholders}`; + + // Map the receipt object to match the columns + const values = columns.map((column) => + typeof receipt[column] === 'object' + ? 
SerializeToJsonString(receipt[column]) // Serialize objects to JSON strings + : receipt[column] + ); + + // Execute the query directly + await db.run(receiptDatabase, sql, values); + if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted Receipt', receipt.receiptId) + Logger.mainLogger.debug('Successfully inserted Receipt', receipt.receiptId); } - } catch (e) { - Logger.mainLogger.error(e) + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert Receipt or it is already stored in to database', + 'Unable to insert Receipt or it is already stored in the database', receipt.receiptId - ) + ); } } export async function bulkInsertReceipts(receipts: Receipt[]): Promise { + try { - const fields = Object.keys(receipts[0]).join(', ') - const placeholders = Object.keys(receipts[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(receipts) - let sql = 'INSERT OR REPLACE INTO receipts (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < receipts.length; i++) { - sql = sql + ', (' + placeholders + ')' + + // Define the table columns based on schema + const columns = [ + 'receiptId', + 'tx', + 'cycle', + 'applyTimestamp', + 'timestamp', + 'signedReceipt', + 'afterStates', + 'beforeStates', + 'appReceiptData', + 'executionShardKey', + 'globalModification', + ]; + + // Construct the SQL query with placeholders + const placeholders = receipts.map(() => `(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = `INSERT OR REPLACE INTO receipts (${columns.join(', ')}) VALUES ${placeholders}`; + + // Flatten the `receipts` array into a single list of values + const values = receipts.flatMap((receipt) => + columns.map((column) => + typeof receipt[column] === 'object' + ? 
SerializeToJsonString(receipt[column]) // Serialize objects to JSON + : receipt[column] + ) + ); + + // Execute the query in a single call + await db.run(receiptDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted Receipts', receipts.length); } - await db.run(receiptDatabase, sql, values) - if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Receipts', receipts.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert Receipts', receipts.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert Receipts', receipts.length); } } + + + export async function queryReceiptByReceiptId(receiptId: string, timestamp = 0): Promise { try { const sql = `SELECT * FROM receipts WHERE receiptId=?` + (timestamp ? ` AND timestamp=?` : '') diff --git a/src/dbstore/transactions.ts b/src/dbstore/transactions.ts index 47fc8d2c..e96f271b 100644 --- a/src/dbstore/transactions.ts +++ b/src/dbstore/transactions.ts @@ -3,8 +3,9 @@ import * as db from './sqlite3storage' import { transactionDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' -import { DeSerializeFromJsonString } from '../utils/serialization' +import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' +// const superjson = require('superjson') /** * Transaction is for storing dapp receipt (eg. 
evm receipt in shardeum) * If there is no dapp receipt, we can skip storing in transactions table and use receipts table @@ -26,37 +27,65 @@ type DbTransaction = Transaction & { export async function insertTransaction(transaction: Transaction): Promise { try { - const fields = Object.keys(transaction).join(', ') - const placeholders = Object.keys(transaction).fill('?').join(', ') - const values = db.extractValues(transaction) - const sql = 'INSERT OR REPLACE INTO transactions (' + fields + ') VALUES (' + placeholders + ')' - await db.run(transactionDatabase, sql, values) + // Define the table columns based on schema + const columns = ['txId', 'appReceiptId', 'timestamp', 'cycleNumber', 'data', 'originalTxData']; + + // Construct the SQL query with placeholders + const placeholders = `(${columns.map(() => '?').join(', ')})`; + const sql = `INSERT OR REPLACE INTO transactions (${columns.join(', ')}) VALUES ${placeholders}`; + + // Map the `transaction` object to match the columns + const values = columns.map((column) => + typeof transaction[column] === 'object' + ? 
SerializeToJsonString(transaction[column]) // Serialize objects to JSON + : transaction[column] + ); + + // Execute the query directly + await db.run(transactionDatabase, sql, values); + if (config.VERBOSE) { - Logger.mainLogger.debug('Successfully inserted Transaction', transaction.txId) + Logger.mainLogger.debug('Successfully inserted Transaction', transaction.txId); } - } catch (e) { - Logger.mainLogger.error(e) + } catch (err) { + Logger.mainLogger.error(err); Logger.mainLogger.error( - 'Unable to insert Transaction or it is already stored in to database', + 'Unable to insert Transaction or it is already stored in the database', transaction.txId - ) + ); } } + export async function bulkInsertTransactions(transactions: Transaction[]): Promise { + try { - const fields = Object.keys(transactions[0]).join(', ') - const placeholders = Object.keys(transactions[0]).fill('?').join(', ') - const values = db.extractValuesFromArray(transactions) - let sql = 'INSERT OR REPLACE INTO transactions (' + fields + ') VALUES (' + placeholders + ')' - for (let i = 1; i < transactions.length; i++) { - sql = sql + ', (' + placeholders + ')' + + // Define the table columns based on schema + const columns = ['txId', 'appReceiptId', 'timestamp', 'cycleNumber', 'data', 'originalTxData']; + + // Construct the SQL query for bulk insertion with all placeholders + const placeholders = transactions.map(() => `(${columns.map(() => '?').join(', ')})`).join(', '); + const sql = `INSERT OR REPLACE INTO transactions (${columns.join(', ')}) VALUES ${placeholders}`; + + // Flatten the `transactions` array into a single list of values + const values = transactions.flatMap((transaction) => + columns.map((column) => + typeof transaction[column] === 'object' + ? 
SerializeToJsonString(transaction[column]) // Serialize objects to JSON + : transaction[column] + ) + ); + + // Execute the single query for all transactions + await db.run(transactionDatabase, sql, values); + + if (config.VERBOSE) { + Logger.mainLogger.debug('Successfully inserted Transactions', transactions.length); } - await db.run(transactionDatabase, sql, values) - if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Transactions', transactions.length) - } catch (e) { - Logger.mainLogger.error(e) - Logger.mainLogger.error('Unable to bulk insert Transactions', transactions.length) + } catch (err) { + Logger.mainLogger.error(err); + Logger.mainLogger.error('Unable to bulk insert Transactions', transactions.length); } } diff --git a/src/server.ts b/src/server.ts index 82a35f84..e72815d3 100644 --- a/src/server.ts +++ b/src/server.ts @@ -44,6 +44,8 @@ import { Utils as StringUtils } from '@shardus/types' import { healthCheckRouter } from './routes/healthCheck' import { setupWorkerProcesses } from './primary-process' import { initWorkerProcess } from './worker-process' +import { initAjvSchemas } from './types/ajv/Helpers' +import { initializeSerialization } from './utils/serialization/SchemaHelpers' const configFile = join(process.cwd(), 'archiver-config.json') let logDir: string @@ -51,6 +53,8 @@ const cluster = clusterModule as unknown as clusterModule.Cluster async function start(): Promise { overrideDefaultConfig(configFile) + initAjvSchemas(); + initializeSerialization() // Set crypto hash keys from config const hashKey = config.ARCHIVER_HASH_KEY Crypto.setCryptoHashKey(hashKey) diff --git a/src/types/ajv/Accounts.ts b/src/types/ajv/Accounts.ts new file mode 100644 index 00000000..4d3cd25d --- /dev/null +++ b/src/types/ajv/Accounts.ts @@ -0,0 +1,41 @@ +import { addSchema } from '../../utils/serialization/SchemaHelpers'; +import { AJVSchemaEnum } from '../enum/AJVSchemaEnum'; +// Define the schema for AccountsCopy +const schemaAccountsCopy = { + 
type: 'object', + properties: { + accountId: { type: 'string' }, + data: { type: 'object', additionalProperties: true }, // Allows nested objects with dynamic keys + timestamp: { type: 'integer' }, + hash: { type: 'string' }, + cycleNumber: { type: 'integer', nullable: true }, // Optional field + isGlobal: { type: 'boolean' } + }, + required: ['accountId', 'data', 'timestamp', 'hash', 'isGlobal'] // cycleNumber is optional +}; + +// Define the schema for DbAccountCopy +const schemaDbAccountCopy = { + type: 'object', + properties: { + ...schemaAccountsCopy.properties, + data: { type: 'string' } // Overriding the `data` field to be a string in DbAccountCopy + }, + required: ['accountId', 'data', 'timestamp', 'hash', 'isGlobal'] // Required fields remain the same +}; + +// Function to initialize schemas +export function initAccounts(): void { + addSchemaDependencies(); + addSchemas(); +} + +// Function to add schema dependencies +function addSchemaDependencies(): void { + // No external dependencies +} + +// Function to register schemas +function addSchemas(): void { + addSchema( AJVSchemaEnum.AccountsCopy, schemaAccountsCopy); +} diff --git a/src/types/ajv/Helpers.ts b/src/types/ajv/Helpers.ts new file mode 100644 index 00000000..d639121e --- /dev/null +++ b/src/types/ajv/Helpers.ts @@ -0,0 +1,35 @@ +import { Utils } from '@shardus/types' +import { ErrorObject } from 'ajv' +import { getVerifyFunction } from '../../utils/serialization/SchemaHelpers' +import { initReceipts } from './Receipts' +import { initAccounts } from './Accounts' +import { initOriginalTxData } from './OriginalTxData' + +export function initAjvSchemas(): void { + initAccounts() + initReceipts() + initOriginalTxData() + +} + +export function verifyPayload(name: string, payload: T): string[] | null { + const verifyFn = getVerifyFunction(name) + const isValid = verifyFn(payload) + if (!isValid) { + return parseAjvErrors(verifyFn.errors) + } else { + return null + } +} + +function 
parseAjvErrors(errors: Array | null): string[] | null { + if (!errors) return null + + return errors.map((error) => { + let errorMsg = `${error.message}` + if (error.params && Object.keys(error.params).length > 0) { + errorMsg += `: ${Utils.safeStringify(error.params)}` + } + return errorMsg + }) +} diff --git a/src/types/ajv/OriginalTxData.ts b/src/types/ajv/OriginalTxData.ts new file mode 100644 index 00000000..b3b7f4fa --- /dev/null +++ b/src/types/ajv/OriginalTxData.ts @@ -0,0 +1,37 @@ + +import { addSchema } from '../../utils/serialization/SchemaHelpers'; +import { AJVSchemaEnum } from '../enum/AJVSchemaEnum'; + +// Define the schema for OriginalTxData +const schemaOriginalTxData = { + type: 'object', + properties: { + txId: { type: 'string' }, // txId must be a string + timestamp: { type: 'integer' }, // timestamp must be an integer + cycle: { type: 'integer' }, // cycle must be an integer + originalTxData: { type: 'object' }, // originalTxData must be an object + // Uncomment if sign is required: + // sign: { type: 'string' } // Sign (if used) must be a string + }, + required: ['txId', 'timestamp', 'cycle', 'originalTxData'], // Required fields + additionalProperties: false, // Disallow other fields +}; + + +// Function to initialize schemas +export function initOriginalTxData(): void { + addSchemaDependencies(); + addSchemas(); + } + + // Function to add schema dependencies (if any external schemas are needed) + function addSchemaDependencies(): void { + // No external dependencies for now + } + + // Function to register schemas + function addSchemas(): void { + addSchema(AJVSchemaEnum.OriginalTxData, schemaOriginalTxData); + + } + \ No newline at end of file diff --git a/src/types/ajv/Receipts.ts b/src/types/ajv/Receipts.ts new file mode 100644 index 00000000..ffb9f1f2 --- /dev/null +++ b/src/types/ajv/Receipts.ts @@ -0,0 +1,198 @@ +import { addSchema } from '../../utils/serialization/SchemaHelpers'; +import { AJVSchemaEnum } from '../enum/AJVSchemaEnum'; 
+// import { schemaAccountsCopy } from './Accounts'; // Import the schema from Accounts.ts + +// Define the regex for IPv4 validation (if needed in nested objects) +const ipv4Regex = /^(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)$/; + +// Define schemas for nested components +const schemaProposal = { + type: 'object', + properties: { + applied: { type: 'boolean' }, + cant_preApply: { type: 'boolean' }, + accountIDs: { type: 'array', items: { type: 'string' } }, + beforeStateHashes: { type: 'array', items: { type: 'string' } }, + afterStateHashes: { type: 'array', items: { type: 'string' } }, + appReceiptDataHash: { type: 'string' }, + txid: { type: 'string' } + }, + required: ['applied', 'cant_preApply', 'accountIDs', 'beforeStateHashes', 'afterStateHashes', 'appReceiptDataHash', 'txid'], + additionalProperties: false +}; + +const schemaSignature = { + type: 'object', + properties: { + owner: { type: 'string' }, + sig: { type: 'string' } + }, + required: ['owner', 'sig'], + additionalProperties: false +}; + +const schemaSignedReceipt = { + type: 'object', + properties: { + proposal: schemaProposal, + proposalHash: { type: 'string' }, + signaturePack: { + type: 'array', + items: schemaSignature + }, + voteOffsets: { + type: 'array', + items: { type: 'integer' } + }, + sign: { type: 'object', ...schemaSignature }, + txGroupCycle: { type: 'integer', minimum: 0 } + }, + required: ['proposal', 'proposalHash', 'signaturePack', 'voteOffsets'], + additionalProperties: false +}; + +const schemaGlobalTxReceipt = { + type: 'object', + properties: { + signs: { + type: 'array', + items: schemaSignature + }, + tx: { + type: 'object', + properties: { + address: { type: 'string' }, + addressHash: { type: 'string' }, + value: {}, + when: { type: 'integer' }, + source: { type: 'string' } + }, + required: ['address', 'addressHash', 'value', 'when', 'source'], + additionalProperties: false + }, + txGroupCycle: { type: 'integer', minimum: 0 } + }, + 
required: ['signs', 'tx'], + additionalProperties: false // Excludes `consensusGroup` by default +}; + + +const schemaAppReceiptData = { + type: 'object', + properties: { + accountId: { type: 'string' }, + data: { type: 'object', additionalProperties: true } + }, + required: ['data'], + additionalProperties: true +}; + +const schemaTx = { + type: 'object', + properties: { + originalTxData: { type: 'object', additionalProperties: true }, + txId: { type: 'string' }, + timestamp: { type: 'integer' } + }, + required: ['originalTxData', 'txId', 'timestamp'], + additionalProperties: false +}; + +// Define the main ArchiverReceipt schema +const schemaArchiverReceipt = { + type: 'object', + properties: { + tx: schemaTx, + cycle: { type: 'integer', minimum: 0 }, + signedReceipt: { oneOf: [schemaSignedReceipt, schemaGlobalTxReceipt] }, + afterStates: { type: 'array', items: { $ref: AJVSchemaEnum.AccountsCopy } }, // Using imported schema + beforeStates: { type: 'array', items: { $ref: AJVSchemaEnum.AccountsCopy } }, // Using imported schema + appReceiptData: schemaAppReceiptData, + executionShardKey: { type: 'string' }, + globalModification: { type: 'boolean' } + }, + required: ['tx', 'cycle', 'signedReceipt', 'appReceiptData', 'executionShardKey', 'globalModification'], + additionalProperties: false +}; + + +const schemaAppliedVote = { + type: 'object', + properties: { + txid: { type: 'string' }, + transaction_result: { type: 'boolean' }, + account_id: { + type: 'array', + items: { type: 'string' } + }, + account_state_hash_after: { + type: 'array', + items: { type: 'string' } + }, + account_state_hash_before: { + type: 'array', + items: { type: 'string' } + }, + cant_apply: { type: 'boolean' }, + node_id: { type: 'string' }, + sign: schemaSignature, // Reference to schemaSignature + app_data_hash: { type: 'string' } + }, + required: [ + 'txid', + 'transaction_result', + 'account_id', + 'account_state_hash_after', + 'account_state_hash_before', + 'cant_apply', + 'node_id', 
+ 'sign', + 'app_data_hash' + ], + additionalProperties: false +}; + +const schemaConfirmOrChallengeMessage = { + type: 'object', + properties: { + message: { type: 'string' }, + nodeId: { type: 'string' }, + appliedVote: schemaAppliedVote , + sign: schemaSignature + }, + required: ['message', 'nodeId', 'appliedVote', 'sign'], // All properties are required + additionalProperties: false +}; + + +// Define the main Receipt schema +const schemaReceipt = { + type: 'object', + properties: { + receiptId: { type: 'string' }, + timestamp: { type: 'integer' }, + applyTimestamp: { type: 'integer' }, + ...schemaArchiverReceipt.properties + }, + required: ['receiptId', 'timestamp', 'applyTimestamp', ...schemaArchiverReceipt.required +], + additionalProperties: false +}; + +// Function to initialize schemas +export function initReceipts(): void { + addSchemaDependencies(); + addSchemas(); +} + +// Function to add schema dependencies +function addSchemaDependencies(): void { + // No external dependencies +} + +// Function to register schemas +function addSchemas(): void { + // addSchema('ReceiptTx', schemaTx); + addSchema(AJVSchemaEnum.ArchiverReceipt, schemaArchiverReceipt); + addSchema(AJVSchemaEnum.Receipt, schemaReceipt); +} diff --git a/src/types/enum/AJVSchemaEnum.ts b/src/types/enum/AJVSchemaEnum.ts new file mode 100644 index 00000000..4a3f1da1 --- /dev/null +++ b/src/types/enum/AJVSchemaEnum.ts @@ -0,0 +1,6 @@ +export enum AJVSchemaEnum { + Receipt = 'Receipt', + AccountsCopy = 'AccountsCopy', + ArchiverReceipt = 'ArchiverReceipt', + OriginalTxData = 'OriginalTxData' +} diff --git a/src/types/enum/TypeIdentifierEnum.ts b/src/types/enum/TypeIdentifierEnum.ts new file mode 100644 index 00000000..1188aec8 --- /dev/null +++ b/src/types/enum/TypeIdentifierEnum.ts @@ -0,0 +1,3 @@ +export enum TypeIdentifierEnum { + +} diff --git a/src/utils/serialization/SchemaHelpers.ts b/src/utils/serialization/SchemaHelpers.ts new file mode 100644 index 00000000..bf8843b4 --- /dev/null +++ 
b/src/utils/serialization/SchemaHelpers.ts
@@ -0,0 +1,34 @@
// NOTE(review): namespace import + `new Ajv()` only compiles with esModuleInterop /
// ajv v6-style export — confirm against the project's tsconfig and ajv version.
import * as Ajv from 'ajv'

// Single shared AJV instance so named schemas can reference each other via $ref.
const ajv = new Ajv()

const schemaMap: Map<string, object> = new Map()
const verifyFunctions: Map<string, Ajv.ValidateFunction> = new Map()

/**
 * Registers a schema under a unique name. Must be called before
 * initializeSerialization(); registering the same name twice is a programming error.
 * @throws Error when `name` was already registered
 */
export function addSchema(name: string, schema: object): void {
  if (schemaMap.has(name)) {
    throw new Error(`error already registered ${name}`)
  }
  schemaMap.set(name, schema)
}

/** Adds every registered schema to AJV exactly once, enabling $ref-by-name resolution. */
export function initializeSerialization(): void {
  // Register each schema exactly once in AJV
  for (const [name, schema] of schemaMap.entries()) {
    ajv.addSchema(schema, name)
  }
}

/**
 * Returns a cached validator for the named schema.
 * Prefers the validator AJV built when the schema was registered (which also
 * resolves $ref-by-name), falling back to compiling the raw schema.
 * @throws Error when no schema was registered under `name`
 */
export function getVerifyFunction(name: string): Ajv.ValidateFunction {
  const existingFn = verifyFunctions.get(name)
  if (existingFn) {
    return existingFn
  }
  // Reuse the schema already added to AJV (via initializeSerialization) when
  // available — avoids compiling the same schema twice.
  const registeredFn = ajv.getSchema(name)
  if (registeredFn) {
    verifyFunctions.set(name, registeredFn)
    return registeredFn
  }
  const schema = schemaMap.get(name)
  if (!schema) {
    throw new Error(`error missing schema ${name}`)
  }
  const verifyFn = ajv.compile(schema)
  verifyFunctions.set(name, verifyFn)
  return verifyFn
}