From 9537a0404eecf8e750e6295c8ec4e54ff7abfbbf Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Thu, 11 May 2023 13:18:02 +0200 Subject: [PATCH] Insert Settlement & Mark Processed (#276) Part of #177 Doing a TODO to mark data as process when simulation is inserted. This happens in both cases of the pipeline (when there is an is not a simulation to go with it). ## Test Plan Every component is tested individually. Could also add a test fot `insertSettlementAndMarkProcessed` although it just runs both, tested methods, together. --- internal_transfers/actions/src/database.ts | 16 ++++- internal_transfers/actions/src/pipeline.ts | 6 +- .../actions/tests/database.spec.ts | 63 ++++++++++++++++--- 3 files changed, 71 insertions(+), 14 deletions(-) diff --git a/internal_transfers/actions/src/database.ts b/internal_transfers/actions/src/database.ts index 5341e9b1..6cbd6863 100644 --- a/internal_transfers/actions/src/database.ts +++ b/internal_transfers/actions/src/database.ts @@ -132,6 +132,18 @@ export interface SlippagePipelineResults { eventMeta: EventMeta; settlementEvent: SettlementEvent; } + +export async function insertSettlementAndMarkProcessed( + db: ConnectionPool, + eventMeta: EventMeta, + settlementEvent: SettlementEvent +) { + await db.tx(async (db) => { + await insertSettlementEvent(db, eventMeta, settlementEvent); + await markReceiptProcessed(db, eventMeta.txHash); + }); +} + export async function insertPipelineResults( db: ConnectionPool, pipelineResults: SlippagePipelineResults @@ -141,9 +153,7 @@ export async function insertPipelineResults( await db.tx(async (db) => { await insertTokenImbalances(db, eventMeta.txHash, imbalances); await insertSettlementSimulations(db, settlementSimulations); - await insertSettlementEvent(db, eventMeta, settlementEvent); - // TODO - markReceiptProcessed in follow up PR. - // await markReceiptProcessed(db, eventMeta.txHash); + await insertSettlementAndMarkProcessed(db, eventMeta, settlementEvent); }); console.log(`wrote ${imbalances.length} imbalances for ${eventMeta.txHash}`); } diff --git a/internal_transfers/actions/src/pipeline.ts b/internal_transfers/actions/src/pipeline.ts index 9383e812..7f8db934 100644 --- a/internal_transfers/actions/src/pipeline.ts +++ b/internal_transfers/actions/src/pipeline.ts @@ -2,7 +2,7 @@ import { partitionEventLogs } from "./parse"; import { getUnprocessedReceipts, insertPipelineResults, - insertSettlementEvent, + insertSettlementAndMarkProcessed, insertTxReceipt, recordExists, } from "./database"; @@ -63,7 +63,7 @@ export async function internalizedTokenImbalance( console.log(`processing settlement transaction with hash: ${txData.hash}`); // Duplication Guard! if (await recordExists(db, txHash)) { - console.warn(`record exists for tx: ${txHash}`); + console.warn(`event record exists for tx: ${txHash}`); return; } @@ -88,7 +88,7 @@ export async function internalizedTokenImbalance( settlementEvent, }); } else { - await insertSettlementEvent(db, eventMeta, settlementEvent); + await insertSettlementAndMarkProcessed(db, eventMeta, settlementEvent); } } } diff --git a/internal_transfers/actions/tests/database.spec.ts b/internal_transfers/actions/tests/database.spec.ts index 1fb25112..83468598 100644 --- a/internal_transfers/actions/tests/database.spec.ts +++ b/internal_transfers/actions/tests/database.spec.ts @@ -11,6 +11,7 @@ import { insertTxReceipt, getUnprocessedReceipts, markReceiptProcessed, + insertSettlementAndMarkProcessed, } from "../src/database"; import * as process from "process"; import { sql } from "@databases/pg"; @@ -25,16 +26,19 @@ const dbURL: string = process.env["DATABASE_URL"] || "postgresql://postgres:postgres@localhost:5432/postgres"; const db = getDB(dbURL); +async function truncateTables() { + await db.query(sql`TRUNCATE TABLE settlements;`); + await db.query(sql`TRUNCATE TABLE internalized_imbalances;`); + await db.query(sql`TRUNCATE TABLE settlement_simulations;`); + await db.query(sql`TRUNCATE TABLE tx_receipts;`); +} const largeBigInt = 115792089237316195423570985008687907853269984665640564039457584007913129639935n; const tinyBigInt = 1n; describe("All Database Tests", () => { beforeEach(async () => { - await db.query(sql`TRUNCATE TABLE settlements;`); - await db.query(sql`TRUNCATE TABLE internalized_imbalances;`); - await db.query(sql`TRUNCATE TABLE settlement_simulations;`); - await db.query(sql`TRUNCATE TABLE tx_receipts;`); + await truncateTables(); }); afterAll(async () => { @@ -219,7 +223,7 @@ describe("All Database Tests", () => { const twoResults = await getUnprocessedReceipts(db, -1); expect(twoResults).toEqual(receipts); }); - test("markReceiptProcessed(hash)", async () => { + test("markReceiptProcessed works when exists", async () => { const receipt = { logs: [], blockNumber: 0, @@ -238,13 +242,56 @@ describe("All Database Tests", () => { }, ]); }); + test("markReceiptProcessed does nothing when hash doesn't exist", async () => { + await expect( + markReceiptProcessed(db, "0x01") + ).resolves.not.toThrowError(); + }); + + test("insertSettlementAndMarkProcessed works together", async () => { + const hash = "0x"; + const solver = "0x50"; + const receipt = { + logs: [], + blockNumber: 1, + hash, + from: solver, + }; + await insertTxReceipt(db, receipt); + await insertSettlementAndMarkProcessed( + db, + { txHash: hash, blockNumber: 0 }, + { solver: solver, logIndex: 0 } + ); + + expect(await db.query(sql`SELECT * from tx_receipts;`)).toEqual([ + { + block_number: 1n, + data: { + blockNumber: 1, + from: "0x50", + hash: "0x", + logs: [], + }, + hash: hexToBytea(hash), + // This is the key point (processed = true) + processed: true, + }, + ]); + expect(await db.query(sql`SELECT * from settlements;`)).toStrictEqual([ + { + block_number: 0n, + log_index: 0n, + solver: hexToBytea(solver), + tx_hash: hexToBytea(hash), + }, + ]); + }); }); describe("insertPipelineResults", () => { beforeEach(async () => { - await db.query(sql`TRUNCATE TABLE settlements;`); - await db.query(sql`TRUNCATE TABLE internalized_imbalances;`); - await db.query(sql`TRUNCATE TABLE settlement_simulations;`); + await truncateTables(); }); function getTestData() {