diff --git a/api/src/osmosis/OsmosisHistoricalProcessor.ts b/api/src/osmosis/OsmosisHistoricalProcessor.ts index b0a6239..6198ca2 100644 --- a/api/src/osmosis/OsmosisHistoricalProcessor.ts +++ b/api/src/osmosis/OsmosisHistoricalProcessor.ts @@ -7,25 +7,34 @@ import { ConfigService } from "@nestjs/config"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { NumiaConfig } from "../config/config.interface.js"; -interface MintBurnEvent { +interface PoolSwapEvent { timestamp: number; - type: string; + sender: string; + side: string; amount: string; - address: string; + txhash: string; } -interface TransferEvent { +interface MintEvent { timestamp: number; - from: string; - to: string; amount: string; + mint_to_address: string; + txhash: string; } -interface PoolSwapEvent { +interface BurnEvent { timestamp: number; - sender: string; - side: string; amount: string; + burn_from_address: string; + txhash: string; +} + +interface TransferEvent { + timestamp: number; + from_address: string; + to_address: string; + amount: string; + txhash: string; } @Processor("osmosis-historical") @@ -38,6 +47,8 @@ export class OsmosisHistoricalProcessor extends WorkerHost { private readonly bigQueryClient: BigQuery; + private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA'; + constructor( config: ConfigService, @@ -76,8 +87,6 @@ export class OsmosisHistoricalProcessor extends WorkerHost { public async triggerFetchHistoricalData() { await this.osmosisQueue.add("fetchHistoricalData", undefined); - // await this.fetchMintBurnEvents(); - // await this.fetchPoolSwapEvents(); } private async fetchHistoricalData() { @@ -110,14 +119,26 @@ export class OsmosisHistoricalProcessor extends WorkerHost { type === "mint" ? event.messages[0].mint_to_address : event.messages[0].burn_from_address; - - await this.insertMintBurnEvent({ timestamp, type, amount, address }); + const txhash = event.hash; if (type === "mint") { + const mintEvent: MintEvent = { + timestamp, + amount, + mint_to_address: address, + txhash + }; + await this.insertMintEvent(mintEvent); // Not a proper DFS, just one hop away from minter - await this.fetchTransfersForAddress( - event.messages[0].mint_to_address, - ); + await this.fetchTransfersForAddress(event.messages[0].mint_to_address); + } else { + const burnEvent: BurnEvent = { + timestamp, + amount, + burn_from_address: address, + txhash + }; + await this.insertBurnEvent(burnEvent); } } } @@ -141,14 +162,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost { if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) { for (const message of event.messages) { if ( - message.amount[0].denom === - "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" + message.amount[0].denom === this.TOKEN_DENOM ) { - const transferEvent = { + const transferEvent: TransferEvent = { timestamp: new Date(event.blockTimestamp).getTime(), - from: message.from_address, - to: message.to_address, + from_address: message.from_address, + to_address: message.to_address, amount: message.amount.amount, + txhash: event.hash, }; await this.insertTransferEvent(transferEvent); } @@ -176,49 +197,60 @@ export class OsmosisHistoricalProcessor extends WorkerHost { const [rows] = await this.bigQueryClient.query(query); for (const row of rows) { - const side = - row.denom_in === - "factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA" - ? "buy" - : "sell"; - const swapEvent = { + const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell"; + const swapEvent: PoolSwapEvent = { timestamp: new Date(row.ingestion_timestamp.value).getTime(), sender: row.sender, side, amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out, + txhash: row.tx_id, }; await this.insertSwapEvent(swapEvent); } } - private async insertMintBurnEvent(event: MintBurnEvent) { - // console.log('Mint/Burn Event:', event); + private async insertMintEvent(event: MintEvent) { + console.log('Mint Event:', JSON.stringify(event, null, 2)); + await this.clickhouseService.client.insert({ + table: "mint_events", + values: { + timestamp: event.timestamp, + amount: event.amount, + mint_to_address: event.mint_to_address, + txhash: event.txhash, + }, + }); + } + + private async insertBurnEvent(event: BurnEvent) { + console.log('Burn Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ - table: "mint_burn_events", + table: "burn_events", values: { timestamp: event.timestamp, - type: event.type, amount: event.amount, - address: event.address, + burn_from_address: event.burn_from_address, + txhash: event.txhash, }, }); } private async insertTransferEvent(event: TransferEvent) { - // console.log('Transfer Event:', event); + console.log('Transfer Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ table: "transfer_events", values: { timestamp: event.timestamp, - from: event.from, - to: event.to, + from_address: event.from_address, + to_address: event.to_address, amount: event.amount, + txhash: event.txhash, }, }); } private async insertSwapEvent(event: PoolSwapEvent) { - // console.log('Pool Swap Event:', event); + console.log('Pool Swap Event:', JSON.stringify(event, null, 2)); await this.clickhouseService.client.insert({ table: "pool_swap_events", values: { @@ -226,6 +258,7 @@ export class OsmosisHistoricalProcessor extends WorkerHost { sender: event.sender, side: event.side, amount: event.amount, + txhash: event.txhash, }, }); }