Skip to content

Commit

Permalink
Improved Full Pipeline (#263)
Browse files Browse the repository at this point in the history
Some type changes to make things a little more smooth.
  • Loading branch information
bh2smith authored May 8, 2023
1 parent 3482278 commit 3daa90c
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 113 deletions.
4 changes: 2 additions & 2 deletions internal_transfers/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export const triggerInternalTransfersPipeline: ActionFn = async (
const transactionEvent = event as TransactionEvent;
const db = getDB(await context.secrets.get("DATABASE_URL"));
const simulator = new TenderlySimulator(
await context.secrets.get("TENDERLY_USER"),
await context.secrets.get("TENDERLY_PROJECT"),
"gp-v2",
"solver-slippage",
await context.secrets.get("TENDERLY_ACCESS_KEY")
);
await internalizedTokenImbalance(transactionEvent, db, simulator);
Expand Down
69 changes: 54 additions & 15 deletions internal_transfers/actions/src/accounting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,67 @@ export async function simulateSolverSolution(
return null;
}

const commonSimulationParams = {
contractAddress: SETTLEMENT_CONTRACT_ADDRESS,
sender: solverAddress,
value: "0",
blockNumber: competition.simulationBlock,
};
const simFull = await simulator.simulate({
...commonSimulationParams,
callData: competition.fullCallData,
});
const simReduced = await simulator.simulate({
...commonSimulationParams,
callData: competition.reducedCallData,
const { full, reduced } = await simulateBoth(simulator, {
full: competition.fullCallData,
reduced: competition.reducedCallData,
common: {
contractAddress: SETTLEMENT_CONTRACT_ADDRESS,
sender: solverAddress,
value: "0",
blockNumber: competition.simulationBlock,
},
});

return {
txHash: transaction.hash,
winningSettlement: competition,
full: simFull,
reduced: simReduced,
full,
reduced,
};
}

interface commonSimulationParams {
contractAddress: string;
sender: string;
value: string;
blockNumber: number;
}

interface SettlementSimulationParams {
full: string;
reduced: string;
common: commonSimulationParams;
}

interface SimulationPair {
full: SimulationData;
reduced: SimulationData;
}
async function simulateBoth(
simulator: TransactionSimulator,
params: SettlementSimulationParams,
numAttempts: number = 2
): Promise<SimulationPair> {
let attempts = 0;
while (attempts < numAttempts) {
try {
return {
full: await simulator.simulate({
...params.common,
callData: params.full,
}),
reduced: await simulator.simulate({
...params.common,
callData: params.reduced,
}),
};
} catch (error) {
attempts += 1;
}
}
throw new Error(`failed simulations after ${numAttempts} attempts`);
}

export function getInternalizedImbalance(
simulationData: SettlementSimulationData
): TokenImbalance[] {
Expand Down
13 changes: 12 additions & 1 deletion internal_transfers/actions/src/database.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Code Reference: https://www.atdatabases.org/docs/pg-guide-typescript

import createConnectionPool, { sql } from "@databases/pg";
import createConnectionPool, { Queryable, sql } from "@databases/pg";
import tables from "@databases/pg-typed";
import ConnectionPool from "@databases/pg/lib/types/Queryable";
import DatabaseSchema from "./__generated__";
Expand All @@ -20,6 +20,17 @@ function getDB(dbURL: string): ConnectionPool {
bigIntMode: "bigint",
});
}

export async function recordExists(
db: Queryable,
txHash: string
): Promise<boolean> {
const pgHash = txHash.replace("0x", "\\x");
const query = sql`SELECT count(*) from settlements where tx_hash = ${pgHash};`;
const { count: numRecords } = (await db.query(query))[0];
return numRecords > 0;
}

async function insertSettlementEvent(
db: ConnectionPool,
eventMeta: EventMeta,
Expand Down
57 changes: 23 additions & 34 deletions internal_transfers/actions/src/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import { partitionEventLogs } from "./parse";
import { insertPipelineResults, insertSettlementEvent } from "./database";
import {
insertPipelineResults,
insertSettlementEvent,
recordExists,
} from "./database";
import {
getInternalizedImbalance,
MinimalTxData,
simulateSolverSolution,
} from "./accounting";
import { Queryable, sql } from "@databases/pg";
import { Queryable } from "@databases/pg";
import { TransactionSimulator } from "./simulate/interface";

async function recordExists(db: Queryable, txHash: string): Promise<boolean> {
const pgHash = txHash.replace("0x", "\\x");
const query = sql`SELECT count(*) from settlements where tx_hash = ${pgHash};`;
const { count: numRecords } = (await db.query(query))[0];
return numRecords > 0;
}

export async function internalizedTokenImbalance(
txData: MinimalTxData,
db: Queryable,
Expand All @@ -31,33 +28,25 @@ export async function internalizedTokenImbalance(
// There are other events being returned here, but we only need the settlement(s)
const { settlements } = partitionEventLogs(txData.logs);

if (settlements.length > 1) {
console.warn(`Two settlements in same batch ${txHash}!`);
// TODO - alert team that such a batch has taken place!
// cf https://github.com/cowprotocol/solver-rewards/issues/187
}

// It's annoying to have to handle the possibility of multiple settlements
// in the same transaction, but it could happen.
for (const settlement of settlements) {
try {
const settlementSimulations = await simulateSolverSolution(
txData,
simulator
);
const eventMeta = { txHash, blockNumber: txData.blockNumber };
const settlementEvent = settlement;
if (settlementSimulations) {
// If there is a simulation, get imbalances otherwise assume none.
await insertPipelineResults(db, {
settlementSimulations,
imbalances: getInternalizedImbalance(settlementSimulations),
eventMeta,
settlementEvent,
});
} else {
await insertSettlementEvent(db, eventMeta, settlementEvent);
}
} catch (error) {}
const settlementSimulations = await simulateSolverSolution(
txData,
simulator
);
const eventMeta = { txHash, blockNumber: txData.blockNumber };
const settlementEvent = settlement;
if (settlementSimulations) {
// If there is a simulation, get imbalances otherwise assume none.
await insertPipelineResults(db, {
settlementSimulations,
imbalances: getInternalizedImbalance(settlementSimulations),
eventMeta,
settlementEvent,
});
} else {
await insertSettlementEvent(db, eventMeta, settlementEvent);
}
}
}
13 changes: 13 additions & 0 deletions internal_transfers/actions/tests/database.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
insertTokenImbalances,
jsonFromSettlementData,
insertPipelineResults,
recordExists,
} from "../src/database";
import * as process from "process";
import { sql } from "@databases/pg";
Expand Down Expand Up @@ -163,6 +164,18 @@ describe("All Database Tests", () => {
'duplicate key value violates unique constraint "settlement_simulations_pkey"'
);
});
test("recordExists(txHash) accurately performs its job", async () => {
const txHash =
"0x45f52ee09622eac16d0fe27b90a76749019b599c9566f10e21e8d0955a0e428e";

expect(await recordExists(db, txHash)).toEqual(false);
await insertSettlementEvent(
db,
{ txHash: txHash, blockNumber: 0 },
{ solver: "0xc9ec550bea1c64d779124b23a26292cc223327b6", logIndex: 0 }
);
expect(await recordExists(db, txHash)).toEqual(true);
});
});

