Skip to content

Commit c90f73a

Browse files
authored
feat(webhooks): add postgres persistence to clients and requests (feature branch) (#108)
1 parent 2117af5 commit c90f73a

28 files changed

+648
-305
lines changed

packages/indexer-api/src/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ export async function Main(
8787
const redis = await initializeRedis(redisConfig, logger);
8888
const webhooks = Webhooks.WebhookFactory(
8989
{
90-
requireApiKey: false,
9190
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
91+
enabledWebhookRequestWorkers: false,
9292
},
93-
{ postgres, logger },
93+
{ postgres, logger, redis },
9494
);
9595

9696
const allRouters: Record<string, Router> = {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";
2+
3+
@Entity()
4+
@Unique("UK_webhook_client_api_key", ["apiKey"])
5+
export class WebhookClient {
6+
@PrimaryGeneratedColumn()
7+
id: number;
8+
9+
@Column()
10+
name: string;
11+
12+
@Column()
13+
apiKey: string;
14+
15+
@Column("jsonb")
16+
domains: string[];
17+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import {
2+
Entity,
3+
PrimaryColumn,
4+
Column,
5+
Unique,
6+
CreateDateColumn,
7+
Index,
8+
} from "typeorm";
9+
10+
@Entity()
11+
@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"])
12+
@Index("IX_webhook_request_filter", ["filter"])
13+
export class WebhookRequest {
14+
@PrimaryColumn()
15+
id: string;
16+
17+
@Column({ type: "integer" })
18+
clientId: number;
19+
20+
@Column()
21+
url: string;
22+
23+
@Column()
24+
filter: string;
25+
26+
@CreateDateColumn()
27+
createdAt: Date;
28+
}

packages/indexer-database/src/entities/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,6 @@ export * from "./BundleEvent";
1919
export * from "./BundleBlockRange";
2020
export * from "./RootBundleExecutedJoinTable";
2121
export * from "./RelayHashInfo";
22+
23+
export * from "./WebhookRequest";
24+
export * from "./WebhookClient";

packages/indexer-database/src/main.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => {
3636
entities.RootBundleExecutedJoinTable,
3737
// Others
3838
entities.RelayHashInfo,
39+
// Webhooks
40+
entities.WebhookRequest,
41+
entities.WebhookClient,
3942
],
4043
migrationsTableName: "_migrations",
4144
migrations: ["migrations/*.ts"],
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { MigrationInterface, QueryRunner } from "typeorm";
2+
3+
export class WebhookClient1732297474910 implements MigrationInterface {
4+
name = "WebhookClient1732297474910";
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(`
8+
CREATE TABLE "webhook_client" (
9+
"id" SERIAL NOT NULL,
10+
"name" character varying NOT NULL,
11+
"apiKey" character varying NOT NULL,
12+
"domains" jsonb NOT NULL,
13+
CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"),
14+
CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id")
15+
)`);
16+
}
17+
18+
public async down(queryRunner: QueryRunner): Promise<void> {
19+
await queryRunner.query(`DROP TABLE "webhook_client"`);
20+
}
21+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { MigrationInterface, QueryRunner } from "typeorm";
2+
3+
export class WebhookRequest1732297948190 implements MigrationInterface {
4+
name = "WebhookRequest1732297948190";
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(`
8+
CREATE TABLE "webhook_request" (
9+
"id" character varying NOT NULL,
10+
"clientId" integer NOT NULL,
11+
"url" character varying NOT NULL,
12+
"filter" character varying NOT NULL,
13+
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
14+
CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"),
15+
CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id")
16+
)`);
17+
}
18+
19+
public async down(queryRunner: QueryRunner): Promise<void> {
20+
await queryRunner.query(`DROP TABLE "webhook_request"`);
21+
}
22+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { MigrationInterface, QueryRunner } from "typeorm";
2+
3+
export class WebhookRequest1732310112989 implements MigrationInterface {
4+
name = "WebhookRequest1732310112989";
5+
6+
public async up(queryRunner: QueryRunner): Promise<void> {
7+
await queryRunner.query(
8+
`CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `,
9+
);
10+
}
11+
12+
public async down(queryRunner: QueryRunner): Promise<void> {
13+
await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`);
14+
}
15+
}

packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Logger } from "winston";
22

33
import { DataSource } from "@repo/indexer-database";
4+
import { eventProcessorManager } from "@repo/webhooks";
45

56
import { Config } from "../../parseEnv";
67
import { HubPoolRepository } from "../../database/HubPoolRepository";
@@ -39,6 +40,7 @@ export class AcrossIndexerManager {
3940
private spokePoolRepository: SpokePoolRepository,
4041
private redisCache: RedisCache,
4142
private indexerQueuesService: IndexerQueuesService,
43+
private webhookWriteFn?: eventProcessorManager.WebhookWriteFn,
4244
) {}
4345

4446
public async start() {
@@ -93,7 +95,12 @@ export class AcrossIndexerManager {
9395
this.hubPoolClientFactory,
9496
this.spokePoolClientFactory,
9597
this.spokePoolRepository,
96-
new SpokePoolProcessor(this.postgres, this.logger, chainId),
98+
new SpokePoolProcessor(
99+
this.postgres,
100+
this.logger,
101+
chainId,
102+
this.webhookWriteFn,
103+
),
97104
this.indexerQueuesService,
98105
);
99106
const spokePoolIndexer = new Indexer(

packages/indexer/src/main.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
5757
// Call write to kick off webhook calls
5858
const { write } = WebhookFactory(
5959
{
60-
requireApiKey: false,
6160
enabledWebhooks: [WebhookTypes.DepositStatus],
61+
enabledWebhookRequestWorkers: true,
6262
},
63-
{ postgres, logger },
63+
{ postgres, logger, redis },
6464
);
6565
// Retry providers factory
6666
const retryProvidersFactory = new RetryProvidersFactory(
@@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
9696
new SpokePoolRepository(postgres, logger),
9797
redisCache,
9898
indexerQueuesService,
99+
write,
99100
);
100101
const bundleServicesManager = new BundleServicesManager(
101102
config,

0 commit comments

Comments
 (0)