From d1079ad3a2ed2a22dc25f7248f929f35567f8686 Mon Sep 17 00:00:00 2001 From: Roberto Lucas Date: Tue, 2 Jan 2024 23:16:26 -0600 Subject: [PATCH 1/5] wip: impr delphioracle upsert --- src/indexer/real-time.ts | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/indexer/real-time.ts b/src/indexer/real-time.ts index e7a0b68..d285299 100755 --- a/src/indexer/real-time.ts +++ b/src/indexer/real-time.ts @@ -1,8 +1,6 @@ -import { loadReader } from '../reader/ship-reader' +import { concat, uniqBy } from 'lodash' import omit from 'lodash.omit' -import { logger } from '../lib/logger' -import { getChainGraphTableRowData, getPrimaryKey } from './utils' -import { MappingsReader } from '../mappings' +import { config } from '../config' import { deleteTableRows, upsertActions, @@ -10,12 +8,14 @@ import { upsertTableRows, upsertTransactions, } from '../database' -import { ChainGraphAction } from '../types' -import { config } from '../config' import { deleteBlock } from '../database/queries' +import { logger } from '../lib/logger' +import { MappingsReader } from '../mappings' +import { loadReader } from '../reader/ship-reader' +import { ChainGraphAction } from '../types' import { WhitelistReader } from '../whitelist' -import { concat, uniqBy } from 'lodash' import { loadCurrentTableState } from './load-state' +import { getChainGraphTableRowData } from './utils' export const startRealTimeStreaming = async ( mappingsReader: MappingsReader, @@ -60,13 +60,15 @@ export const startRealTimeStreaming = async ( Boolean(row.primary_key) && !Boolean( row.primary_key.normalize().toLowerCase().includes('undefined'), - ) + ) && + // TODO: configurable env owner filter + row.value.owner.match(/^(eosiodetroit|criptolions1|ivote4eosusa|eostitanprod|alohaeosprod|teamgreymass)$/) ) }) .map((row) => { // Regulating the ID type // This avoid when real-time change historical with the upsert and since ID is a number for historical and a string for real-time, we turn the ID into a number - const digested_row = { + const digestedRow = { ...row, value: { ...row.value, @@ -74,12 +76,18 @@ export const startRealTimeStreaming = async ( }, } - return getChainGraphTableRowData(digested_row, mappingsReader) + return ({ + ...getChainGraphTableRowData(digestedRow, mappingsReader), + // mapping the id to make it unique + primary_key: `${row.scope}-${row.value?.owner}-${row.value.id}` + }) }), 'primary_key', ) if (table_rows_deltas.length > 0 || delphioracle_rows_deltas.length > 0) { + // TODO: check if delphiorableRows should be included in the upsertTableRows... + // ! check if previous upsert is the same as this one await upsertTableRows( concat(table_rows_deltas, delphioracle_rows_deltas), ) @@ -151,6 +159,6 @@ export const startRealTimeStreaming = async ( logger.warn(`Microfork on block number : ${block_num}`) // load current state of whitelisted tables, loadCurrentTableState(mappingsReader, whitelistReader) - } + } ) } From e97a498b32c541c7588ca919d7cfed6e29b99011 Mon Sep 17 00:00:00 2001 From: Roberto Lucas Date: Wed, 3 Jan 2024 19:48:16 -0600 Subject: [PATCH 2/5] feat: delphioracle mapping & control --- src/indexer/load-history.ts | 17 ++-- src/indexer/load-state.ts | 193 +++++++++++++++++++----------------- src/indexer/real-time.ts | 141 ++++++++++++++++++-------- src/mappings.ts | 6 +- src/types.ts | 2 +- 5 files changed, 215 insertions(+), 144 deletions(-) diff --git a/src/indexer/load-history.ts b/src/indexer/load-history.ts index 328a2e6..78cb355 100755 --- a/src/indexer/load-history.ts +++ b/src/indexer/load-history.ts @@ -1,17 +1,16 @@ -import { logger } from '../lib/logger' import uniqBy from 'lodash.uniqby' import pThrottle from 'p-throttle' -import { whilst } from '../lib/promises' -import { MappingsReader } from '../mappings' +import { config } from '../config' import { upsertActions, upsertBlocks, upsertTransactions } from '../database' +import { HyperionAction, hyperion } from '../lib/hyperion' +import { logger } from '../lib/logger' +import { whilst } from '../lib/promises' import { ChainGraphAction, ChainGraphActionWhitelist, ChainGraphBlock, ChainGraphTransaction, } from '../types' -import { hyperion, HyperionAction } from '../lib/hyperion' -import { config } from '../config' import { WhitelistReader } from '../whitelist' type UpsertCollections = { @@ -112,9 +111,7 @@ export const loadActionHistory = async (account: string, filter: string) => { }) const loadHyperionPages = async () => { - const filter_page = `filter: ${account}:${filter}, limit: ${hyperionPageSize}, skip: ${ - hyperionPageSize * page - }, page ${page}` + const filter_page = `filter: ${account}:${filter}, limit: ${hyperionPageSize}, skip: ${hyperionPageSize * page}, page ${page}` logger.info(`Loading action history from Hyperion for ${filter_page}`) try { @@ -146,7 +143,7 @@ export const loadHistory = async (whitelistReader: WhitelistReader) => { .map(({ contract: code, actions }) => { // if wildcard we need reformat for hyperion if (actions[0] === '*') return [{ code, action: '*' }] - + return (actions as ChainGraphActionWhitelist[]).map(({ action }) => { return { code, @@ -157,7 +154,7 @@ export const loadHistory = async (whitelistReader: WhitelistReader) => { .flat() await Promise.all( - actionFilters.map(async ({action, code}) => loadActionHistory(code, action), + actionFilters.map(async ({ action, code }) => loadActionHistory(code, action), ), ) } catch (error) { diff --git a/src/indexer/load-state.ts b/src/indexer/load-state.ts index cf63fc4..adeeb9b 100755 --- a/src/indexer/load-state.ts +++ b/src/indexer/load-state.ts @@ -1,12 +1,12 @@ -import { logger } from '../lib/logger' +import Promise from 'bluebird' +import _ from 'lodash' +import { upsertTableRows } from '../database' import { rpc } from '../lib/eosio' -import { getChainGraphTableRowData } from './utils' +import { logger } from '../lib/logger' import { MappingsReader } from '../mappings' -import { WhitelistReader } from '../whitelist' import { ChainGraphTableWhitelist } from '../types' -import { upsertTableRows } from '../database' -import _ from 'lodash' -import Promise from 'bluebird' +import { WhitelistReader } from '../whitelist' +import { getChainGraphTableRowData } from './utils' const getTableScopes = async (code: string, table: string) => { // logger.info(`getTableScopes for ${code} table ${table}`) @@ -17,13 +17,13 @@ const getTableScopes = async (code: string, table: string) => { } // logger.info('getTableScopes params', params) -let response - try { - response = await rpc.v1.chain.get_table_by_scope(params) - } catch (error) { + let response + try { + response = await rpc.v1.chain.get_table_by_scope(params) + } catch (error) { console.log(params) - console.log(error) - } + console.log(error) + } // const response = await getTableByScope(params) @@ -39,92 +39,103 @@ export const loadCurrentTableState = async ( logger.info('Loading current table state ...') const mapper = async ({ contract, tables: tables_filter }) => { - // TODO: if eosio.token skip for now - // TODO: Reconsider to re-open for wallet balances? @gaboesquivel - if (contract === 'eosio.token') return - // logger.info('Preparing', { contract, tables_filter }) - let tables: ChainGraphTableWhitelist[] = [] - - if (tables_filter[0] === '*') { - // get all table names from mappings - const res = mappingsReader.mappings.find( - (m) => m.contract === contract, - ) - if (!res) { - throw new Error(`No mappings for contract ${contract} where found`) - } - const table_names = res.tables.map((t) => t.table) - - tables = await Promise.map( - table_names, async (table) => ({ - table, - scopes: await getTableScopes(contract, table), - }), {concurrency: 1} - ) - } else { - tables = await Promise.all( - tables_filter.map(async (filter) => { - if (filter.scopes[0] === '*') { - logger.info('Wildcard in scopes!', filter) - return { - table: filter.table, - scopes: await getTableScopes(contract, filter.table), - } - } else { - return filter + // TODO: if eosio.token skip for now + // TODO: Reconsider to re-open for wallet balances? @gaboesquivel + if (contract === 'eosio.token') return + // logger.info('Preparing', { contract, tables_filter }) + let tables: ChainGraphTableWhitelist[] = [] + + if (tables_filter[0] === '*') { + // get all table names from mappings + const res = mappingsReader.mappings.find( + (m) => m.contract === contract, + ) + if (!res) { + throw new Error(`No mappings for contract ${contract} where found`) + } + const table_names = res.tables.map((t) => t.table) + + tables = await Promise.map( + table_names, async (table) => ({ + table, + scopes: await getTableScopes(contract, table), + }), { concurrency: 1 }, + ) + } else { + tables = await Promise.all( + tables_filter.map(async (filter) => { + if (filter.scopes[0] === '*') { + logger.info('Wildcard in scopes!', filter) + return { + table: filter.table, + scopes: await getTableScopes(contract, filter.table), } - }), + } + return filter + }), + ) + } + + // logger.info(contract, JSON.stringify(tables, null, 2)) + Promise.map(tables, async ({ table, scopes }) => { + // if scopes is emtpy here it means there's no data to load + if (scopes.length === 0) return + // tables rows requests for this table + async function fn(scope) { + const { rows } = await rpc.v1.chain.get_table_rows({ + code: contract, + scope, + table, + limit: 1000000, + }) + // for each row get the right format for ChainGraph + const tableDataDeltas = rows.map((row) => + getChainGraphTableRowData( + { + primary_key: '0', // also fixed cos getChainGraphTableRowData determines the real primary_key value + present: '2', // fixed cos it always exist, it will never be a deletion + code: contract, + table, + scope, + value: row, + }, + mappingsReader, + ), ) - } + const tableData = tableDataDeltas.map((row) => { + if (row.contract === 'delphioracle') { + return { + ...row, + id: parseInt(row.data.id.toString(), 10), + primary_key: `${row.scope}-${row.data.owner}-${row.data.id}`, + scope: row.scope.normalize().replace(/\"/g, ''), + } + } + + return row + }) - // logger.info(contract, JSON.stringify(tables, null, 2)) - Promise.map(tables, async ({ table, scopes }) => { - // if scopes is emtpy here it means there's no data to load - if (scopes.length === 0) return - // tables rows requests for this table - async function fn(scope){ - const { rows } = await rpc.v1.chain.get_table_rows({ - code: contract, - scope, - table, - limit: 1000000, - }) - // for each row get the right format for ChainGraph - const tableData = rows.map((row) => - getChainGraphTableRowData( - { - primary_key: '0', // also fixed cos getChainGraphTableRowData determines the real primary_key value - present: '2', // fixed cos it always exist, it will never be a deletion - code: contract, - table, - scope, - value: row, - }, - mappingsReader, - ), + // NOTE: not sure why I'm getting duplicated rows. + const unique_row_deltas: any[] = _.uniqBy(tableData, (row) => { + return ( + row.chain + row.contract + row.table + row.scope + row.primary_key ) + }) - // NOTE: not sure why I'm getting duplicated rows. - const unique_row_deltas: any[] = _.uniqBy(tableData, (row) => { - return ( - row.chain + row.contract + row.table + row.scope + row.primary_key - ) - }) + return unique_row_deltas + } - return unique_row_deltas - } + // get all table rows acrross all scope flat them out on all_rows array + const all_rows: any[] = (await Promise.map(scopes as any, fn, { concurrency: 1 })).flat() + // upsert all table rows on the database + const all_filtered_rows = all_rows.filter(row => row.primary_key && !Boolean(row.primary_key.toString().normalize().toLowerCase().match(/(undefined|\[object object\])g/))) - // get all table rows acrross all scope flat them out on all_rows array - const all_rows: any[] = (await Promise.map(scopes as any, fn, {concurrency: 1})).flat() - // upsert all table rows on the database - const all_filtered_rows = all_rows.filter(row => row.primary_key && !Boolean(row.primary_key.toString().normalize().toLowerCase().match(/(undefined|\[object object\])g/))) + await upsertTableRows(all_filtered_rows) - await upsertTableRows(all_filtered_rows) + // logger.info(`Loaded state for ${JSON.stringify(all_rows.filter(f => f), null, 2)}!`) + }, { concurrency: 1 }) + } - // logger.info(`Loaded state for ${JSON.stringify(all_rows.filter(f => f), null, 2)}!`) - }, {concurrency: 1}) - } - - // for each table in registry get all of its data ( all scopes and rows ) and pushed it to the database - Promise.map(whitelistReader.whitelist as any, mapper, {concurrency: 1}) + // for each table in registry get all of its data ( all scopes and rows ) and pushed it to the database + Promise.map(whitelistReader.whitelist as any, mapper, { concurrency: 1 }) } diff --git a/src/indexer/real-time.ts b/src/indexer/real-time.ts index d285299..0ee5b50 100755 --- a/src/indexer/real-time.ts +++ b/src/indexer/real-time.ts @@ -1,4 +1,4 @@ -import { concat, uniqBy } from 'lodash' +import { isEqual, uniqBy } from 'lodash' import omit from 'lodash.omit' import { config } from '../config' import { @@ -12,11 +12,14 @@ import { deleteBlock } from '../database/queries' import { logger } from '../lib/logger' import { MappingsReader } from '../mappings' import { loadReader } from '../reader/ship-reader' -import { ChainGraphAction } from '../types' +import { ChainGraphAction, ChainGraphTableRow } from '../types' import { WhitelistReader } from '../whitelist' import { loadCurrentTableState } from './load-state' import { getChainGraphTableRowData } from './utils' +const DELPHIORACLE_FOREX_PRICE_UPDATE_INTERVAL = 1 * (60 * (60 * 1000)) // 1hr (60min * (60sec * 1000ms)) +const DELPHIORACLE_CRYPTO_PRICE_UPDATE_INTERVAL = 5 * (60 * 1000) // 5min * (60sec * 1000ms) + export const startRealTimeStreaming = async ( mappingsReader: MappingsReader, whitelistReader: WhitelistReader, @@ -28,6 +31,9 @@ export const startRealTimeStreaming = async ( whitelistReader, ) + let pendingCommitDelphioracleTableRows: ChainGraphTableRow[] = [] + let completedCommitDelphioracleTableRows: ChainGraphTableRow[] = [] + // we subscribe to eosio ship reader whitelisted block stream and insert the data in postgres thru prisma // this stream contains only the blocks that are relevant to the whitelisted contract tables and actions blocks$.subscribe(async (block) => { @@ -37,7 +43,7 @@ export const startRealTimeStreaming = async ( ) // insert table_rows and filtering them by unique p_key to avoid duplicates and real-time crash - const table_rows_deltas = block.table_rows + const tableRowsDeltas = block.table_rows .filter((row) => { logger.warn('> The received row =>', { row }) return ( @@ -51,48 +57,105 @@ export const startRealTimeStreaming = async ( }) .map((row) => getChainGraphTableRowData(row, mappingsReader)) - const delphioracle_rows_deltas = uniqBy( - block.table_rows - .filter((row) => { - return ( - row.code === 'delphioracle' && - row.present && - Boolean(row.primary_key) && - !Boolean( - row.primary_key.normalize().toLowerCase().includes('undefined'), - ) && - // TODO: configurable env owner filter - row.value.owner.match(/^(eosiodetroit|criptolions1|ivote4eosusa|eostitanprod|alohaeosprod|teamgreymass)$/) - ) + const delphioracleRowsDeltas: ChainGraphTableRow[] = block.table_rows + .filter((row) => { + return ( + row.code === 'delphioracle' && + row.present && + Boolean(row.primary_key) && + !Boolean( + row.primary_key.normalize().toLowerCase().includes('undefined'), + ) && + // TODO: configurable env owner filter + row.value.owner.match(/^(eosiodetroit|criptolions1|ivote4eosusa|eostitanprod|alohaeosprod|teamgreymass)$/) + ) + }) + .map((row) => { + // Regulating the ID type + // This avoid when real-time change historical with the upsert and since ID is a number for historical and a string for real-time, we turn the ID into a number + const digestedRow = getChainGraphTableRowData({ + ...row, + value: { + ...row.value, + id: parseInt(row.value.id, 10), + }, + }, mappingsReader) + + return ({ + ...digestedRow, + // mapping the id to make it unique + primary_key: `${row.scope}-${row.value?.owner}-${row.value.id}`, + scope: digestedRow.scope.normalize().replace(/\"/g, ''), }) - .map((row) => { - // Regulating the ID type - // This avoid when real-time change historical with the upsert and since ID is a number for historical and a string for real-time, we turn the ID into a number - const digestedRow = { - ...row, - value: { - ...row.value, - id: parseInt(row.value.id), - }, - } + }) + + pendingCommitDelphioracleTableRows = uniqBy(delphioracleRowsDeltas, 'primary_key') + const filteredLimitDelphioracleBPRows = [] + const upsertPendingRows = [] + + if (pendingCommitDelphioracleTableRows.length > 0 && completedCommitDelphioracleTableRows.length > 0) { + const previousCompletedCommitDelphioracleTableRows = completedCommitDelphioracleTableRows + + completedCommitDelphioracleTableRows = previousCompletedCommitDelphioracleTableRows.map((row) => { + const pendingCommitRow = pendingCommitDelphioracleTableRows.find( + (pendingRow) => { + const pendingForexRowTime = new Date(pendingRow.data.timestamp).getTime() - DELPHIORACLE_FOREX_PRICE_UPDATE_INTERVAL + const pendingCryptoRowTime = new Date(pendingRow.data.timestamp).getTime() - DELPHIORACLE_CRYPTO_PRICE_UPDATE_INTERVAL + const rowTime = new Date(row.data.timestamp).getTime() - return ({ - ...getChainGraphTableRowData(digestedRow, mappingsReader), - // mapping the id to make it unique - primary_key: `${row.scope}-${row.value?.owner}-${row.value.id}` + return pendingRow.primary_key === row.primary_key && + // * this is to avoid the real-time to override the historical data that is already in the database and has same value + ((pendingForexRowTime > rowTime && pendingRow.scope.match(/^usdt/)) || pendingCryptoRowTime > rowTime) && + pendingRow.data.value !== row.data.value }) - }), - 'primary_key', - ) - if (table_rows_deltas.length > 0 || delphioracle_rows_deltas.length > 0) { - // TODO: check if delphiorableRows should be included in the upsertTableRows... + if (pendingCommitRow) { + return pendingCommitRow + } + + return row + }).sort((a, b) => { + // * This is to sort the rows by timestamp. Latest first + const aTime = new Date(a.data.timestamp).getTime() + const bTime = new Date(b.data.timestamp).getTime() + + return bTime - aTime + }) + + if (!isEqual(completedCommitDelphioracleTableRows, previousCompletedCommitDelphioracleTableRows)) { + completedCommitDelphioracleTableRows.forEach((row) => { + const rowBPOwner = row.data.owner + + if (filteredLimitDelphioracleBPRows.filter((filteredRow) => filteredRow.data.owner === rowBPOwner)?.length < 5) { + filteredLimitDelphioracleBPRows.push(row) + } + }) + } + + pendingCommitDelphioracleTableRows = [] + } else if (pendingCommitDelphioracleTableRows.length > 0) { + completedCommitDelphioracleTableRows = completedCommitDelphioracleTableRows.concat(pendingCommitDelphioracleTableRows) + completedCommitDelphioracleTableRows.forEach((row) => { + const rowBPOwner = row.data.owner + + if (filteredLimitDelphioracleBPRows.filter((filteredRow) => filteredRow.data.owner === rowBPOwner)?.length < 5) { + filteredLimitDelphioracleBPRows.push(row) + } + }) + + pendingCommitDelphioracleTableRows = [] + } + + upsertPendingRows.push(...filteredLimitDelphioracleBPRows) + + if (tableRowsDeltas.length > 0) { + // TODO: check if delphioracleRows should be included in the upsertTableRows... // ! check if previous upsert is the same as this one - await upsertTableRows( - concat(table_rows_deltas, delphioracle_rows_deltas), - ) + upsertPendingRows.push(...tableRowsDeltas) } + await upsertTableRows(upsertPendingRows) + // delete table_rows const deleted_table_rows = block.table_rows .filter((row) => !row.present) @@ -159,6 +222,6 @@ export const startRealTimeStreaming = async ( logger.warn(`Microfork on block number : ${block_num}`) // load current state of whitelisted tables, loadCurrentTableState(mappingsReader, whitelistReader) - } + }, ) } diff --git a/src/mappings.ts b/src/mappings.ts index 97330ed..0224ee2 100755 --- a/src/mappings.ts +++ b/src/mappings.ts @@ -1,8 +1,8 @@ import { Subject } from 'rxjs' -import { logger } from './lib/logger' +import { config } from './config' import { db } from './database' +import { logger } from './lib/logger' import { ChainGraphMappings } from './types' -import { config } from './config' export interface MappingsReader { mappings$: Subject @@ -34,6 +34,6 @@ export const createMappingsReader = async (): Promise => { // resolve promise only after data has been loaded return new Promise((resolve) => - mappings$.subscribe(() => resolve({ mappings, mappings$ })) + mappings$.subscribe(() => resolve({ mappings, mappings$ })) ) } diff --git a/src/types.ts b/src/types.ts index 956db42..26b1cab 100755 --- a/src/types.ts +++ b/src/types.ts @@ -13,7 +13,7 @@ export interface ChainGraphTableRow { table: string scope: string primary_key: string - data: {} + data: Record } export interface ChainGraphChain { From f0934238557275f8b0f9bdcefd6a8304bfff384a Mon Sep 17 00:00:00 2001 From: Roberto Lucas Date: Fri, 2 Feb 2024 17:12:16 -0600 Subject: [PATCH 3/5] impr(wip): delphioracle upt limit guard --- .env-sample | 3 +- src/config/index.ts | 2 ++ src/indexer/load-state.ts | 52 +++++++++++++++++++++++----- src/indexer/real-time.ts | 73 ++++++++++++++++++++++----------------- src/lib/eosio.ts | 6 ++-- 5 files changed, 92 insertions(+), 44 deletions(-) diff --git a/.env-sample b/.env-sample index 81ea906..6d6add7 100755 --- a/.env-sample +++ b/.env-sample @@ -1,3 +1,4 @@ DATABASE_URL=postgres://user:pass@localhost:5432/chaingraph?sslmode=disable WS_URL=ws://127.0.0.1:8080 -RPC_URL=http://127.0.0.1:8888 +RPC_URL= +DELPHIORACLE_PRODUCERS=eosiodetroit,criptolions1,ivote4eosusa,eostitanprod,alohaeosprod,teamgreymass diff --git a/src/config/index.ts b/src/config/index.ts index f5b168a..f79bf98 100755 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -20,12 +20,14 @@ export interface EosioReaderConfig { export interface Config { database_url: string + delphioracle_producers: string[] hyperion_url: string reader: EosioReaderConfig } export const config: Config = { database_url: env.get('DATABASE_URL').required().asString(), + delphioracle_producers: env.get('DELPHIORACLE_PRODUCERS').required().asArray(), reader: { chain: 'eos', chain_id: diff --git a/src/indexer/load-state.ts b/src/indexer/load-state.ts index adeeb9b..e299cb9 100755 --- a/src/indexer/load-state.ts +++ b/src/indexer/load-state.ts @@ -102,18 +102,52 @@ export const loadCurrentTableState = async ( mappingsReader, ), ) - const tableData = tableDataDeltas.map((row) => { - if (row.contract === 'delphioracle') { + const tableData = tableDataDeltas.filter((row) => row.contract !== 'delphioracle') + + let delphioracleProducersFilter: { + producer: string + // Up to 5 scopes per scope value + scope: string + // cannot be more than 5 + count: number + }[] = [] + + const delphiOracleRows = tableData + .filter((row) => row.contract === 'delphioracle') + .sort((a, b) => { + const aTime = new Date(a.data.timestamp).getTime() + const bTime = new Date(b.data.timestamp).getTime() + + return bTime - aTime + }) + .filter((row) => { + const filteredRowData = delphioracleProducersFilter.filter((producer) => producer.producer === row.data.owner && producer.scope === row.scope) + + if (filteredRowData.length === 0) { + delphioracleProducersFilter.push({ + producer: row.data.owner as string, + scope: row.scope, + count: 1, + }) + } else if (filteredRowData.length > 0) { + const producer = delphioracleProducersFilter.find((producer) => producer.producer === row.data.owner) + + if (producer.count < 5) { + producer.count += 1 + } + } + + return filteredRowData[0].count <= 5 + }) + .map((row) => { + const normalizedScope = row.scope.normalize().replace(/\"/g, '') return { ...row, id: parseInt(row.data.id.toString(), 10), - primary_key: `${row.scope}-${row.data.owner}-${row.data.id}`, - scope: row.scope.normalize().replace(/\"/g, ''), + primary_key: `${normalizedScope}-${row.data.owner}-${row.data.id}`, + scope: normalizedScope, } - } - - return row - }) + }) // NOTE: not sure why I'm getting duplicated rows. const unique_row_deltas: any[] = _.uniqBy(tableData, (row) => { @@ -122,7 +156,7 @@ export const loadCurrentTableState = async ( ) }) - return unique_row_deltas + return unique_row_deltas.concat(delphiOracleRows) } // get all table rows acrross all scope flat them out on all_rows array diff --git a/src/indexer/real-time.ts b/src/indexer/real-time.ts index 0ee5b50..cb17ad6 100755 --- a/src/indexer/real-time.ts +++ b/src/indexer/real-time.ts @@ -19,6 +19,7 @@ import { getChainGraphTableRowData } from './utils' const DELPHIORACLE_FOREX_PRICE_UPDATE_INTERVAL = 1 * (60 * (60 * 1000)) // 1hr (60min * (60sec * 1000ms)) const DELPHIORACLE_CRYPTO_PRICE_UPDATE_INTERVAL = 5 * (60 * 1000) // 5min * (60sec * 1000ms) +const delphioracleProducers = config.delphioracle_producers export const startRealTimeStreaming = async ( mappingsReader: MappingsReader, @@ -57,36 +58,64 @@ export const startRealTimeStreaming = async ( }) .map((row) => getChainGraphTableRowData(row, mappingsReader)) + let delphioracleProducersFilter: { + producer: string + // Up to 5 scopes per scope value + scope: string + // cannot be more than 5 + count: number + }[] = [] + + // sort first by timestamp and then filter by delphioracle producers const delphioracleRowsDeltas: ChainGraphTableRow[] = block.table_rows + .sort((a, b) => { + const aTime = new Date(a.value.timestamp).getTime() + const bTime = new Date(b.value.timestamp).getTime() + + return bTime - aTime + }) .filter((row) => { - return ( - row.code === 'delphioracle' && + const isDelphioracle = row.code === 'delphioracle' && row.present && Boolean(row.primary_key) && !Boolean( row.primary_key.normalize().toLowerCase().includes('undefined'), ) && - // TODO: configurable env owner filter - row.value.owner.match(/^(eosiodetroit|criptolions1|ivote4eosusa|eostitanprod|alohaeosprod|teamgreymass)$/) - ) + delphioracleProducers.some((producer) => row.value.owner === producer) + const filteredRowData = delphioracleProducersFilter.filter((producer) => producer.producer === row.value.owner && producer.scope === row.scope) + + if (isDelphioracle && filteredRowData.length === 0) { + delphioracleProducersFilter.push({ + producer: row.value.owner, + scope: row.scope, + count: 1, + }) + } else if (isDelphioracle && filteredRowData.length > 0) { + const producer = delphioracleProducersFilter.find((producer) => producer.producer === row.value.owner) + + if (producer.count < 5) { + producer.count += 1 + } + } + + return isDelphioracle && filteredRowData[0].count <= 5 }) .map((row) => { // Regulating the ID type // This avoid when real-time change historical with the upsert and since ID is a number for historical and a string for real-time, we turn the ID into a number + const normalizedScope = row.scope.normalize().replace(/\"/g, '') const digestedRow = getChainGraphTableRowData({ ...row, value: { ...row.value, id: parseInt(row.value.id, 10), }, + // mapping the id to make it unique + primary_key: `${normalizedScope}-${row.value?.owner}-${row.value.id}`, + scope: normalizedScope, }, mappingsReader) - return ({ - ...digestedRow, - // mapping the id to make it unique - primary_key: `${row.scope}-${row.value?.owner}-${row.value.id}`, - scope: digestedRow.scope.normalize().replace(/\"/g, ''), - }) + return digestedRow }) pendingCommitDelphioracleTableRows = uniqBy(delphioracleRowsDeltas, 'primary_key') @@ -114,34 +143,16 @@ export const startRealTimeStreaming = async ( } return row - }).sort((a, b) => { - // * This is to sort the rows by timestamp. Latest first - const aTime = new Date(a.data.timestamp).getTime() - const bTime = new Date(b.data.timestamp).getTime() - - return bTime - aTime }) if (!isEqual(completedCommitDelphioracleTableRows, previousCompletedCommitDelphioracleTableRows)) { - completedCommitDelphioracleTableRows.forEach((row) => { - const rowBPOwner = row.data.owner - - if (filteredLimitDelphioracleBPRows.filter((filteredRow) => filteredRow.data.owner === rowBPOwner)?.length < 5) { - filteredLimitDelphioracleBPRows.push(row) - } - }) + filteredLimitDelphioracleBPRows.push(completedCommitDelphioracleTableRows) } pendingCommitDelphioracleTableRows = [] } else if (pendingCommitDelphioracleTableRows.length > 0) { completedCommitDelphioracleTableRows = completedCommitDelphioracleTableRows.concat(pendingCommitDelphioracleTableRows) - completedCommitDelphioracleTableRows.forEach((row) => { - const rowBPOwner = row.data.owner - - if (filteredLimitDelphioracleBPRows.filter((filteredRow) => filteredRow.data.owner === rowBPOwner)?.length < 5) { - filteredLimitDelphioracleBPRows.push(row) - } - }) + filteredLimitDelphioracleBPRows.push(completedCommitDelphioracleTableRows) pendingCommitDelphioracleTableRows = [] } diff --git a/src/lib/eosio.ts b/src/lib/eosio.ts index 75ba12d..96235a3 100755 --- a/src/lib/eosio.ts +++ b/src/lib/eosio.ts @@ -1,10 +1,10 @@ -import fetch from 'node-fetch' import { APIClient, FetchProvider } from "@wharfkit/antelope" +import fetch from 'node-fetch' import { config } from '../config' const provider = new FetchProvider(config.reader.rpc_url, { fetch, }) -export const rpc = new APIClient({provider}) +export const rpc = new APIClient({ provider }) export const getInfo = async () => fetch(`${config.reader.rpc_url}/v1/chain/get_info`).then((res: any) => @@ -12,4 +12,4 @@ export const getInfo = async () => ) export const getNationInfo = () => - fetch('http://api.eosn.io/v1/chain/get_info').then((res: any) => res.json()) \ No newline at end of file + fetch('https://eos.api.eosnation.io/v1/chain/get_info').then((res: any) => res.json()) \ No newline at end of file From 741538159bb6aacee3215b58c206f1aa3c7bfe5e Mon Sep 17 00:00:00 2001 From: Roberto Lucas Date: Fri, 7 Jun 2024 09:36:57 -0600 Subject: [PATCH 4/5] fix: special char on table row scopes --- package.json | 4 ++-- src/indexer/load-history.ts | 3 +-- src/indexer/load-state.ts | 4 +--- src/indexer/real-time.ts | 4 +--- src/indexer/utils.ts | 7 ++++--- src/reader/reader-helper.ts | 10 +++++++++- yarn.lock | 16 ++++++++-------- 7 files changed, 26 insertions(+), 22 deletions(-) diff --git a/package.json b/package.json index 842f768..9368813 100755 --- a/package.json +++ b/package.json @@ -47,8 +47,8 @@ "@blockmatic/eosio-ship-reader": "^0.5.0", "@eoscafe/hyperion": "^3.3.2", "@eosrio/node-abieos": "^2.1.1", - "@greymass/eosio": "^0.6.4", - "@wharfkit/antelope": "^0.10.0", + "@greymass/eosio": "^0.7.0", + "@wharfkit/antelope": "^1.0.7", "bluebird": "^3.7.2", "eos-common": "^0.8.1", "isomorphic-fetch": "^3.0.0", diff --git a/src/indexer/load-history.ts b/src/indexer/load-history.ts index 78cb355..9ca9d6b 100755 --- a/src/indexer/load-history.ts +++ b/src/indexer/load-history.ts @@ -125,8 +125,7 @@ export const loadActionHistory = async (account: string, filter: string) => { morePages = false return false } catch (error) { - logger.error(error) - logger.info('hyperion request failed') + logger.error('Hyperion request failed: ', error) return process.exit(0) // keep trying } } diff --git a/src/indexer/load-state.ts b/src/indexer/load-state.ts index e299cb9..82c4544 100755 --- a/src/indexer/load-state.ts +++ b/src/indexer/load-state.ts @@ -140,12 +140,10 @@ export const loadCurrentTableState = async ( return filteredRowData[0].count <= 5 }) .map((row) => { - const normalizedScope = row.scope.normalize().replace(/\"/g, '') return { ...row, id: parseInt(row.data.id.toString(), 10), - primary_key: `${normalizedScope}-${row.data.owner}-${row.data.id}`, - scope: normalizedScope, + primary_key: `${row.scope}-${row.data.owner}-${row.data.id}`, } }) diff --git a/src/indexer/real-time.ts b/src/indexer/real-time.ts index cb17ad6..ef227ba 100755 --- a/src/indexer/real-time.ts +++ b/src/indexer/real-time.ts @@ -103,7 +103,6 @@ export const startRealTimeStreaming = async ( .map((row) => { // Regulating the ID type // This avoid when real-time change historical with the upsert and since ID is a number for historical and a string for real-time, we turn the ID into a number - const normalizedScope = row.scope.normalize().replace(/\"/g, '') const digestedRow = getChainGraphTableRowData({ ...row, value: { @@ -111,8 +110,7 @@ export const startRealTimeStreaming = async ( id: parseInt(row.value.id, 10), }, // mapping the id to make it unique - primary_key: `${normalizedScope}-${row.value?.owner}-${row.value.id}`, - scope: normalizedScope, + primary_key: `${row.scope}-${row.value?.owner}-${row.value.id}`, }, mappingsReader) return digestedRow diff --git a/src/indexer/utils.ts b/src/indexer/utils.ts index 152edff..10cdb54 100755 --- a/src/indexer/utils.ts +++ b/src/indexer/utils.ts @@ -36,9 +36,8 @@ export const getPrimaryKey = ( case 'extended_asset_symbol': primary_key = row.table === 'stablev2' - ? `balance_${ - row.value[tableMappings.table_key].quantity.split(' ')[1] - }` + ? `balance_${row.value[tableMappings.table_key].quantity.split(' ')[1] + }` : row.value[tableMappings.table_key].quantity.split(' ')[1] break default: @@ -58,8 +57,10 @@ export const getChainGraphTableRowData = ( row: EosioReaderTableRow, mappingsReader: MappingsReader, ): ChainGraphTableRow => { + const normalizedScope = row.scope.toString().normalize().replace(/"/g, '') return { ...omit(row, 'value', 'code', 'present', 'primary_key'), + scope: normalizedScope, primary_key: getPrimaryKey(row, mappingsReader), data: row.value, contract: row.code, diff --git a/src/reader/reader-helper.ts b/src/reader/reader-helper.ts index a8841dc..fb9df2c 100755 --- a/src/reader/reader-helper.ts +++ b/src/reader/reader-helper.ts @@ -5,6 +5,7 @@ import { ShipTableDeltaName, } from '@blockmatic/eosio-ship-reader' import { rpc } from '../lib/eosio' +import { logger } from '../lib/logger' import { MappingsReader } from '../mappings' import { ChainGraphActionWhitelist, @@ -86,7 +87,14 @@ export const createShipReaderDataHelper = async ( // TODO: load abis to db when contracts are listed, and keep in sync with then chain, listed to set abi actions. abis = new Map() const contracts = whitelistReader.whitelist.map(({ contract }) => contract) - const abisArr = await Promise.all(contracts.map((c) => rpc.v1.chain.get_abi(c))) + const abisArr = await Promise.all(contracts.map((c) => { + logger.info('Getting abi for contract -> ', c) + try { + return rpc.v1.chain.get_abi(c) + } catch (error) { + console.error('Error getting abi', error) + } + })) abisArr.forEach(({ account_name, abi }) => abis.set(account_name, abi as any)) // return latest abis in memory diff --git a/yarn.lock b/yarn.lock index 008dff9..a9ba983 100755 --- a/yarn.lock +++ b/yarn.lock @@ -140,10 +140,10 @@ minimatch "^3.0.4" strip-json-comments "^3.1.1" -"@greymass/eosio@^0.6.4": - version "0.6.4" - resolved "https://registry.yarnpkg.com/@greymass/eosio/-/eosio-0.6.4.tgz#9baf8aadacad69d61be20a53e235a9b4cbd97ffa" - integrity sha512-Wxr46tgh4J5Nm5tdTx5btJ/MZu36bZfxxCXfWRDAMfOn6FFFwgcZpY/sEv5eH9L2rCvxVrZtd2R8vPGsF/amQQ== +"@greymass/eosio@^0.7.0": + version "0.7.0" + resolved "https://registry.yarnpkg.com/@greymass/eosio/-/eosio-0.7.0.tgz#f68fa78112e205ac7e9baab7417c4e5da217f12d" + integrity sha512-eEFwzHPUO+WIIoEMHzaYPXLOklEKIXkjJFQ6iekfYzZ02eupW9RJdk8vr31u36TESVxKnmZ5Xnux613t+FNq2A== dependencies: bn.js "^4.11.9" brorand "^1.1.0" @@ -341,10 +341,10 @@ "@typescript-eslint/types" "4.29.2" eslint-visitor-keys "^2.0.0" -"@wharfkit/antelope@^0.10.0": - version "0.10.0" - resolved "https://registry.yarnpkg.com/@wharfkit/antelope/-/antelope-0.10.0.tgz#73ae86ea9c60e04dbe57f24249524672066d2fd3" - integrity sha512-yga4lbha7XChWikBjgCfjLvQtANOi1NtdECRZ8Z85juMwEXCZ+v8yjMSOcrnKzUzS3/WXxYRkSnA97UE+CXOOQ== +"@wharfkit/antelope@^1.0.7": + version "1.0.7" + resolved "https://registry.yarnpkg.com/@wharfkit/antelope/-/antelope-1.0.7.tgz#5ca010db963e061b2e8c47c14e55f86817718c2e" + integrity sha512-C4DRC4U+qC2XQKUgwznKX6i8xdKo+ZqxkZURrPTtY3v53IvKwj+3amTQQSpuECechS5x6ItcT+/fOFBNlQ2Juw== dependencies: bn.js "^4.11.9" brorand "^1.1.0" From 82dda3fe526923b571c1386676537e9eea4113fe Mon Sep 17 00:00:00 2001 From: Roberto Lucas Date: Thu, 20 Jun 2024 14:58:59 -0600 Subject: [PATCH 5/5] impr: db upsert/del param length verif + shipReader process.exit on sub error --- src/database/index.ts | 47 ++++++++++++++++++++++++---------------- src/indexer/real-time.ts | 6 ++++- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 4aa8e90..582e50b 100755 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -1,4 +1,9 @@ -import { logger } from '../lib/logger' +import { + ChainGraphAction, + ChainGraphBlock, + ChainGraphTableRow, + ChainGraphTransaction, +} from '../types' import { db } from './db' import { createDeleteTableRowsQuery, @@ -7,35 +12,39 @@ import { createUpsertTableRowsQuery, createUpsertTransactionsQuery, } from './queries' -import { - ChainGraphAction, - ChainGraphBlock, - ChainGraphTableRow, - ChainGraphTransaction, -} from '../types' export * from './db' const runQuery = async (query: string) => { - logger.info(query) + // logger.info(query) return db.none(query) } -export const upsertBlocks = async (blocks: ChainGraphBlock[]) => - runQuery(createUpsertBlocksQuery(blocks)) +export const upsertBlocks = async (blocks: ChainGraphBlock[]) => { + if (!blocks.length) return Promise.resolve() + return runQuery(createUpsertBlocksQuery(blocks)) +} -export const upsertTableRows = async (tableRows: ChainGraphTableRow[]) =>{ - console.log('upsertTableRows', tableRows) - runQuery(createUpsertTableRowsQuery(tableRows))} +export const upsertTableRows = async (tableRows: ChainGraphTableRow[]) => { + // console.log('upsertTableRows', tableRows) + if (!tableRows.length) return Promise.resolve() + return runQuery(createUpsertTableRowsQuery(tableRows)) +} -export const deleteTableRows = async (tableRows: ChainGraphTableRow[]) => - runQuery(createDeleteTableRowsQuery(tableRows)) +export const deleteTableRows = async (tableRows: ChainGraphTableRow[]) => { + if (!tableRows.length) return Promise.resolve() + return runQuery(createDeleteTableRowsQuery(tableRows)) +} export const upsertTransactions = async ( transactions: ChainGraphTransaction[], -) => runQuery(createUpsertTransactionsQuery(transactions)) +) => { + if (!transactions.length) return Promise.resolve() + return runQuery(createUpsertTransactionsQuery(transactions)) +} -export const upsertActions = async (actions: ChainGraphAction[]) =>{ - console.log('upsertActions', actions) - runQuery(createUpsertActionsQuery(actions)) +export const upsertActions = async (actions: ChainGraphAction[]) => { + // console.log('upsertActions', actions) + if (!actions.length) return Promise.resolve() + return runQuery(createUpsertActionsQuery(actions)) } \ No newline at end of file diff --git a/src/indexer/real-time.ts b/src/indexer/real-time.ts index ef227ba..96e085d 100755 --- a/src/indexer/real-time.ts +++ b/src/indexer/real-time.ts @@ -225,7 +225,11 @@ export const startRealTimeStreaming = async ( close$.subscribe(() => logger.info('connection closed')) // log$.subscribe(({ message }: any) => logger.info('ShipReader:', message)) - errors$.subscribe((error) => logger.error('ShipReader:', error)) + errors$.subscribe((error) => { + logger.error('ShipReader Connection Error:', { error }) + // send message to discord + process.exit(1) + }) forks$.subscribe((block_num) => { logger.warn(`Microfork on block number : ${block_num}`)