describe("insertPipelineResults", () => {
Expand Down
10 changes: 10 additions & 0 deletions internal_transfers/actions/tests/e2e/helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { MinimalTxData } from "../../src/accounting";
import { ethers } from "ethers";
import { getTxDataFromHash } from "../../src/utils";

export async function getTxData(txHash: string): Promise<MinimalTxData> {
const provider = ethers.getDefaultProvider(
process.env["NODE_URL"] || "NODE_URL"
);
return getTxDataFromHash(provider, txHash);
}
63 changes: 12 additions & 51 deletions internal_transfers/actions/tests/e2e/pipeline.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TenderlySimulator } from "../../src/simulate/tenderly";
import { internalizedTokenImbalance } from "../../src/pipeline";
import { getDB } from "../../src/database";
import { getTxData } from "./helper";

const { TENDERLY_USER, TENDERLY_PROJECT, TENDERLY_ACCESS_KEY } = process.env;

Expand All @@ -12,64 +13,24 @@ const simulator = new TenderlySimulator(

const db = getDB("postgresql://postgres:postgres@localhost:5432/postgres");

describe("Run Full Pipeline", () => {
describe.skip("Run Full Pipeline", () => {
test("run pipeline on notInternalized transaction", async () => {
// TODO - unavailable (i.e. null response from orderbook):
// choose very old transaction -- pre July 2022
const notInternalized = {
blockNumber: 15182101,
from: "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
hash: "0x0f86c06d9ace6a88644db6b654a904aa62c82305023e094ce49650467c91bd6e",
logs: [
{
address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
data: "0x",
topics: [
"0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
"0x000000000000000000000000e9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
],
},
],
};
const notInternalized = await getTxData(
"0x0f86c06d9ace6a88644db6b654a904aa62c82305023e094ce49650467c91bd6e"
);
await internalizedTokenImbalance(notInternalized, db, simulator);
}, 300000);
test("run pipeline on batch with internalized transfers", async () => {
const internalized = {
blockNumber: 16310552,
from: "0x97EC0A17432D71A3234EF7173C6B48A2C0940896",
hash: "0xDCD5CF12340B50ACC04DBE7E14A903BE373456C81E4DB20DD84CF0301F6AB869",
logs: [
{
address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
data: "0x",
topics: [
"0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
"0x00000000000000000000000097EC0A17432D71A3234EF7173C6B48A2C0940896",
],
},
],
};
const internalized = await getTxData(
"0xDCD5CF12340B50ACC04DBE7E14A903BE373456C81E4DB20DD84CF0301F6AB869"
);
await internalizedTokenImbalance(internalized, db, simulator);
}, 300000);

test("run pipeline on transaction with unavailable competition data", async () => {
// TODO - unavailable (i.e. null response from orderbook):
// choose very old transaction -- pre July 2022
// const unavailable = {
// blockNumber: 15182101,
// from: "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
// hash: "0x0f86c06d9ace6a88644db6b654a904aa62c82305023e094ce49650467c91bd6e",
// logs: [
// {
// address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
// data: "0x",
// topics: [
// "0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
// "0x000000000000000000000000e9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
// ],
// },
// ],
// };
// await internalizedTokenImbalance(unavailable, db, simulator);
const unavailable = await getTxData(
"0xe6a0fbad3f9571e7614dbbc1d65d523cbeb6929b59bd20cde80ac791899fccfb"
);
await internalizedTokenImbalance(unavailable, db, simulator);
}, 300000);
});
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import {
getInternalizedImbalance,
MinimalTxData,
simulateSolverSolution,
} from "../../src/accounting";
import { TenderlySimulator } from "../../src/simulate/tenderly";
import { getTxDataFromHash } from "../../src/utils";
import { ethers } from "ethers";
import { getTxData } from "./helper";

const simulator = new TenderlySimulator(
process.env["TENDERLY_USER"] || "INVALID_USER",
process.env["TENDERLY_PROJECT"] || "TENDERLY_PROJECT",
process.env["TENDERLY_ACCESS_KEY"] || "TENDERLY_ACCESS_KEY"
);

async function getTxData(txHash: string): Promise<MinimalTxData> {
const provider = ethers.getDefaultProvider(
process.env["NODE_URL"] || "NODE_URL"
);
return getTxDataFromHash(provider, txHash);
}
describe.skip("simulateSolverSolution(transaction, simulator)", () => {
test("throws when no competition found", async () => {
const txHash =
Expand Down

0 comments on commit 3daa90c

Please sign in to comment.