Skip to content

Commit

Permalink
Implement logic for full pipeline and update tenderly action function…
Browse files Browse the repository at this point in the history
… to use it.
  • Loading branch information
bh2smith committed May 1, 2023
1 parent 82ba9ed commit 3482278
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 25 deletions.
34 changes: 9 additions & 25 deletions internal_transfers/actions/index.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,19 @@
import { ActionFn, Context, Event, TransactionEvent } from "@tenderly/actions";
import { partitionEventLogs } from "./src/parse";
import { getDB, insertSettlementEvent } from "./src/database";
import { getDB } from "./src/database";
import { TenderlySimulator } from "./src/simulate/tenderly";
import { internalizedTokenImbalance } from "./src/pipeline";

export const triggerInternalTransfersPipeline: ActionFn = async (
context: Context,
event: Event
) => {
// TODO - https://github.com/cowprotocol/solver-rewards/issues/219
const transactionEvent = event as TransactionEvent;
const txHash = transactionEvent.hash;
console.log(`Received Settlement Event with txHash ${txHash}`);

const { trades, transfers, settlements } = partitionEventLogs(
transactionEvent.logs
);
if (settlements.length > 1) {
console.warn(`Two settlements in same batch ${txHash}!`);
// TODO - alert team that such a batch has taken place!
// cf https://github.com/cowprotocol/solver-rewards/issues/187
}
console.log(`Parsed ${transfers.length} (relevant) transfer events`);
console.log(`Parsed ${trades.length} trade events`);

const dbUrl = await context.secrets.get("DATABASE_URL");
await Promise.all(
settlements.map(async (settlement) => {
await insertSettlementEvent(
getDB(dbUrl),
{ txHash: txHash, blockNumber: transactionEvent.blockNumber },
settlement
);
})
const db = getDB(await context.secrets.get("DATABASE_URL"));
const simulator = new TenderlySimulator(
await context.secrets.get("TENDERLY_USER"),
await context.secrets.get("TENDERLY_PROJECT"),
await context.secrets.get("TENDERLY_ACCESS_KEY")
);
await internalizedTokenImbalance(transactionEvent, db, simulator);
};
63 changes: 63 additions & 0 deletions internal_transfers/actions/src/pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { partitionEventLogs } from "./parse";
import { insertPipelineResults, insertSettlementEvent } from "./database";
import {
getInternalizedImbalance,
MinimalTxData,
simulateSolverSolution,
} from "./accounting";
import { Queryable, sql } from "@databases/pg";
import { TransactionSimulator } from "./simulate/interface";

async function recordExists(db: Queryable, txHash: string): Promise<boolean> {
const pgHash = txHash.replace("0x", "\\x");
const query = sql`SELECT count(*) from settlements where tx_hash = ${pgHash};`;
const { count: numRecords } = (await db.query(query))[0];
return numRecords > 0;
}

export async function internalizedTokenImbalance(
txData: MinimalTxData,
db: Queryable,
simulator: TransactionSimulator
): Promise<void> {
const txHash = txData.hash;
console.log(`processing settlement transaction with hash: ${txData.hash}`);
// Duplication Guard!
if (await recordExists(db, txHash)) {
console.warn(`record exists for tx: ${txHash}`);
return;
}

// There are other events being returned here, but we only need the settlement(s)
const { settlements } = partitionEventLogs(txData.logs);

if (settlements.length > 1) {
console.warn(`Two settlements in same batch ${txHash}!`);
// TODO - alert team that such a batch has taken place!
// cf https://github.com/cowprotocol/solver-rewards/issues/187
}

// It's annoying to have to handle the possibility of multiple settlements
// in the same transaction, but it could happen.
for (const settlement of settlements) {
try {
const settlementSimulations = await simulateSolverSolution(
txData,
simulator
);
const eventMeta = { txHash, blockNumber: txData.blockNumber };
const settlementEvent = settlement;
if (settlementSimulations) {
// If there is a simulation, get imbalances otherwise assume none.
await insertPipelineResults(db, {
settlementSimulations,
imbalances: getInternalizedImbalance(settlementSimulations),
eventMeta,
settlementEvent,
});
} else {
await insertSettlementEvent(db, eventMeta, settlementEvent);
}
} catch (error) {}
}
}
75 changes: 75 additions & 0 deletions internal_transfers/actions/tests/e2e/pipeline.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { TenderlySimulator } from "../../src/simulate/tenderly";
import { internalizedTokenImbalance } from "../../src/pipeline";
import { getDB } from "../../src/database";

const { TENDERLY_USER, TENDERLY_PROJECT, TENDERLY_ACCESS_KEY } = process.env;

const simulator = new TenderlySimulator(
TENDERLY_USER || "INVALID_USER",
TENDERLY_PROJECT || "TENDERLY_PROJECT",
TENDERLY_ACCESS_KEY || "TENDERLY_ACCESS_KEY"
);

const db = getDB("postgresql://postgres:postgres@localhost:5432/postgres");

describe("Run Full Pipeline", () => {
test("run pipeline on notInternalized transaction", async () => {
// TODO - unavailable (i.e. null response from orderbook):
// choose very old transaction -- pre July 2022
const notInternalized = {
blockNumber: 15182101,
from: "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
hash: "0x0f86c06d9ace6a88644db6b654a904aa62c82305023e094ce49650467c91bd6e",
logs: [
{
address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
data: "0x",
topics: [
"0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
"0x000000000000000000000000e9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
],
},
],
};
await internalizedTokenImbalance(notInternalized, db, simulator);
}, 300000);
test("run pipeline on batch with internalized transfers", async () => {
const internalized = {
blockNumber: 16310552,
from: "0x97EC0A17432D71A3234EF7173C6B48A2C0940896",
hash: "0xDCD5CF12340B50ACC04DBE7E14A903BE373456C81E4DB20DD84CF0301F6AB869",
logs: [
{
address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
data: "0x",
topics: [
"0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
"0x00000000000000000000000097EC0A17432D71A3234EF7173C6B48A2C0940896",
],
},
],
};
await internalizedTokenImbalance(internalized, db, simulator);
}, 300000);

test("run pipeline on transaction with unavailable competition data", async () => {
// TODO - unavailable (i.e. null response from orderbook):
// choose very old transaction -- pre July 2022
// const unavailable = {
// blockNumber: 15182101,
// from: "0xe9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
// hash: "0x0f86c06d9ace6a88644db6b654a904aa62c82305023e094ce49650467c91bd6e",
// logs: [
// {
// address: "0x9008d19f58aabd9ed0d60971565aa8510560ab41",
// data: "0x",
// topics: [
// "0x40338ce1a7c49204f0099533b1e9a7ee0a3d261f84974ab7af36105b8c4e9db4",
// "0x000000000000000000000000e9ae2d792f981c53ea7f6493a17abf5b2a45a86b",
// ],
// },
// ],
// };
// await internalizedTokenImbalance(unavailable, db, simulator);
}, 300000);
});

0 comments on commit 3482278

Please sign in to comment.