Skip to content

Commit

Permalink
Insert Settlement & Mark Processed (#276)
Browse files Browse the repository at this point in the history
Part of #177 

Doing a TODO to mark data as process when simulation is inserted. This
happens in both cases of the pipeline (when there is an is not a
simulation to go with it).

## Test Plan

Every component is tested individually. Could also add a test fot
`insertSettlementAndMarkProcessed` although it just runs both, tested
methods, together.
  • Loading branch information
bh2smith authored May 11, 2023
1 parent e69a942 commit 9537a04
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 14 deletions.
16 changes: 13 additions & 3 deletions internal_transfers/actions/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ export interface SlippagePipelineResults {
eventMeta: EventMeta;
settlementEvent: SettlementEvent;
}

export async function insertSettlementAndMarkProcessed(
db: ConnectionPool,
eventMeta: EventMeta,
settlementEvent: SettlementEvent
) {
await db.tx(async (db) => {
await insertSettlementEvent(db, eventMeta, settlementEvent);
await markReceiptProcessed(db, eventMeta.txHash);
});
}

export async function insertPipelineResults(
db: ConnectionPool,
pipelineResults: SlippagePipelineResults
Expand All @@ -141,9 +153,7 @@ export async function insertPipelineResults(
await db.tx(async (db) => {
await insertTokenImbalances(db, eventMeta.txHash, imbalances);
await insertSettlementSimulations(db, settlementSimulations);
await insertSettlementEvent(db, eventMeta, settlementEvent);
// TODO - markReceiptProcessed in follow up PR.
// await markReceiptProcessed(db, eventMeta.txHash);
await insertSettlementAndMarkProcessed(db, eventMeta, settlementEvent);
});
console.log(`wrote ${imbalances.length} imbalances for ${eventMeta.txHash}`);
}
Expand Down
6 changes: 3 additions & 3 deletions internal_transfers/actions/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { partitionEventLogs } from "./parse";
import {
getUnprocessedReceipts,
insertPipelineResults,
insertSettlementEvent,
insertSettlementAndMarkProcessed,
insertTxReceipt,
recordExists,
} from "./database";
Expand Down Expand Up @@ -63,7 +63,7 @@ export async function internalizedTokenImbalance(
console.log(`processing settlement transaction with hash: ${txData.hash}`);
// Duplication Guard!
if (await recordExists(db, txHash)) {
console.warn(`record exists for tx: ${txHash}`);
console.warn(`event record exists for tx: ${txHash}`);
return;
}

Expand All @@ -88,7 +88,7 @@ export async function internalizedTokenImbalance(
settlementEvent,
});
} else {
await insertSettlementEvent(db, eventMeta, settlementEvent);
await insertSettlementAndMarkProcessed(db, eventMeta, settlementEvent);
}
}
}
63 changes: 55 additions & 8 deletions internal_transfers/actions/tests/database.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
insertTxReceipt,
getUnprocessedReceipts,
markReceiptProcessed,
insertSettlementAndMarkProcessed,
} from "../src/database";
import * as process from "process";
import { sql } from "@databases/pg";
Expand All @@ -25,16 +26,19 @@ const dbURL: string =
process.env["DATABASE_URL"] ||
"postgresql://postgres:postgres@localhost:5432/postgres";
const db = getDB(dbURL);
async function truncateTables() {
await db.query(sql`TRUNCATE TABLE settlements;`);
await db.query(sql`TRUNCATE TABLE internalized_imbalances;`);
await db.query(sql`TRUNCATE TABLE settlement_simulations;`);
await db.query(sql`TRUNCATE TABLE tx_receipts;`);
}

const largeBigInt =
115792089237316195423570985008687907853269984665640564039457584007913129639935n;
const tinyBigInt = 1n;
describe("All Database Tests", () => {
beforeEach(async () => {
await db.query(sql`TRUNCATE TABLE settlements;`);
await db.query(sql`TRUNCATE TABLE internalized_imbalances;`);
await db.query(sql`TRUNCATE TABLE settlement_simulations;`);
await db.query(sql`TRUNCATE TABLE tx_receipts;`);
await truncateTables();
});

afterAll(async () => {
Expand Down Expand Up @@ -219,7 +223,7 @@ describe("All Database Tests", () => {
const twoResults = await getUnprocessedReceipts(db, -1);
expect(twoResults).toEqual(receipts);
});
test("markReceiptProcessed(hash)", async () => {
test("markReceiptProcessed works when exists", async () => {
const receipt = {
logs: [],
blockNumber: 0,
Expand All @@ -238,13 +242,56 @@ describe("All Database Tests", () => {
},
]);
});
test("markReceiptProcessed does nothing when hash doesn't exist", async () => {
await expect(
markReceiptProcessed(db, "0x01")
).resolves.not.toThrowError();
});

test("insertSettlementAndMarkProcessed works together", async () => {
const hash = "0x";
const solver = "0x50";
const receipt = {
logs: [],
blockNumber: 1,
hash,
from: solver,
};
await insertTxReceipt(db, receipt);
await insertSettlementAndMarkProcessed(
db,
{ txHash: hash, blockNumber: 0 },
{ solver: solver, logIndex: 0 }
);

expect(await db.query(sql`SELECT * from tx_receipts;`)).toEqual([
{
block_number: 1n,
data: {
blockNumber: 1,
from: "0x50",
hash: "0x",
logs: [],
},
hash: hexToBytea(hash),
// This is the key point (processed = true)
processed: true,
},
]);
expect(await db.query(sql`SELECT * from settlements;`)).toStrictEqual([
{
block_number: 0n,
log_index: 0n,
solver: hexToBytea(solver),
tx_hash: hexToBytea(hash),
},
]);
});
});

describe("insertPipelineResults", () => {
beforeEach(async () => {
await db.query(sql`TRUNCATE TABLE settlements;`);
await db.query(sql`TRUNCATE TABLE internalized_imbalances;`);
await db.query(sql`TRUNCATE TABLE settlement_simulations;`);
await truncateTables();
});

function getTestData() {
Expand Down

0 comments on commit 9537a04

Please sign in to comment.