Merge pull request #6 from oraichain/feat/oraidex-info
Feat/oraidex info
ducphamle2 authored Jul 24, 2023
2 parents 61aa552 + c5f73e4 commit 266dc6c
Showing 9 changed files with 571 additions and 156 deletions.
16 changes: 9 additions & 7 deletions packages/oraidex-sync/src/db.ts
@@ -10,7 +10,7 @@ import {
WithdrawLiquidityOperationData
} from "./types";
import fs, { rename } from "fs";
import { renameKey, replaceAllNonAlphaBetChar } from "./helper";
import { isoToTimestampNumber, renameKey, replaceAllNonAlphaBetChar, toObject } from "./helper";

export class DuckDb {
protected constructor(public readonly conn: Connection, private db: Database) {}
@@ -32,10 +32,8 @@ export class DuckDb {
if (data.length === 0) return;
const tableFile = fileName ?? `${tableName}.json`;
// the file written out is temporary only. Will be deleted after insertion
await fs.promises.writeFile(tableFile, JSON.stringify(data));
const query = replace
? `INSERT OR REPLACE INTO ${tableName} SELECT * FROM read_json_auto(?)`
: `INSERT INTO ${tableName} SELECT * FROM read_json_auto(?)`;
await fs.promises.writeFile(tableFile, JSON.stringify(toObject(data)));
const query = `INSERT OR REPLACE INTO ${tableName} SELECT * FROM read_json_auto(?)`;
await this.conn.run(query, tableFile);
await fs.promises.unlink(tableFile);
}
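Context for the query change: combined with the uniqueKey VARCHAR UNIQUE columns added further down in this file, switching unconditionally to INSERT OR REPLACE makes re-ingesting an already-stored operation overwrite the existing row rather than duplicate it. A minimal sketch of that behaviour (table and values are illustrative only, not part of the diff):

// Illustrative only: ingesting the same uniqueKey twice leaves a single row behind.
const ddl = `CREATE TABLE IF NOT EXISTS swap_ops_data (uniqueKey VARCHAR UNIQUE, returnAmount UBIGINT)`;
const upsert = `INSERT OR REPLACE INTO swap_ops_data SELECT * FROM read_json_auto(?)`;
// await conn.run(ddl);
// await conn.run(upsert, "ops.json"); // first ingest writes the rows
// await conn.run(upsert, "ops.json"); // replaying the same file replaces rows on uniqueKey, no duplicates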
@@ -62,6 +60,7 @@ export class DuckDb {
commissionAmount UBIGINT,
offerAmount UBIGINT,
offerDenom VARCHAR,
uniqueKey VARCHAR UNIQUE,
returnAmount UBIGINT,
spreadAmount UBIGINT,
taxAmount UBIGINT,
@@ -89,6 +88,7 @@ export class DuckDb {
firstTokenDenom VARCHAR,
firstTokenLp UBIGINT,
opType LPOPTYPE,
uniqueKey VARCHAR UNIQUE,
secondTokenAmount UBIGINT,
secondTokenDenom VARCHAR,
secondTokenLp UBIGINT,
@@ -228,7 +228,8 @@ export class DuckDb {

async queryLatestTimestampSwapOps(): Promise<number> {
const latestTimestamp = await this.conn.all("SELECT timestamp from swap_ops_data order by timestamp desc limit 1");
if (latestTimestamp.length === 0 || !latestTimestamp[0].timestamp) return new Date().getTime() / 1000; // fallback case
if (latestTimestamp.length === 0 || !latestTimestamp[0].timestamp)
return isoToTimestampNumber(new Date().toISOString()); // fallback case
return latestTimestamp[0].timestamp as number;
}

@@ -272,7 +273,7 @@ export class DuckDb {
);
// reset time to iso format after dividing in the query
result.forEach((item) => {
item.time = parseInt((item.time * tf).toFixed(0));
item.time = item.time * tf;
});
return result as TotalLiquidity[];
}
@@ -283,6 +284,7 @@
denom VARCHAR,
timestamp UINTEGER,
txheight UINTEGER,
price DOUBLE,
volume UBIGINT)
`
);
124 changes: 120 additions & 4 deletions packages/oraidex-sync/src/helper.ts
@@ -1,7 +1,24 @@
import { Asset, AssetInfo, OraiswapRouterReadOnlyInterface, SwapOperation } from "@oraichain/oraidex-contracts-sdk";
import { AssetInfo, SwapOperation } from "@oraichain/oraidex-contracts-sdk";
import { pairs } from "./pairs";
import { ORAI, atomic, tenAmountInDecimalSix, truncDecimals, usdcCw20Address, usdtCw20Address } from "./constants";
import { PairInfoData, PairMapping, PrefixSumHandlingData } from "./types";
import { ORAI, atomic, tenAmountInDecimalSix, truncDecimals, usdtCw20Address } from "./constants";
import {
OraiDexType,
PairInfoData,
PrefixSumHandlingData,
ProvideLiquidityOperationData,
SwapOperationData,
WithdrawLiquidityOperationData
} from "./types";
import { PoolResponse } from "@oraichain/oraidex-contracts-sdk/build/OraiswapPair.types";

export function toObject(data: any[]) {
return JSON.parse(
JSON.stringify(
data,
(key, value) => (typeof value === "bigint" ? value.toString() : value) // return everything else unchanged
)
);
}
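A quick usage note with hypothetical values: JSON.stringify throws a TypeError when it meets a bigint, which is why db.ts now wraps the rows in toObject before writing the temporary JSON file that read_json_auto ingests.

const op = { uniqueKey: "1690000000-orai-1000000-usdt-2500000", offerAmount: 1000000n };
// JSON.stringify([op])            -> TypeError: Do not know how to serialize a BigInt
// JSON.stringify(toObject([op]))  -> '[{"uniqueKey":"1690000000-orai-1000000-usdt-2500000","offerAmount":"1000000"}]'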

export const validateNumber = (amount: number | string): number => {
if (typeof amount === "string") return validateNumber(Number(amount));
@@ -45,8 +62,18 @@ export const toDisplay = (amount: string | bigint, sourceDecimals = 6, desDecima
return Number(returnAmount) / (displayDecimals === truncDecimals ? atomic : 10 ** displayDecimals);
};

export function concatDataToUniqueKey(data: {
firstDenom: string;
secondDenom: string;
firstAmount: number;
secondAmount: number;
timestamp: number;
}): string {
return `${data.timestamp}-${data.firstDenom}-${data.firstAmount}-${data.secondDenom}-${data.secondAmount}`;
}
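For example (hypothetical values), a swap of 1000000 orai for 2500000 usdt observed at timestamp 1690000000 produces:

concatDataToUniqueKey({
  timestamp: 1690000000,
  firstDenom: "orai",
  firstAmount: 1000000,
  secondDenom: "usdt",
  secondAmount: 2500000
});
// => "1690000000-orai-1000000-usdt-2500000"

This is the value stored in the new uniqueKey VARCHAR UNIQUE columns in db.ts, which is what lets INSERT OR REPLACE deduplicate replayed operations.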

export function isoToTimestampNumber(time: string) {
return new Date(time).getTime() / 1000;
return Math.floor(new Date(time).getTime() / 1000);
}

export function renameKey(object: Object, oldKey: string, newKey: string): any {
@@ -145,6 +172,95 @@ function calculatePriceByPool(offerPool: bigint, askPool: bigint, commissionRate
return (askPool - (offerPool * askPool) / (offerPool + BigInt(tenAmountInDecimalSix))) * BigInt(1 - commissionRate);
}

export function groupByTime(data: any[], timeframe?: number): any[] {
let ops: { [k: number]: any[] } = {};
for (const op of data) {
const roundedTime = roundTime(op.timestamp * 1000, timeframe || 60);
if (!ops[roundedTime]) {
ops[roundedTime] = [];
}
const newData: OraiDexType = {
...op,
timestamp: roundedTime
};
ops[roundedTime].push(newData);
}

return Object.values(ops).flat();
}

/**
* round time when dividing & getting the integral part of the value
* @param timeIn time to be divided in ms
* @param timeframe the timeframe to split the time chunk. in seconds
* @returns new time in seconds
*/
export function roundTime(timeIn: number, timeframe: number): number {
const roundTo = timeframe * 1000;

const dateOut = (Math.floor(timeIn / roundTo) * roundTo) / 1000; // getTime() returns data in ms
return dateOut;
}
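A worked example with the default 60-second timeframe (hypothetical timestamps): two operations 40 seconds apart fall into the same bucket, and groupByTime rewrites both timestamps to the bucket start.

// roundTime takes milliseconds in and returns seconds out:
roundTime(1690000085 * 1000, 60); // => 1690000080
roundTime(1690000125 * 1000, 60); // => 1690000080
// groupByTime stamps each op with its bucket:
groupByTime([
  { uniqueKey: "a", timestamp: 1690000085 },
  { uniqueKey: "b", timestamp: 1690000125 }
]);
// => both entries come back with timestamp 1690000080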

export function isAssetInfoPairReverse(assetInfos: AssetInfo[]): boolean {
if (pairs.find((pair) => JSON.stringify(pair.asset_infos) === JSON.stringify(assetInfos.reverse()))) return true;
return false;
}
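One caveat for callers: Array.prototype.reverse() mutates its receiver, so the assetInfos argument is left reversed after this check. If that side effect is ever unwanted, a non-mutating variant could copy first (a sketch with a hypothetical name, not what this commit ships):

export function isAssetInfoPairReverseSafe(assetInfos: AssetInfo[]): boolean {
  const reversed = JSON.stringify([...assetInfos].reverse()); // spread-copy, then reverse the copy
  return pairs.some((pair) => JSON.stringify(pair.asset_infos) === reversed);
}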

/**
* This function will accumulate the lp amount and modify the parameter
* @param data - lp ops. This param will be mutated.
* @param poolInfos - pool info data for initial lp accumulation
*/
export function collectAccumulateLpData(
data: ProvideLiquidityOperationData[] | WithdrawLiquidityOperationData[],
poolInfos: PoolResponse[]
) {
let accumulateData = {};
for (let op of data) {
const pool = poolInfos.find(
(info) =>
info.assets.some((assetInfo) => parseAssetInfoOnlyDenom(assetInfo.info) === op.firstTokenDenom) &&
info.assets.some((assetInfo) => parseAssetInfoOnlyDenom(assetInfo.info) === op.secondTokenDenom)
);
if (!pool) continue;
if (op.opType === "withdraw") {
op.firstTokenLp = BigInt(op.firstTokenLp) - BigInt(op.firstTokenLp) * 2n;
op.secondTokenLp = BigInt(op.secondTokenLp) - BigInt(op.secondTokenLp) * 2n;
}
const denom = `${op.firstTokenDenom} - ${op.secondTokenDenom}`;
if (!accumulateData[denom]) {
const initialFirstTokenAmount = parseInt(
pool.assets.find((info) => parseAssetInfoOnlyDenom(info.info) === op.firstTokenDenom).amount
);
const initialSecondTokenAmount = parseInt(
pool.assets.find((info) => parseAssetInfoOnlyDenom(info.info) === op.secondTokenDenom).amount
);
accumulateData[denom] = {
firstTokenAmount: BigInt(initialFirstTokenAmount) + BigInt(op.firstTokenLp),
secondTokenAmount: BigInt(initialSecondTokenAmount) + BigInt(op.secondTokenLp)
};
op.firstTokenLp = accumulateData[denom].firstTokenAmount;
op.secondTokenLp = accumulateData[denom].secondTokenAmount;
continue;
}
accumulateData[denom].firstTokenAmount += BigInt(op.firstTokenLp);
accumulateData[denom].secondTokenAmount += BigInt(op.secondTokenLp);
op.firstTokenLp = accumulateData[denom].firstTokenAmount;
op.secondTokenLp = accumulateData[denom].secondTokenAmount;
}

// convert bigint to number so we can store them into the db without error
}
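A short walk-through with hypothetical numbers, since the in-place mutation is easy to misread: withdrawals are negated (x - 2x = -x), and each op's firstTokenLp/secondTokenLp is overwritten with a running total per pair that starts from the on-chain amounts in poolInfos.

// Hypothetical orai/usdt pair whose pool currently holds 1_000n / 2_000n:
//   provide  op with firstTokenLp = 100n, secondTokenLp = 200n -> rewritten to 1_100n / 2_200n
//   withdraw op with firstTokenLp =  50n, secondTokenLp = 100n -> negated to -50n / -100n,
//                                                                 then rewritten to 1_050n / 2_100n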

export function removeOpsDuplication(ops: OraiDexType[]): OraiDexType[] {
let newOps: OraiDexType[] = [];
for (let op of ops) {
if (!newOps.some((newOp) => newOp.uniqueKey === op.uniqueKey)) newOps.push(op);
}
return newOps;
}
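The .some() scan above makes the dedup quadratic in the batch size; for large chunks, a Set keyed on uniqueKey (assuming uniqueKey is always a string here) does the same first-occurrence-wins dedup in linear time. An alternative sketch, not part of the commit:

export function removeOpsDuplicationWithSet(ops: OraiDexType[]): OraiDexType[] {
  const seen = new Set<string>();
  return ops.filter((op) => {
    if (seen.has(op.uniqueKey)) return false; // drop later duplicates
    seen.add(op.uniqueKey);
    return true; // keep the first occurrence of each uniqueKey
  });
}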

// /**
// *
// * @param infos
73 changes: 55 additions & 18 deletions packages/oraidex-sync/src/index.ts
@@ -18,15 +18,18 @@ import {
WithdrawLiquidityOperationData,
InitialData,
PairInfoData,
Env
Env,
VolumeInfo,
PrefixSumHandlingData
} from "./types";
import { MulticallQueryClient } from "@oraichain/common-contracts-sdk";
import { PoolResponse } from "@oraichain/oraidex-contracts-sdk/build/OraiswapPair.types";
import { getAllPairInfos, getPoolInfos, simulateSwapPriceWithUsdt } from "./query";
import { calculatePrefixSum, collectAccumulateLpData, parseAssetInfoOnlyDenom } from "./helper";

class WriteOrders extends WriteData {
private firstWrite: boolean;
constructor(private duckDb: DuckDb, private initialData: InitialData) {
constructor(private duckDb: DuckDb, private rpcUrl: string, private env: Env, private initialData: InitialData) {
super();
this.firstWrite = true;
}
@@ -54,6 +57,45 @@ class WriteOrders extends WriteData {
return this.duckDb.queryLpOps() as Promise<ProvideLiquidityOperationData[] | WithdrawLiquidityOperationData[]>;
}

private async getPoolInfos(pairAddrs: string[], wantedHeight?: number): Promise<PoolResponse[]> {
// adjust the query height to get data from the past
const cosmwasmClient = await CosmWasmClient.connect(this.rpcUrl);
cosmwasmClient.setQueryClientWithHeight(wantedHeight);
const multicall = new MulticallQueryClient(
cosmwasmClient,
this.env.MULTICALL_CONTRACT_ADDRESS || "orai1q7x644gmf7h8u8y6y8t9z9nnwl8djkmspypr6mxavsk9ual7dj0sxpmgwd"
);
const res = await getPoolInfos(pairAddrs, multicall);
// reset query client to latest for other functions to call
return res;
}

private async accumulatePoolAmount(data: ProvideLiquidityOperationData[] | WithdrawLiquidityOperationData[]) {
if (data.length === 0) return; // guard: if there's no data then we won't process anything
const pairInfos = await this.duckDb.queryPairInfos();
const poolInfos = await this.getPoolInfos(
pairInfos.map((pair) => pair.pairAddr),
data[0].txheight // assume data is sorted by height and timestamp
);
collectAccumulateLpData(data, poolInfos);
}
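As the comment above notes, the baseline pool snapshot is queried at data[0].txheight, which assumes the batch arrives sorted by height. A defensive variant could pin the query to the lowest height in the batch instead (a sketch with a hypothetical helper, not part of the commit):

// Derive the baseline height without relying on input ordering.
function baselineHeightOf(ops: { txheight: number }[]): number {
  return Math.min(...ops.map((op) => op.txheight));
}
// const poolInfos = await this.getPoolInfos(pairAddrs, baselineHeightOf(data));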

// private insertVolumeInfos(
// ...data: { denom: string; timestamp: number; txheight: number; amount: number }[]
// ): VolumeInfo[] {
// let volumeInfos: VolumeInfo[] = [];
// data.forEach((op) => {
// volumeInfos.push({
// denom: op.denom,
// timestamp: op.timestamp,
// txheight: op.txheight,
// volume: op.amount,
// price: 1
// });
// });
// return volumeInfos;
// }

async process(chunk: any): Promise<boolean> {
try {
// // first time calling of the application then we query past data and be ready to store them into the db for prefix sum
@@ -75,13 +117,21 @@ class WriteOrders extends WriteData {
// this.firstWrite = false;
// }
const { txs, offset: newOffset } = chunk as Txs;
const currentOffset = await this.duckDb.loadHeightSnapshot();
// edge case. If no new block has been found, then we skip processing to prevent duplication handling
if (currentOffset === newOffset) return true;
let result = parseTxs(txs);

// accumulate liquidity pool amount
await this.accumulatePoolAmount([...result.provideLiquidityOpsData, ...result.withdrawLiquidityOpsData]);
// process volume infos to insert price
// result.volumeInfos = insertVolumeInfos(result.swapOpsData);

// collect the latest offer & ask volume to accumulate the results
// insert txs
console.log("new offset: ", newOffset);
await this.duckDb.insertHeightSnapshot(newOffset);
await this.insertParsedTxs(result);
// has to be Promise.all because if inserting the height passes and the txs fail, we will have duplications
await Promise.all([this.duckDb.insertHeightSnapshot(newOffset), this.insertParsedTxs(result)]);

const lpOps = await this.queryLpOps();
const swapOpsCount = await this.duckDb.querySwapOps();
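Taken together with the db.ts changes, replays are now guarded at two layers: the offset check above skips a chunk that was already fully processed, and the uniqueKey upsert absorbs any rows that do get re-inserted. An illustrative replay scenario (heights are hypothetical):

// tick 1: loadHeightSnapshot() => 100, chunk offset => 105 -> parse, insert rows, snapshot 105
// tick 2 (same chunk redelivered): loadHeightSnapshot() => 105 === newOffset -> return early, nothing re-inserted
// if the snapshot write on tick 1 had failed, tick 2 would reprocess the chunk, but the re-inserted
// rows hit INSERT OR REPLACE on uniqueKey, so no duplicates accumulate in the tables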
@@ -108,19 +158,6 @@ class OraiDexSync {
return new OraiDexSync(duckDb, rpcUrl, cosmwasmClient, env);
}

private async getPoolInfos(pairs: PairInfo[], wantedHeight?: number): Promise<PoolResponse[]> {
// adjust the query height to get data from the past
this.cosmwasmClient.setQueryClientWithHeight(wantedHeight);
const multicall = new MulticallQueryClient(
this.cosmwasmClient,
this.env.MULTICALL_CONTRACT_ADDRESS || "orai1q7x644gmf7h8u8y6y8t9z9nnwl8djkmspypr6mxavsk9ual7dj0sxpmgwd"
);
const res = await getPoolInfos(pairs, multicall);
// reset query client to latest for other functions to call
this.cosmwasmClient.setQueryClientWithHeight();
return res;
}

private async getAllPairInfos(): Promise<PairInfo[]> {
const firstFactoryClient = new OraiswapFactoryQueryClient(
this.cosmwasmClient,
@@ -195,7 +232,7 @@ class OraiDexSync {
limit: parseInt(process.env.LIMIT) || 100,
maxThreadLevel: parseInt(process.env.MAX_THREAD_LEVEL) || 3,
interval: 5000
}).pipe(new WriteOrders(this.duckDb, initialData));
}).pipe(new WriteOrders(this.duckDb, this.rpcUrl, this.env, initialData));
} catch (error) {
console.log("error in start: ", error);
}
6 changes: 3 additions & 3 deletions packages/oraidex-sync/src/query.ts
@@ -11,12 +11,12 @@ import { pairs } from "./pairs";
import { findAssetInfoPathToUsdt, generateSwapOperations, parseAssetInfoOnlyDenom, toDisplay } from "./helper";
import { tenAmountInDecimalSix, usdtCw20Address } from "./constants";

async function getPoolInfos(pairs: PairInfo[], multicall: MulticallReadOnlyInterface): Promise<PoolResponse[]> {
async function getPoolInfos(pairAddrs: string[], multicall: MulticallReadOnlyInterface): Promise<PoolResponse[]> {
// adjust the query height to get data from the past
const res = await multicall.tryAggregate({
queries: pairs.map((pair) => {
queries: pairAddrs.map((pair) => {
return {
address: pair.contract_addr,
address: pair,
data: toBinary({
pool: {}
})
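The new signature means callers pass raw pair contract addresses instead of whole PairInfo objects, which is what lets index.ts reuse this helper with pair addresses loaded from the database. A before/after usage sketch (illustrative):

// Before: required the full PairInfo objects from the factory
// const pools = await getPoolInfos(pairInfos, multicall);
// After: any list of pair contract addresses works
// const pools = await getPoolInfos(pairInfos.map((pair) => pair.contract_addr), multicall);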