-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathexample-replicator.js
327 lines (283 loc) · 9.33 KB
/
example-replicator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
const {
Message,
isCastAddMessage,
fromFarcasterTime,
} = require("@farcaster/hub-nodejs");
const { Casts, Messages } = require("./models/farcaster");
/**
* Helper function to convert byte arrays to hex strings
*/
function bytesToHex(bytes) {
if (bytes === undefined) return undefined;
if (bytes === null) return null;
return `0x${Buffer.from(bytes).toString("hex")}`;
}
/**
* Helper function to convert Farcaster timestamp to JavaScript Date
*/
function farcasterTimeToDate(time) {
if (time === undefined) return undefined;
if (time === null) return null;
const result = fromFarcasterTime(time);
if (result.isErr()) throw result.error;
return new Date(result.value);
}
/**
* Helper function to convert byte arrays to text strings
*/
function bytesToText(bytes) {
if (bytes === undefined) return undefined;
if (bytes === null) return null;
return Buffer.from(bytes).toString("utf-8");
}
/**
* Helper function to sleep for a specified number of milliseconds
*/
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Example replicator that processes external messages and creates cast objects
*/
class ExampleReplicator {
constructor() {
console.log("Example replicator initialized");
this.isRunning = false;
this.lastProcessedTimestamp = Date.now();
}
/**
* Process an external message in raw hex format
* @param {string} rawMessageHex - Raw message in hex format (with 0x prefix)
* @param {Object} overrides - Optional overrides for message processing
* @returns {Promise<Object>} - Result of processing
*/
async processExternalMessage(rawMessageHex, overrides = {}) {
try {
// Check format and remove 0x prefix if present
if (!rawMessageHex.startsWith("0x")) {
throw new Error("Raw message must start with 0x");
}
const messageBytes = Buffer.from(rawMessageHex.slice(2), "hex");
// Decode the message
const messageObject = Message.decode(messageBytes);
// Store the raw message for future reference
await this.storeRawMessage(messageObject, rawMessageHex, overrides);
// Process based on message type
if (isCastAddMessage(messageObject)) {
return await this.processCastAdd(messageObject, true, overrides);
} else {
console.log(`Unsupported message type: ${messageObject.data.type}`);
return { success: false, error: "Unsupported message type" };
}
} catch (error) {
console.error(`Error processing external message: ${error}`);
return { success: false, error: error.message };
}
}
/**
* Store raw message in Messages collection for future reference
*/
async storeRawMessage(messageObject, rawMessageHex, overrides = {}) {
const hash = bytesToHex(messageObject.hash);
// Check if message already exists
const existingMessage = await Messages.findOne({ hash });
if (existingMessage) {
console.log(`Message ${hash} already exists, skipping storage`);
return;
}
// Store the message with external flag
await Messages.create({
hash,
timestamp: farcasterTimeToDate(messageObject.data.timestamp),
raw: rawMessageHex,
fid: messageObject.data.fid,
external: true,
unindexed: false,
bodyOverrides: overrides,
});
console.log(`Stored raw message with hash ${hash}`);
}
/**
* Process a cast add message
* @param {Object} message - The cast message
* @param {boolean} external - Whether this is an external message
* @param {Object} overrides - Optional overrides for message processing
* @returns {Promise<Object>} - The created cast
*/
async processCastAdd(message, external = false, overrides = {}) {
try {
// Extract parent info from message or overrides
const parentFid =
overrides.parentCastId?.fid ||
message.data.castAddBody.parentCastId?.fid;
const parentHash = bytesToHex(
message.data.castAddBody.parentCastId?.hash
);
// Process embeds
const embeds = JSON.stringify(
message.data.castAddBody.embeds ||
message.data.castAddBody.embedsDeprecated?.map((x) => ({
url: x,
})) ||
[]
);
// Create the cast entry
const castEntry = {
timestamp: farcasterTimeToDate(message.data.timestamp),
fid: message.data.fid,
text: message.data.castAddBody.text || "",
hash: bytesToHex(message.hash),
parentHash,
parentFid,
parentUrl: message.data.castAddBody.parentUrl,
embeds,
mentions: message.data.castAddBody.mentions,
mentionsPositions: message.data.castAddBody.mentionsPositions,
threadHash: null, // Will be set later
external,
};
// Check if cast already exists
const existingCast = await Casts.findOne({
hash: castEntry.hash,
});
if (existingCast) {
console.log(`Cast ${castEntry.hash} already exists, skipping creation`);
return { success: true, cast: existingCast };
}
// Create new cast
const newCast = new Casts(castEntry);
// Determine thread hash (simplified version)
newCast.threadHash = parentHash || castEntry.hash;
// Save the cast
await newCast.save();
console.log(`Created cast with hash ${newCast.hash}`);
return { success: true, cast: newCast };
} catch (error) {
console.error(`Error processing cast add: ${error}`);
return { success: false, error: error.message };
}
}
/**
* Check for new unprocessed messages
* @returns {Promise<Array>} - Array of processed message results
*/
async checkForNewMessages() {
try {
// Find unprocessed messages newer than the last processed timestamp
const unprocessedMessages = await Messages.find({
unindexed: true,
timestamp: { $gt: new Date(this.lastProcessedTimestamp) },
}).limit(10);
if (unprocessedMessages.length > 0) {
console.log(
`Found ${unprocessedMessages.length} new messages to process`
);
// Process each message and collect results
const results = await Promise.all(
unprocessedMessages.map(async (message) => {
// Mark as processed
await Messages.updateOne(
{ _id: message._id },
{ $set: { unindexed: false } }
);
// Process the message if it has raw data
if (message.raw) {
const messageObject = Message.decode(
Buffer.from(message.raw.slice(2), "hex")
);
// If external message, set FID from the message
if (message.external) {
messageObject.data.fid = message.fid;
}
// Process based on message type
if (isCastAddMessage(messageObject)) {
return await this.processCastAdd(
messageObject,
message.external,
message.bodyOverrides || {}
);
} else {
console.log(`Skipping unsupported message type`);
return { success: false, message: "Unsupported message type" };
}
}
return { success: false, message: "No raw data in message" };
})
);
// Update last processed timestamp
this.lastProcessedTimestamp = Date.now();
return results;
} else {
// No new messages
return [];
}
} catch (error) {
console.error(`Error checking for new messages: ${error}`);
return [{ success: false, error: error.message }];
}
}
/**
* Start the replicator processing loop
*/
async start() {
if (this.isRunning) {
console.log("Replicator is already running");
return;
}
this.isRunning = true;
console.log("Starting example replicator loop");
// Main processing loop
while (this.isRunning) {
try {
// Check for and process new messages
await this.checkForNewMessages();
// Sleep for 200ms before next check
await sleep(200);
} catch (error) {
console.error(`Error in replicator loop: ${error}`);
// Continue the loop even after errors
}
}
}
/**
* Stop the replicator processing loop
*/
stop() {
console.log("Stopping example replicator");
this.isRunning = false;
}
}
/**
* Main function to run the example replicator
*/
async function main() {
try {
console.log("Starting example replicator...");
// Create a new replicator instance
const replicator = new ExampleReplicator();
// Handle graceful shutdown
process.on("SIGINT", () => {
console.log("Received SIGINT. Shutting down gracefully...");
replicator.stop();
process.exit(0);
});
process.on("SIGTERM", () => {
console.log("Received SIGTERM. Shutting down gracefully...");
replicator.stop();
process.exit(0);
});
// Start the replicator loop
await replicator.start();
} catch (error) {
console.error(`Fatal error: ${error}`);
process.exit(1);
}
}
// Run the main function when this script is executed directly
if (require.main === module) {
main().catch((error) => {
console.error(`Unhandled error in main: ${error}`);
process.exit(1);
});
}
module.exports = ExampleReplicator;