Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event based history tables, dbs and importer #20

Merged
merged 5 commits into from
Apr 15, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: re-added task module to app module, added correct filter in impo…
…rter v2
  • Loading branch information
tmrdlt committed Apr 4, 2024

Verified

This commit was signed with the committer’s verified signature.
AmorfEvo Imre Hajagos
commit 02723cab0b438b65399734e8d75f222efebd5bab
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ CREATE TABLE "PairLiquidityInfoHistoryV2Error" (
"id" SERIAL NOT NULL,
"pairId" INTEGER NOT NULL,
"microBlockHash" TEXT NOT NULL,
"logIndex" TEXT NOT NULL,
"logIndex" INTEGER NOT NULL,
"error" TEXT NOT NULL,
"timesOccurred" INTEGER NOT NULL DEFAULT 1,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -35,7 +35,7 @@ CREATE TABLE "PairLiquidityInfoHistoryV2Error" (
);

-- CreateIndex
CREATE UNIQUE INDEX "PairLiquidityInfoHistoryV2_pairId_microBlockHash_key" ON "PairLiquidityInfoHistoryV2"("pairId", "microBlockHash");
CREATE UNIQUE INDEX "PairLiquidityInfoHistoryV2_pairId_microBlockHash_logIndex_key" ON "PairLiquidityInfoHistoryV2"("pairId", "microBlockHash", "logIndex");

-- CreateIndex
CREATE UNIQUE INDEX "PairLiquidityInfoHistoryV2Error_pairId_microBlockHash_logIn_key" ON "PairLiquidityInfoHistoryV2Error"("pairId", "microBlockHash", "logIndex", "error");
4 changes: 2 additions & 2 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
@@ -95,15 +95,15 @@ model PairLiquidityInfoHistoryV2 {
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt

@@unique(name: "pairIdMicroBlockHashUniqueIndex", [pairId, microBlockHash])
@@unique(name: "pairIdMicroBlockHashLogIndexUniqueIndex", [pairId, microBlockHash, logIndex])
}

model PairLiquidityInfoHistoryV2Error {
id Int @id @default(autoincrement())
pair Pair @relation(fields: [pairId], references: [id])
pairId Int
microBlockHash String
logIndex String
logIndex Int
error String
timesOccurred Int @default(1)
createdAt DateTime @default(now())
3 changes: 2 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
@@ -7,9 +7,10 @@ import { ClientsModule } from './clients/clients.module';
import { ApiModule } from './api/api.module';
import { PairSyncService } from './tasks/pair-sync/pair-sync.service';
import { MdwWsClientService } from './clients/mdw-ws-client.service';
import { TasksModule } from './tasks/tasks.module';

@Module({
imports: [ApiModule, ClientsModule, DatabaseModule],
imports: [ApiModule, ClientsModule, DatabaseModule, TasksModule],
controllers: [AppController],
providers: [MdwWsClientService, PairsService, TokensService, PairSyncService],
})
5 changes: 3 additions & 2 deletions src/clients/clients.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Module } from '@nestjs/common';
import { MdwHttpClientService } from './mdw-http-client.service';
import { SdkClientService } from './sdk-client.service';
import { MdwWsClientService } from './mdw-ws-client.service';

@Module({
providers: [MdwHttpClientService, MdwHttpClientService, SdkClientService],
exports: [MdwHttpClientService, MdwHttpClientService, SdkClientService],
providers: [MdwHttpClientService, MdwWsClientService, SdkClientService],
exports: [MdwHttpClientService, MdwWsClientService, SdkClientService],
})
export class ClientsModule {}
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ export class PairLiquidityInfoHistoryV2ErrorDbService {
getErrorByPairIdAndMicroBlockHashWithinHours(
pairId: number,
microBlockHash: string,
logIndex: string,
logIndex: number,
withinHours: number,
): Promise<PairLiquidityInfoHistoryV2Error | null> {
return this.prisma.pairLiquidityInfoHistoryV2Error.findFirst({
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ export class PairLiquidityInfoHistoryDbService {
microBlockHash: data.microBlockHash,
},
},
update: {},
update: data,
create: data,
});
}
Original file line number Diff line number Diff line change
@@ -11,12 +11,13 @@ export class PairLiquidityInfoHistoryV2DbService {
) {
return this.prisma.pairLiquidityInfoHistoryV2.upsert({
where: {
pairIdMicroBlockHashUniqueIndex: {
pairIdMicroBlockHashLogIndexUniqueIndex: {
pairId: data.pairId,
microBlockHash: data.microBlockHash,
logIndex: data.logIndex,
},
},
update: {},
update: data,
create: data,
});
}
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ export class PairLiquidityInfoHistoryImporterV2Service {
await this.pairLiquidityInfoHistoryErrorDb.getErrorByPairIdAndMicroBlockHashWithinHours(
pairWithTokens.id,
'',
'',
-1,
this.WITHIN_HOURS_TO_SKIP_IF_ERROR,
);
if (error) {
@@ -66,6 +66,7 @@ export class PairLiquidityInfoHistoryImporterV2Service {
}
const lastSyncedHeight = lastSyncedLog?.height || 0;
const lastSyncedBlockTime = lastSyncedLog?.microBlockTime || 0n;
const lastSyncedLogIndex = lastSyncedLog?.logIndex || -1;

// Determine which micro blocks to sync based on the lastly synced block
// Strategy:
@@ -75,31 +76,39 @@ export class PairLiquidityInfoHistoryImporterV2Service {

// To make sure we get all desired micro blocks, fetch all contract log pages
// until the page contains a non-desired micro block
const fetchContractLogsFilter = (contractLog: ContractLog) =>
const fetchContractLogsLimit = (contractLog: ContractLog) =>
isHistoryOutdated
? BigInt(contractLog.block_time) <= lastSyncedBlockTime
? BigInt(contractLog.block_time) < lastSyncedBlockTime
: parseInt(contractLog.height) < currentHeight - 10;

const contractLogsToInsertFilter = (contractLog: ContractLog) =>
isHistoryOutdated
? (BigInt(contractLog.block_time) === lastSyncedBlockTime &&
parseInt(contractLog.log_idx) > lastSyncedLogIndex) ||
BigInt(contractLog.block_time) > lastSyncedBlockTime
: parseInt(contractLog.height) >= currentHeight - 10;

const pairContractLogs =
await this.mdwClient.getContractLogsUntilCondition(
fetchContractLogsFilter,
fetchContractLogsLimit,
pairWithTokens.address as ContractAddress,
);
const logsToInsert = orderBy(
pairContractLogs.filter(contractLogsToInsertFilter),
['block_time', 'log_idx'],
['asc', 'asc'],
);

let numUpserted = 0;
// Fetch and insert liquidity (totalSupply, reserve0, reserve1) for every micro block
for (const log of orderBy(
pairContractLogs,
['microBlockTime', 'logIndex'],
['asc', 'asc'],
)) {
for (const log of logsToInsert) {
try {
// If an error occurred for this block recently, skip block
const error =
await this.pairLiquidityInfoHistoryErrorDb.getErrorByPairIdAndMicroBlockHashWithinHours(
pairWithTokens.id,
log.block_hash,
log.log_idx,
parseInt(log.log_idx),
this.WITHIN_HOURS_TO_SKIP_IF_ERROR,
);
if (error) {
@@ -121,7 +130,7 @@ export class PairLiquidityInfoHistoryImporterV2Service {
.upsert({
pairId: pairWithTokens.id,
eventType: 'parsedEventType', // TODO change
logIndex: 0,
logIndex: parseInt(log.log_idx),
height: parseInt(log.height),
microBlockHash: log.block_hash,
microBlockTime: BigInt(log.block_time),
@@ -140,7 +149,7 @@ export class PairLiquidityInfoHistoryImporterV2Service {
const errorData = {
pairId: pairWithTokens.id,
microBlockHash: log.block_hash,
logIndex: log.log_idx,
logIndex: parseInt(log.log_idx),
error: error.toString(),
};
this.logger.error(`Skipped log. ${JSON.stringify(errorData)}`);
@@ -150,14 +159,14 @@ export class PairLiquidityInfoHistoryImporterV2Service {

if (numUpserted > 0) {
this.logger.log(
`Completed sync for pair ${pairWithTokens.id} ${pairWithTokens.address}. Synced ${numUpserted} micro block(s).`,
`Completed sync for pair ${pairWithTokens.id} ${pairWithTokens.address}. Synced ${numUpserted} log(s).`,
);
}
} catch (error) {
const errorData = {
pairId: pairWithTokens.id,
microBlockHash: '',
logIndex: '',
logIndex: -1,
error: error.toString(),
};
this.logger.error(`Skipped pair. ${JSON.stringify(errorData)}`);
@@ -193,7 +202,7 @@ export class PairLiquidityInfoHistoryImporterV2Service {
})
.then(() =>
this.logger.log(
`Inserted initial liquidity for pair ${pairWithTokens.address}.`,
`Inserted initial liquidity for pair ${pairWithTokens.id} ${pairWithTokens.address}.`,
),
);
}
4 changes: 2 additions & 2 deletions src/tasks/tasks.service.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PairLiquidityInfoHistoryImporterService } from './pair-liquidity-info-history-importer/pair-liquidity-info-history-importer.service';
import { PairLiquidityInfoHistoryValidatorService } from './pair-liquidity-info-history-validator/pair-liquidity-info-history-validator.service';
import { PairLiquidityInfoHistoryImporterV2Service } from './pair-liquidity-info-history-importer/pair-liquidity-info-history-importer-v2.service';

const EVERY_5_MINUTES_STARTING_AT_02_30 = '30 2-57/5 * * * *';

@Injectable()
export class TasksService {
constructor(
private pairLiquidityInfoHistoryImporterService: PairLiquidityInfoHistoryImporterService,
private pairLiquidityInfoHistoryImporterService: PairLiquidityInfoHistoryImporterV2Service,
private pairLiquidityInfoHistoryValidatorService: PairLiquidityInfoHistoryValidatorService,
) {}

Loading