Skip to content

Commit

Permalink
Matched models between live and history data
Browse files Browse the repository at this point in the history
  • Loading branch information
hemulin committed May 30, 2024
1 parent 7b452fa commit fc07273
Showing 1 changed file with 69 additions and 36 deletions.
105 changes: 69 additions & 36 deletions api/src/osmosis/OsmosisHistoricalProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,34 @@ import { ConfigService } from "@nestjs/config";
import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
import { NumiaConfig } from "../config/config.interface.js";

interface MintBurnEvent {
interface PoolSwapEvent {
timestamp: number;
type: string;
sender: string;
side: string;
amount: string;
address: string;
txhash: string;
}

interface TransferEvent {
interface MintEvent {
timestamp: number;
from: string;
to: string;
amount: string;
mint_to_address: string;
txhash: string;
}

interface PoolSwapEvent {
interface BurnEvent {
timestamp: number;
sender: string;
side: string;
amount: string;
burn_from_address: string;
txhash: string;
}

interface TransferEvent {
timestamp: number;
from_address: string;
to_address: string;
amount: string;
txhash: string;
}

@Processor("osmosis-historical")
Expand All @@ -38,6 +47,8 @@ export class OsmosisHistoricalProcessor extends WorkerHost {

private readonly bigQueryClient: BigQuery;

private readonly TOKEN_DENOM = 'factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA';

constructor(
config: ConfigService,

Expand Down Expand Up @@ -76,8 +87,6 @@ export class OsmosisHistoricalProcessor extends WorkerHost {

public async triggerFetchHistoricalData() {
await this.osmosisQueue.add("fetchHistoricalData", undefined);
// await this.fetchMintBurnEvents();
// await this.fetchPoolSwapEvents();
}

private async fetchHistoricalData() {
Expand Down Expand Up @@ -110,14 +119,26 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
type === "mint"
? event.messages[0].mint_to_address
: event.messages[0].burn_from_address;

await this.insertMintBurnEvent({ timestamp, type, amount, address });
const txhash = event.hash;

if (type === "mint") {
const mintEvent: MintEvent = {
timestamp,
amount,
mint_to_address: address,
txhash
};
await this.insertMintEvent(mintEvent);
// Not a proper DFS, just one hop away from minter
await this.fetchTransfersForAddress(
event.messages[0].mint_to_address,
);
await this.fetchTransfersForAddress(event.messages[0].mint_to_address);
} else {
const burnEvent: BurnEvent = {
timestamp,
amount,
burn_from_address: address,
txhash
};
await this.insertBurnEvent(burnEvent);
}
}
}
Expand All @@ -141,14 +162,14 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
if (event.messageTypes.includes("/cosmos.bank.v1beta1.MsgSend")) {
for (const message of event.messages) {
if (
message.amount[0].denom ===
"factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"
message.amount[0].denom === this.TOKEN_DENOM
) {
const transferEvent = {
const transferEvent: TransferEvent = {
timestamp: new Date(event.blockTimestamp).getTime(),
from: message.from_address,
to: message.to_address,
from_address: message.from_address,
to_address: message.to_address,
amount: message.amount.amount,
txhash: event.hash,
};
await this.insertTransferEvent(transferEvent);
}
Expand Down Expand Up @@ -176,56 +197,68 @@ export class OsmosisHistoricalProcessor extends WorkerHost {
const [rows] = await this.bigQueryClient.query(query);

for (const row of rows) {
const side =
row.denom_in ===
"factory/osmo19hdqma2mj0vnmgcxag6ytswjnr8a3y07q7e70p/wLIBRA"
? "buy"
: "sell";
const swapEvent = {
const side = row.denom_in === this.TOKEN_DENOM ? "buy" : "sell";
const swapEvent: PoolSwapEvent = {
timestamp: new Date(row.ingestion_timestamp.value).getTime(),
sender: row.sender,
side,
amount: side === "buy" ? row.parsed_amount_in : row.parsed_amount_out,
txhash: row.tx_id,
};
await this.insertSwapEvent(swapEvent);
}
}

private async insertMintBurnEvent(event: MintBurnEvent) {
// console.log('Mint/Burn Event:', event);
private async insertMintEvent(event: MintEvent) {
console.log('Mint Event:', JSON.stringify(event, null, 2));
await this.clickhouseService.client.insert({
table: "mint_events",
values: {
timestamp: event.timestamp,
amount: event.amount,
mint_to_address: event.mint_to_address,
txhash: event.txhash,
},
});
}

private async insertBurnEvent(event: BurnEvent) {
console.log('Burn Event:', JSON.stringify(event, null, 2));
await this.clickhouseService.client.insert({
table: "mint_burn_events",
table: "burn_events",
values: {
timestamp: event.timestamp,
type: event.type,
amount: event.amount,
address: event.address,
burn_from_address: event.burn_from_address,
txhash: event.txhash,
},
});
}

private async insertTransferEvent(event: TransferEvent) {
// console.log('Transfer Event:', event);
console.log('Transfer Event:', JSON.stringify(event, null, 2));
await this.clickhouseService.client.insert({
table: "transfer_events",
values: {
timestamp: event.timestamp,
from: event.from,
to: event.to,
from_address: event.from_address,
to_address: event.to_address,
amount: event.amount,
txhash: event.txhash,
},
});
}

private async insertSwapEvent(event: PoolSwapEvent) {
// console.log('Pool Swap Event:', event);
console.log('Pool Swap Event:', JSON.stringify(event, null, 2));
await this.clickhouseService.client.insert({
table: "pool_swap_events",
values: {
timestamp: event.timestamp,
sender: event.sender,
side: event.side,
amount: event.amount,
txhash: event.txhash,
},
});
}
Expand Down

0 comments on commit fc07273

Please sign in to comment.