Skip to content

Commit

Permalink
feat: pull missing messages into sync trie via sync health job (#2326)
Browse files Browse the repository at this point in the history
There are cases where messages exist in the db but are missing from the
trie. This happens primarily around restarts. When the sync health job
sees "message has already been merged" it's an indication that the
message exists in the db but not in the trie.

Deployed to hoyt for testing and it looks good. 

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR adds missing messages to the sync trie via a sync health job. 

### Detailed summary
- Added missing messages to sync trie
- Updated `processSumbitResults` to handle already merged messages
- Added `numAlreadyMerged` counter
- Updated logging for merged messages

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini authored Sep 20, 2024
1 parent 3b1d12f commit 5504e05
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-icons-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: add missing messages to sync trie via sync health job
51 changes: 33 additions & 18 deletions apps/hubble/src/network/sync/syncHealthJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { peerIdFromString } from "@libp2p/peer-id";
import { bytesToHexString, fromFarcasterTime, Message, UserDataType } from "@farcaster/hub-nodejs";
import { Result } from "neverthrow";
import { SubmitError } from "../../utils/syncHealth.js";
import { SyncId } from "./syncId.js";

const log = logger.child({
component: "SyncHealth",
Expand Down Expand Up @@ -75,9 +76,15 @@ export class MeasureSyncHealthJobScheduler {
return undefined;
}

processSumbitResults(results: Result<Message, SubmitError>[], peerId: string, startTime: number, stopTime: number) {
async processSumbitResults(
results: Result<Message, SubmitError>[],
peerId: string,
startTime: number,
stopTime: number,
) {
let numSuccesses = 0;
let numErrors = 0;
let numAlreadyMerged = 0;
for (const result of results) {
if (result.isOk()) {
const hashString = bytesToHexString(result.value.hash);
Expand Down Expand Up @@ -105,25 +112,31 @@ export class MeasureSyncHealthJobScheduler {
} else {
const hashString = bytesToHexString(result.error.originalMessage.hash);
const hash = hashString.isOk() ? hashString.value : "unable to show hash";
log.info(
{
errMessage: result.error.hubError.message,
peerId,
startTime,
stopTime,
msgDetails: {
fid: result.error.originalMessage.data?.fid,
timestamp: this.unixTimestampFromMessage(result.error.originalMessage),
hash,
},
},
"Failed to submit message via SyncHealth",
);

numErrors += 1;
const logTags = {
errMessage: result.error.hubError.message,
peerId,
startTime,
stopTime,
msgDetails: {
fid: result.error.originalMessage.data?.fid,
timestamp: this.unixTimestampFromMessage(result.error.originalMessage),
hash,
},
};
if (result.error.hubError.errCode === "bad_request.duplicate") {
// This message has already been merged into the DB, but for some reason is not in the Trie.
// Just update the trie.
await this._metadataRetriever._syncEngine.trie.insert(SyncId.fromMessage(result.error.originalMessage));
log.info(logTags, "Merged missing message into sync trie via SyncHealth");
numAlreadyMerged += 1;
} else {
log.info(logTags, "Failed to submit message via SyncHealth");
numErrors += 1;
}
}
}
return { numSuccesses, numErrors };
return { numSuccesses, numErrors, numAlreadyMerged };
}

async doJobs() {
Expand Down Expand Up @@ -178,13 +191,15 @@ export class MeasureSyncHealthJobScheduler {
continue;
}

const processedResults = await this.processSumbitResults(resultsPushingToUs.value, peerId, startTime, stopTime);

log.info(
{
ourNumMessages: syncHealthMessageStats.value.primaryNumMessages,
theirNumMessages: syncHealthMessageStats.value.peerNumMessages,
syncHealth: syncHealthMessageStats.value.computeDiff(),
syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(),
resultsPushingToUs: this.processSumbitResults(resultsPushingToUs.value, peerId, startTime, stopTime),
resultsPushingToUs: processedResults,
peerId,
startTime,
stopTime,
Expand Down

0 comments on commit 5504e05

Please sign in to comment.