|
1 | | -import * as fs from 'fs'; |
2 | 1 | import { |
3 | 2 | ChainhookEventObserver, |
4 | 3 | ChainhookNodeOptions, |
| 4 | + EventObserverOptions, |
| 5 | + EventObserverPredicate, |
5 | 6 | Payload, |
6 | | - ServerOptions, |
7 | | - ServerPredicate, |
8 | 7 | StacksPayload, |
9 | 8 | } from '@hirosystems/chainhook-client'; |
10 | 9 | import { PgStore } from '../pg/pg-store'; |
11 | 10 | import { ENV } from '../env'; |
12 | 11 | import { logger } from '@hirosystems/api-toolkit'; |
13 | | -import { randomUUID } from 'node:crypto'; |
14 | | - |
15 | | -export function getPersistedPredicateFromDisk(): ServerPredicate | undefined { |
16 | | - const predicatePath = `${ENV.CHAINHOOK_PREDICATE_PATH}/predicate.json`; |
17 | | - try { |
18 | | - if (!fs.existsSync(predicatePath)) { |
19 | | - return; |
20 | | - } |
21 | | - const fileData = fs.readFileSync(predicatePath, 'utf-8'); |
22 | | - return JSON.parse(fileData) as ServerPredicate; |
23 | | - } catch (error) { |
24 | | - logger.error(error, `ChainhookServer unable to get persisted predicate`); |
25 | | - } |
26 | | -} |
27 | | - |
28 | | -export function persistPredicateToDisk(predicate: ServerPredicate) { |
29 | | - const predicatePath = `${ENV.CHAINHOOK_PREDICATE_PATH}/predicate.json`; |
30 | | - try { |
31 | | - fs.mkdirSync(ENV.CHAINHOOK_PREDICATE_PATH, { recursive: true }); |
32 | | - fs.writeFileSync(predicatePath, JSON.stringify(predicate, null, 2)); |
33 | | - } catch (error) { |
34 | | - logger.error(error, `ChainhookServer unable to persist predicate to disk`); |
35 | | - } |
36 | | -} |
37 | 12 |
|
38 | 13 | export async function startChainhookServer(args: { db: PgStore }): Promise<ChainhookEventObserver> { |
39 | 14 | const blockHeight = await args.db.getChainTipBlockHeight(); |
40 | 15 | logger.info(`ChainhookServer is at block ${blockHeight}`); |
41 | 16 |
|
42 | | - const predicates: ServerPredicate[] = []; |
| 17 | + const predicates: EventObserverPredicate[] = []; |
43 | 18 | if (ENV.CHAINHOOK_AUTO_PREDICATE_REGISTRATION) { |
44 | | - const existingPredicate = getPersistedPredicateFromDisk(); |
45 | | - if (existingPredicate) { |
46 | | - logger.info( |
47 | | - `ChainhookServer will attempt to resume existing predicate ${existingPredicate.uuid}` |
48 | | - ); |
49 | | - } |
50 | 19 | const header = { |
51 | | - uuid: existingPredicate?.uuid ?? randomUUID(), |
52 | | - name: 'block', |
| 20 | + name: 'metadata-api-blocks', |
53 | 21 | version: 1, |
54 | 22 | chain: 'stacks', |
55 | 23 | }; |
@@ -87,38 +55,33 @@ export async function startChainhookServer(args: { db: PgStore }): Promise<Chain |
87 | 55 | } |
88 | 56 | } |
89 | 57 |
|
90 | | - const opts: ServerOptions = { |
| 58 | + const observer: EventObserverOptions = { |
91 | 59 | hostname: ENV.API_HOST, |
92 | 60 | port: ENV.EVENT_PORT, |
93 | 61 | auth_token: ENV.CHAINHOOK_NODE_AUTH_TOKEN, |
94 | 62 | external_base_url: `http://${ENV.EXTERNAL_HOSTNAME}`, |
95 | 63 | wait_for_chainhook_node: ENV.CHAINHOOK_AUTO_PREDICATE_REGISTRATION, |
96 | 64 | validate_chainhook_payloads: false, |
97 | 65 | body_limit: ENV.EVENT_SERVER_BODY_LIMIT, |
| 66 | + predicate_disk_file_path: ENV.CHAINHOOK_PREDICATE_PATH, |
| 67 | + predicate_health_check_interval_ms: 300_000, |
98 | 68 | node_type: 'chainhook', |
99 | 69 | }; |
100 | 70 | const chainhook: ChainhookNodeOptions = { |
101 | 71 | base_url: `http://${ENV.CHAINHOOK_NODE_RPC_HOST}:${ENV.CHAINHOOK_NODE_RPC_PORT}`, |
102 | 72 | }; |
103 | | - const server = new ChainhookEventObserver(opts, chainhook); |
104 | | - await server.start(predicates, async (uuid: string, payload: Payload) => { |
| 73 | + const server = new ChainhookEventObserver(observer, chainhook); |
| 74 | + await server.start(predicates, async (payload: Payload) => { |
105 | 75 | logger.info( |
106 | 76 | `ChainhookServer received ${ |
107 | 77 | payload.chainhook.is_streaming_blocks ? 'streamed' : 'replay' |
108 | | - } payload from predicate ${uuid}` |
| 78 | + } payload from predicate ${payload.chainhook.uuid}` |
109 | 79 | ); |
110 | 80 | await args.db.chainhook.processPayload(payload as StacksPayload); |
111 | 81 | }); |
112 | | - if (predicates.length) persistPredicateToDisk(predicates[0]); |
113 | 82 | return server; |
114 | 83 | } |
115 | 84 |
|
116 | 85 | export async function closeChainhookServer(server: ChainhookEventObserver) { |
117 | | - try { |
118 | | - const predicatePath = `${ENV.CHAINHOOK_PREDICATE_PATH}/predicate.json`; |
119 | | - if (fs.existsSync(predicatePath)) fs.rmSync(predicatePath); |
120 | | - } catch (error) { |
121 | | - logger.error(error, `ChainhookServer unable to delete persisted predicate`); |
122 | | - } |
123 | 86 | await server.close(); |
124 | 87 | } |
0 commit comments