From bf9528ef4e52e6f13d0389499ce99c0a9eedf92a Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 2 Aug 2024 11:03:38 +0545 Subject: [PATCH 1/6] Added composite indexes ( cycle and timesamp ) on each data and added debug logs for account query --- src/Data/Data.ts | 2 +- src/dbstore/index.ts | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/Data/Data.ts b/src/Data/Data.ts index 50bd9685..2c77af9d 100644 --- a/src/Data/Data.ts +++ b/src/Data/Data.ts @@ -576,7 +576,7 @@ async function syncFromNetworkConfig(): Promise { // typeof devPublicKey === typeof config.DevPublicKey && // devPublicKey !== config.DevPublicKey // ) - // updateConfig({ DevPublicKey: devPublicKey }) + // updateConfig({ DevPublicKey: devPublicKey })count query) if ( !Utils.isUndefined(newPOQReceipt) && typeof newPOQReceipt === typeof config.newPOQReceipt && diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index dd4a5d47..3a4def23 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -18,7 +18,11 @@ export const initializeDB = async (config: Config): Promise => { ) await runCreate( transactionDatabase, - 'CREATE INDEX if not exists `transactions_idx` ON `transactions` (`cycleNumber` DESC, `timestamp` DESC)' + 'CREATE INDEX if not exists `transactions_timestamp` ON `transactions` (`timestamp` ASC)' + ) + await runCreate( + transactionDatabase, + 'CREATE INDEX if not exists `transactions_cycleNumber_timestamp` ON `transactions` (`cycleNumber` ASC, `timestamp` ASC)' ) await runCreate( transactionDatabase, @@ -35,15 +39,28 @@ export const initializeDB = async (config: Config): Promise => { ) await runCreate( accountDatabase, - 'CREATE INDEX if not exists `accounts_idx` ON `accounts` (`cycleNumber` DESC, `timestamp` DESC)' + 'CREATE INDEX if not exists `accounts_cycleNumber` ON `accounts` (`cycleNumber` ASC)' + ) + await runCreate( + accountDatabase, + 'CREATE INDEX if not exists `accounts_timestamp` ON `accounts` (`timestamp` ASC)' + ) + await runCreate( + accountDatabase, + 'CREATE INDEX if not exists `accounts_cycleNumber_timestamp` ON `accounts` (`cycleNumber` ASC, `timestamp` ASC)' ) await runCreate( receiptDatabase, 'CREATE TABLE if not exists `receipts` (`receiptId` TEXT NOT NULL UNIQUE PRIMARY KEY, `tx` JSON NOT NULL, `cycle` NUMBER NOT NULL, `applyTimestamp` BIGINT NOT NULL, `timestamp` BIGINT NOT NULL, `signedReceipt` JSON NOT NULL, `afterStates` JSON, `beforeStates` JSON, `appReceiptData` JSON, `executionShardKey` TEXT NOT NULL, `globalModification` BOOLEAN NOT NULL)' ) + await runCreate(receiptDatabase, 'CREATE INDEX if not exists `receipts_cycle` ON `receipts` (`cycle` ASC)') await runCreate( receiptDatabase, - 'CREATE INDEX if not exists `receipts_idx` ON `receipts` (`cycle` ASC, `timestamp` ASC)' + 'CREATE INDEX if not exists `receipts_timestamp` ON `receipts` (`timestamp` ASC)' + ) + await runCreate( + receiptDatabase, + 'CREATE INDEX if not exists `receipts_cycle_timestamp` ON `receipts` (`cycle` ASC, `timestamp` ASC)' ) await runCreate( originalTxDataDatabase, @@ -51,11 +68,19 @@ export const initializeDB = async (config: Config): Promise => { ) await runCreate( originalTxDataDatabase, - 'CREATE INDEX if not exists `originalTxsData_idx` ON `originalTxsData` (`cycle` ASC, `timestamp` ASC)' + 'CREATE INDEX if not exists `originalTxsData_cycle` ON `originalTxsData` (`cycle` ASC)' + ) + await runCreate( + originalTxDataDatabase, + 'CREATE INDEX if not exists `originalTxsData_timestamp` ON `originalTxsData` (`timestamp` ASC)' + ) + await runCreate( + originalTxDataDatabase, + 'CREATE INDEX if not exists `originalTxsData_cycle_timestamp` ON `originalTxsData` (`cycle` ASC, `timestamp` ASC)' ) await runCreate( originalTxDataDatabase, - 'CREATE INDEX if not exists `originalTxsData_txId_idx` ON `originalTxsData` (`txId`)' + 'CREATE INDEX if not exists `originalTxsData_txId` ON `originalTxsData` (`txId`)' ) } From f8b12db4418ded26c98805d82da087598b51f803 Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 23 Aug 2024 11:30:21 +0545 Subject: [PATCH 2/6] Added profiler and nestedCuonters in the gossip data + collecting missing data --- src/Data/Collector.ts | 11 +++++++++-- src/Data/GossipData.ts | 14 +++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index 3155d470..07bfc904 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -1380,6 +1380,8 @@ export const collectMissingReceipts = async ( `Collecting missing receipt for txId ${txId} with timestamp ${txTimestamp} from archivers`, senderArchivers.map((a) => a.ip + ':' + a.port) ) + if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Collect_missing_receipt') + if (profilerInstance) profilerInstance.profileSectionStart('Collect_missing_receipt') for (const senderArchiver of senderArchivers) { if ( (processedReceiptsMap.has(txId) && processedReceiptsMap.get(txId) === txTimestamp) || @@ -1409,7 +1411,8 @@ export const collectMissingReceipts = async ( `Failed to collect receipt for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` ) } - collectingMissingOriginalTxsMap.delete(txId) + collectingMissingReceiptsMap.delete(txId) + if (profilerInstance) profilerInstance.profileSectionEnd('Collect_missing_receipt') } const collectMissingOriginalTxsData = async ( @@ -1424,6 +1427,9 @@ const collectMissingOriginalTxsData = async ( `Collecting missing originalTxData for txId ${txId} with timestamp ${txTimestamp} from archivers`, senderArchivers.map((a) => a.ip + ':' + a.port) ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent('originalTxData', 'Collect_missing_originalTxData') + if (profilerInstance) profilerInstance.profileSectionStart('Collect_missing_originalTxData') for (const senderArchiver of senderArchivers) { if ( (processedOriginalTxsMap.has(txId) && processedOriginalTxsMap.get(txId) === txTimestamp) || @@ -1451,7 +1457,8 @@ const collectMissingOriginalTxsData = async ( `Failed to collect originalTxData for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` ) } - collectingMissingReceiptsMap.delete(txId) + collectingMissingOriginalTxsMap.delete(txId) + if (profilerInstance) profilerInstance.profileSectionEnd('Collect_missing_originalTxData') } type TxDataFromArchiversResponse = { diff --git a/src/Data/GossipData.ts b/src/Data/GossipData.ts index d2414e6d..e713eac1 100644 --- a/src/Data/GossipData.ts +++ b/src/Data/GossipData.ts @@ -6,6 +6,7 @@ import { Signature } from '@shardus/crypto-utils' import { P2P as P2PTypes } from '@shardus/types' import * as Utils from '../Utils' import { config } from '../Config' +import { nestedCountersInstance } from '../profiler/nestedCounters' // adjacentArchivers are one archiver from left and one archiver from right of the current archiver export let adjacentArchivers: State.ArchiverNodeInfo[] = [] @@ -102,12 +103,23 @@ export async function sendDataToAdjacentArchivers( } } try { - await Promise.allSettled(promises) + await Promise.allSettled(promises).then((results) => { + results.forEach((result) => { + if (nestedCountersInstance) { + if (result.status === 'fulfilled') { + if (result.value !== null) nestedCountersInstance.countEvent('gossip-data', 'success') + else nestedCountersInstance.countEvent('gossip-data', 'failure') + } else nestedCountersInstance.countEvent('gossip-data', 'failure') + } + }) + }) } catch (err) { Logger.mainLogger.error('Gossip Error: ' + err) + if (nestedCountersInstance) nestedCountersInstance.countEvent('gossip-data', 'error 1', err) } } catch (ex) { Logger.mainLogger.debug(ex) Logger.mainLogger.debug('Fail to gossip') + if (nestedCountersInstance) nestedCountersInstance.countEvent('gossip-data', 'error 2', ex) } } From f29cde5dac6099ef846863e52d6ff85f0371b0ad Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 23 Aug 2024 11:40:41 +0545 Subject: [PATCH 3/6] Added nested counters for fail to collector receipt/originalTxData --- src/Data/Collector.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index 07bfc904..0cb87dc0 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -1407,6 +1407,8 @@ export const collectMissingReceipts = async ( if (foundTxData) break } if (!foundTxData) { + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Failed to collect missing receipt from archivers') Logger.mainLogger.error( `Failed to collect receipt for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` ) @@ -1453,6 +1455,8 @@ const collectMissingOriginalTxsData = async ( if (foundTxData) break } if (!foundTxData) { + if (nestedCountersInstance) + nestedCountersInstance.countEvent('originalTxData', 'Failed to collect_missing_originalTxData') Logger.mainLogger.error( `Failed to collect originalTxData for txId ${txId} with timestamp ${txTimestamp} from archivers ${senders}` ) From cde15ed665afb97f7cea661c31c1acdd9f0d77a6 Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 23 Aug 2024 13:00:07 +0545 Subject: [PATCH 4/6] fix false worker debug logs --- src/primary-process/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index 94aada51..8ad5169e 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -172,7 +172,7 @@ const setupWorkerListeners = (worker: Worker): void => { workers[workerIndex]?.kill() workers.splice(workerIndex, 1) } else { - if (!isExtraWorker || !isNewWorker) console.error(`Worker ${workerId} is not in workers list`) + if (!isExtraWorker && !isNewWorker) console.error(`Worker ${workerId} is not in workers list`) } }) } From c90a6fcfc602a94c842d4d5799af52551db0f462 Mon Sep 17 00:00:00 2001 From: jairajdev Date: Fri, 23 Aug 2024 19:20:21 +0545 Subject: [PATCH 5/6] Fix for original receipt getting mutated --- src/primary-process/index.ts | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index 8ad5169e..bbd4b2ed 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -5,6 +5,7 @@ import { verifyArchiverReceipt, ReceiptVerificationResult } from '../Data/Collec import { config } from '../Config' import { EventEmitter } from 'events' import { StateManager, Utils as StringUtils } from '@shardus/types' +import * as Utils from '../Utils' const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process @@ -41,8 +42,8 @@ export const setupWorkerProcesses = (cluster: Cluster): void => { console.log(`Master ${process.pid} is running`) // Set interval to check receipt count every 15 seconds setInterval(async () => { - for (const [,worker] of newWorkers) { - worker.kill() + for (const [, worker] of newWorkers) { + worker.kill() } if (receiptLoadTraker < config.receiptLoadTrakerLimit) { console.log(`Receipt load is below the limit: ${receiptLoadTraker}/${config.receiptLoadTrakerLimit}`) @@ -83,7 +84,9 @@ export const setupWorkerProcesses = (cluster: Cluster): void => { } } console.log( - `Adjusted worker count to ${workers.length + newWorkers.size}, based on ${receiptLoadTraker} receipts received.` + `Adjusted worker count to ${ + workers.length + newWorkers.size + }, based on ${receiptLoadTraker} receipts received.` ) receiptLoadTraker = 0 // Reset the count }, config.receiptLoadTrakerInterval) @@ -132,16 +135,16 @@ const setupWorkerListeners = (worker: Worker): void => { } break case 'child_ready': - console.log(`Worker ${workerId} is ready for the duty`) - // Check if the worker is in the newWorkers map - if (newWorkers.has(workerId)) { - console.log(`Worker ${workerId} is in newWorkers, moving it to the workers list`) - workers.push(newWorkers.get(workerId)) - newWorkers.delete(workerId) - } else { - console.error(`Worker ${workerId}is not in the newWorkers list`) - } - break + console.log(`Worker ${workerId} is ready for the duty`) + // Check if the worker is in the newWorkers map + if (newWorkers.has(workerId)) { + console.log(`Worker ${workerId} is in newWorkers, moving it to the workers list`) + workers.push(newWorkers.get(workerId)) + newWorkers.delete(workerId) + } else { + console.error(`Worker ${workerId}is not in the newWorkers list`) + } + break default: console.log(`Worker ${process.pid} is sending unknown message type: ${type}`) console.log(data) @@ -159,7 +162,7 @@ const setupWorkerListeners = (worker: Worker): void => { extraWorkers.delete(workerId) } let isNewWorker = false - if(newWorkers.has(workerId)) { + if (newWorkers.has(workerId)) { console.log(`Worker ${workerId} is in newWorkers, removing it now`) isNewWorker = true newWorkers.get(workerId)?.kill() @@ -221,7 +224,7 @@ export const offloadReceipt = async ( currentWorker = (currentWorker + 1) % workers.length if (worker) { console.log('Verifying on the worker process 2', txId, timestamp, worker.process.pid) - const cloneReceipt = { ...receipt } + const cloneReceipt = Utils.deepCopy(receipt) delete cloneReceipt.tx.originalTxData delete cloneReceipt.executionShardKey const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt) From 658c72a668c011089f5c491ba3a1ef153bc22fc3 Mon Sep 17 00:00:00 2001 From: jairajdev Date: Sun, 25 Aug 2024 18:54:17 +0545 Subject: [PATCH 6/6] refactor db initialization --- archiver-config.json | 3 +-- debug_mode.patch | 3 +-- src/Config.ts | 2 -- src/dbstore/accounts.ts | 6 +++--- src/dbstore/cycles.ts | 6 +++--- src/dbstore/index.ts | 39 ++++++++++++++++++++++++---------- src/dbstore/originalTxsData.ts | 6 +++--- src/dbstore/receipts.ts | 6 +++--- src/dbstore/sqlite3storage.ts | 33 +--------------------------- src/dbstore/transactions.ts | 6 +++--- src/primary-process/index.ts | 2 +- 11 files changed, 47 insertions(+), 65 deletions(-) diff --git a/archiver-config.json b/archiver-config.json index a892baf9..7fafd0a8 100644 --- a/archiver-config.json +++ b/archiver-config.json @@ -57,6 +57,5 @@ } ], "ARCHIVER_MODE": "release", - "DevPublicKey": "", - "EXISTING_ARCHIVER_DB_PATH": "" + "DevPublicKey": "" } \ No newline at end of file diff --git a/debug_mode.patch b/debug_mode.patch index cc6058c0..b29aeba5 100644 --- a/debug_mode.patch +++ b/debug_mode.patch @@ -8,8 +8,7 @@ index a892baf..ffac25a 100644 ], - "ARCHIVER_MODE": "release", + "ARCHIVER_MODE": "debug", - "DevPublicKey": "", - "EXISTING_ARCHIVER_DB_PATH": "" + "DevPublicKey": "" } \ No newline at end of file diff --git a/src/Config.ts b/src/Config.ts diff --git a/src/Config.ts b/src/Config.ts index c89ba630..ec6838ff 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -20,7 +20,6 @@ export interface Config { receiptDB: string originalTxDataDB: string } - EXISTING_ARCHIVER_DB_PATH: string DATASENDER_TIMEOUT: number RATE_LIMIT: number // number of allowed request per second, N_NODE_REJECT_PERCENT: number @@ -104,7 +103,6 @@ let config: Config = { receiptDB: 'receipts.sqlite3', originalTxDataDB: 'originalTxsData.sqlite3', }, - EXISTING_ARCHIVER_DB_PATH: '', DATASENDER_TIMEOUT: 1000 * 60 * 5, RATE_LIMIT: 100, // 100 req per second, N_NODE_REJECT_PERCENT: 5, // Percentage of old nodes to remove from nodelist diff --git a/src/dbstore/accounts.ts b/src/dbstore/accounts.ts index 181e1df2..5102d603 100644 --- a/src/dbstore/accounts.ts +++ b/src/dbstore/accounts.ts @@ -1,5 +1,5 @@ import * as db from './sqlite3storage' -import { accountDatabase, extractValues, extractValuesFromArray } from './sqlite3storage' +import { accountDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString, SerializeToJsonString } from '../utils/serialization' @@ -22,7 +22,7 @@ export async function insertAccount(account: AccountsCopy): Promise { try { const fields = Object.keys(account).join(', ') const placeholders = Object.keys(account).fill('?').join(', ') - const values = extractValues(account) + const values = db.extractValues(account) const sql = 'INSERT OR REPLACE INTO accounts (' + fields + ') VALUES (' + placeholders + ')' await db.run(accountDatabase, sql, values) if (config.VERBOSE) { @@ -41,7 +41,7 @@ export async function bulkInsertAccounts(accounts: AccountsCopy[]): Promise { try { const fields = Object.keys(cycle).join(', ') const placeholders = Object.keys(cycle).fill('?').join(', ') - const values = extractValues(cycle) + const values = db.extractValues(cycle) const sql = 'INSERT OR REPLACE INTO cycles (' + fields + ') VALUES (' + placeholders + ')' await db.run(cycleDatabase, sql, values) Logger.mainLogger.debug('Successfully inserted Cycle', cycle.cycleRecord.counter, cycle.cycleMarker) @@ -28,7 +28,7 @@ export async function bulkInsertCycles(cycles: Cycle[]): Promise { try { const fields = Object.keys(cycles[0]).join(', ') const placeholders = Object.keys(cycles[0]).fill('?').join(', ') - const values = extractValuesFromArray(cycles) + const values = db.extractValuesFromArray(cycles) let sql = 'INSERT OR REPLACE INTO cycles (' + fields + ') VALUES (' + placeholders + ')' for (let i = 1; i < cycles.length; i++) { sql = sql + ', (' + placeholders + ')' diff --git a/src/dbstore/index.ts b/src/dbstore/index.ts index 3a4def23..d8ad1da6 100644 --- a/src/dbstore/index.ts +++ b/src/dbstore/index.ts @@ -1,17 +1,28 @@ +import * as fs from 'fs' +import * as path from 'path' +import { Database } from 'sqlite3' import { Config } from '../Config' -import { - init, - runCreate, - close, - accountDatabase, - transactionDatabase, - cycleDatabase, - receiptDatabase, - originalTxDataDatabase, -} from './sqlite3storage' +import { createDB, runCreate, close } from './sqlite3storage' + +export let cycleDatabase: Database +export let accountDatabase: Database +export let transactionDatabase: Database +export let receiptDatabase: Database +export let originalTxDataDatabase: Database export const initializeDB = async (config: Config): Promise => { - await init(config) + createDirectories(config.ARCHIVER_DB) + accountDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.accountDB}`, 'Account') + cycleDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.cycleDB}`, 'Cycle') + transactionDatabase = await createDB( + `${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.transactionDB}`, + 'Transaction' + ) + receiptDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.receiptDB}`, 'Receipt') + originalTxDataDatabase = await createDB( + `${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.originalTxDataDB}`, + 'OriginalTxData' + ) await runCreate( transactionDatabase, 'CREATE TABLE if not exists `transactions` (`txId` TEXT NOT NULL UNIQUE PRIMARY KEY, `appReceiptId` TEXT, `timestamp` BIGINT NOT NULL, `cycleNumber` NUMBER NOT NULL, `data` JSON NOT NULL, `originalTxData` JSON NOT NULL)' @@ -91,3 +102,9 @@ export const closeDatabase = async (): Promise => { await close(receiptDatabase, 'Receipt') await close(originalTxDataDatabase, 'OriginalTxData') } + +function createDirectories(pathname: string): void { + const __dirname = path.resolve() + pathname = pathname.replace(/^\.*\/|\/?[^/]+\.[a-z]+|\/$/g, '') // Remove leading directory markers, and remove ending /file-name.extension + fs.mkdirSync(path.resolve(__dirname, pathname), { recursive: true }) // eslint-disable-line security/detect-non-literal-fs-filename +} diff --git a/src/dbstore/originalTxsData.ts b/src/dbstore/originalTxsData.ts index 2e83bff3..c0dd1df4 100644 --- a/src/dbstore/originalTxsData.ts +++ b/src/dbstore/originalTxsData.ts @@ -1,6 +1,6 @@ // import { Signature } from 'shardus-crypto-types' import * as db from './sqlite3storage' -import { originalTxDataDatabase, extractValues, extractValuesFromArray } from './sqlite3storage' +import { originalTxDataDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString } from '../utils/serialization' @@ -31,7 +31,7 @@ export async function insertOriginalTxData(OriginalTxData: OriginalTxData): Prom try { const fields = Object.keys(OriginalTxData).join(', ') const placeholders = Object.keys(OriginalTxData).fill('?').join(', ') - const values = extractValues(OriginalTxData) + const values = db.extractValues(OriginalTxData) const sql = 'INSERT OR REPLACE INTO originalTxsData (' + fields + ') VALUES (' + placeholders + ')' await db.run(originalTxDataDatabase, sql, values) if (config.VERBOSE) { @@ -50,7 +50,7 @@ export async function bulkInsertOriginalTxsData(originalTxsData: OriginalTxData[ try { const fields = Object.keys(originalTxsData[0]).join(', ') const placeholders = Object.keys(originalTxsData[0]).fill('?').join(', ') - const values = extractValuesFromArray(originalTxsData) + const values = db.extractValuesFromArray(originalTxsData) let sql = 'INSERT OR REPLACE INTO originalTxsData (' + fields + ') VALUES (' + placeholders + ')' for (let i = 1; i < originalTxsData.length; i++) { sql = sql + ', (' + placeholders + ')' diff --git a/src/dbstore/receipts.ts b/src/dbstore/receipts.ts index ca3889a8..65591ffe 100644 --- a/src/dbstore/receipts.ts +++ b/src/dbstore/receipts.ts @@ -1,6 +1,6 @@ import { Signature } from '@shardus/crypto-utils' import * as db from './sqlite3storage' -import { receiptDatabase, extractValues, extractValuesFromArray } from './sqlite3storage' +import { receiptDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString } from '../utils/serialization' @@ -109,7 +109,7 @@ export async function insertReceipt(receipt: Receipt): Promise { try { const fields = Object.keys(receipt).join(', ') const placeholders = Object.keys(receipt).fill('?').join(', ') - const values = extractValues(receipt) + const values = db.extractValues(receipt) const sql = 'INSERT OR REPLACE INTO receipts (' + fields + ') VALUES (' + placeholders + ')' await db.run(receiptDatabase, sql, values) if (config.VERBOSE) { @@ -128,7 +128,7 @@ export async function bulkInsertReceipts(receipts: Receipt[]): Promise { try { const fields = Object.keys(receipts[0]).join(', ') const placeholders = Object.keys(receipts[0]).fill('?').join(', ') - const values = extractValuesFromArray(receipts) + const values = db.extractValuesFromArray(receipts) let sql = 'INSERT OR REPLACE INTO receipts (' + fields + ') VALUES (' + placeholders + ')' for (let i = 1; i < receipts.length; i++) { sql = sql + ', (' + placeholders + ')' diff --git a/src/dbstore/sqlite3storage.ts b/src/dbstore/sqlite3storage.ts index 596c454f..b0a74869 100644 --- a/src/dbstore/sqlite3storage.ts +++ b/src/dbstore/sqlite3storage.ts @@ -1,32 +1,7 @@ -import * as fs from 'fs' -import * as path from 'path' -import { Config } from '../Config' import { SerializeToJsonString } from '../utils/serialization' import { Database } from 'sqlite3' -// eslint-disable-next-line @typescript-eslint/no-var-requires -export let cycleDatabase: Database -export let accountDatabase: Database -export let transactionDatabase: Database -export let receiptDatabase: Database -export let originalTxDataDatabase: Database - -export async function init(config: Config): Promise { - createDirectories(config.ARCHIVER_DB) - accountDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.accountDB}`, 'Account') - cycleDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.cycleDB}`, 'Cycle') - transactionDatabase = await createDB( - `${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.transactionDB}`, - 'Transaction' - ) - receiptDatabase = await createDB(`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.receiptDB}`, 'Receipt') - originalTxDataDatabase = await createDB( - `${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.originalTxDataDB}`, - 'OriginalTxData' - ) -} - -const createDB = async (dbPath: string, dbName: string): Promise => { +export const createDB = async (dbPath: string, dbName: string): Promise => { console.log('dbName', dbName, 'dbPath', dbPath) const db = new Database(dbPath, (err) => { if (err) { @@ -146,9 +121,3 @@ export function extractValuesFromArray(arr: object[]): unknown[] { return null } } - -function createDirectories(pathname: string): void { - const __dirname = path.resolve() - pathname = pathname.replace(/^\.*\/|\/?[^/]+\.[a-z]+|\/$/g, '') // Remove leading directory markers, and remove ending /file-name.extension - fs.mkdirSync(path.resolve(__dirname, pathname), { recursive: true }) // eslint-disable-line security/detect-non-literal-fs-filename -} diff --git a/src/dbstore/transactions.ts b/src/dbstore/transactions.ts index 916f054b..c0d40a86 100644 --- a/src/dbstore/transactions.ts +++ b/src/dbstore/transactions.ts @@ -1,6 +1,6 @@ // import { Signature } from 'shardus-crypto-types' import * as db from './sqlite3storage' -import { transactionDatabase, extractValues, extractValuesFromArray } from './sqlite3storage' +import { transactionDatabase } from '.' import * as Logger from '../Logger' import { config } from '../Config' import { DeSerializeFromJsonString } from '../utils/serialization' @@ -28,7 +28,7 @@ export async function insertTransaction(transaction: Transaction): Promise try { const fields = Object.keys(transaction).join(', ') const placeholders = Object.keys(transaction).fill('?').join(', ') - const values = extractValues(transaction) + const values = db.extractValues(transaction) const sql = 'INSERT OR REPLACE INTO transactions (' + fields + ') VALUES (' + placeholders + ')' await db.run(transactionDatabase, sql, values) if (config.VERBOSE) { @@ -47,7 +47,7 @@ export async function bulkInsertTransactions(transactions: Transaction[]): Promi try { const fields = Object.keys(transactions[0]).join(', ') const placeholders = Object.keys(transactions[0]).fill('?').join(', ') - const values = extractValuesFromArray(transactions) + const values = db.extractValuesFromArray(transactions) let sql = 'INSERT OR REPLACE INTO transactions (' + fields + ') VALUES (' + placeholders + ')' for (let i = 1; i < transactions.length; i++) { sql = sql + ', (' + placeholders + ')' diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index bbd4b2ed..caa74f2e 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -241,7 +241,7 @@ export const offloadReceipt = async ( } } else { console.log('Verifying on the worker process 1', txId, timestamp, worker.process.pid) - const cloneReceipt = { ...receipt } + const cloneReceipt = Utils.deepCopy(receipt) delete cloneReceipt.tx.originalTxData delete cloneReceipt.executionShardKey const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt)