Skip to content

Commit

Permalink
feat: add the ability to provide ip addresses to sync health job (#2324)
Browse files Browse the repository at this point in the history
We want to enable hubs to provide ip addresses to the sync health job in
order to bypass the contact info lookup required when peer ids are
provided.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on enhancing the `Hub` and
`MeasureSyncHealthJobScheduler` classes to support synchronization
health monitoring with added capabilities for peer identification and
RPC client retrieval.

### Detailed summary
- Added `performedFirstSync` property to `MockHub` and `Hub` classes.
- Introduced `getHubRpcClient` method in `Hub` and `MockHub`.
- Updated `MeasureSyncHealthJobScheduler` to use `PeerIdentifier` type.
- Created `getRpcClient` method for handling different peer types.
- Modified job execution logic to check for `performedFirstSync`.

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini authored Sep 24, 2024
1 parent 39be986 commit 4009400
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/real-dodos-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: add the ability to provide peers ip address/port to sync health job
7 changes: 5 additions & 2 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ const ALLOWED_CLOCK_SKEW_SECONDS = 60 * 10; // 10 minutes
export interface HubInterface {
engine: Engine;
identity: string;
performedFirstSync: boolean;
hubOperatorFid?: number;
submitMessage(message: Message, source?: HubSubmitSource): HubAsyncResult<number>;
submitMessageBundle(
Expand All @@ -156,6 +157,7 @@ export interface HubInterface {
getHubState(): HubAsyncResult<HubState>;
putHubState(hubState: HubState): HubAsyncResult<void>;
gossipContactInfo(): HubAsyncResult<void>;
getHubRpcClient(address: string, options?: Partial<ClientOptions>): Promise<HubRpcClient | undefined>;
getRPCClientForPeer(
peerId: PeerId,
peer: ContactInfoContentBody,
Expand Down Expand Up @@ -379,7 +381,6 @@ export class Hub implements HubInterface {
private allowlistedImmunePeers: string[] | undefined;
private strictContactInfoValidation: boolean;
private strictNoSign: boolean;
private performedFirstSync = false;

private pruneMessagesJobScheduler: PruneMessagesJobScheduler;
private periodSyncJobScheduler: PeriodicSyncJobScheduler;
Expand All @@ -398,6 +399,7 @@ export class Hub implements HubInterface {
engine: Engine;
fNameRegistryEventsProvider: FNameRegistryEventsProvider;
l2RegistryProvider: L2EventsProvider;
performedFirstSync = false;

constructor(options: HubOptions) {
this.options = options;
Expand Down Expand Up @@ -1678,6 +1680,7 @@ export class Hub implements HubInterface {
const result = this.syncEngine.addContactInfoForPeerId(peerId, message, CONTACT_INFO_UPDATE_THRESHOLD_MS);
if (result.isOk() && !this.performedFirstSync) {
// Sync with the first peer so we are upto date on startup.
log.info({ peerInfo: message }, "Performing first sync");
this.performedFirstSync = true;
setTimeout(async () => {
await ResultAsync.fromPromise(this.syncEngine.diffSyncIfRequired(this), (e) => e);
Expand All @@ -1697,7 +1700,7 @@ export class Hub implements HubInterface {
/** Since we don't know if the peer is using SSL or not, we'll attempt to get the SSL version,
* and fall back to the non-SSL version
*/
private async getHubRpcClient(address: string, options?: Partial<ClientOptions>): Promise<HubRpcClient> {
public async getHubRpcClient(address: string, options?: Partial<ClientOptions>): Promise<HubRpcClient> {
return new Promise((resolve) => {
try {
const sslClientResult = getSSLHubRpcClient(address, options);
Expand Down
75 changes: 57 additions & 18 deletions apps/hubble/src/network/sync/syncHealthJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@ const log = logger.child({

type SchedulerStatus = "started" | "stopped";

enum PeerIdentifierKind {
PeerId = 0,
AddrInfo = 1,
}

type PeerIdentifier = { kind: PeerIdentifierKind; identifier: string };

export class MeasureSyncHealthJobScheduler {
private _cronTask?: cron.ScheduledTask;
private _metadataRetriever: SyncEngineMetadataRetriever;
// Start at 65 minutes ago and take a 60 minute span
private _startSecondsAgo = 60 * 65;
private _spanSeconds = 60 * 60;
private _hub: HubInterface;
private _peersInScope: string[];
private _peersInScope: PeerIdentifier[];

constructor(syncEngine: SyncEngine, hub: HubInterface) {
this._metadataRetriever = new SyncEngineMetadataRetriever(hub, syncEngine);
Expand All @@ -48,19 +55,33 @@ export class MeasureSyncHealthJobScheduler {
return this._cronTask ? "started" : "stopped";
}

peerAddrInfosInScope() {}

peersInScope() {
const peers = process.env["SYNC_HEALTH_PEER_IDS"]?.split(",") ?? [];
const peerIds = process.env["SYNC_HEALTH_PEER_IDS"]?.split(",") ?? [];

for (const multiaddr of this._hub.bootstrapAddrs()) {
const peerId = multiaddr.getPeerId();
if (!peerId) {
log.info({ multiaddr }, "Couldn't get peerid for multiaddr");
} else {
peers.push(peerId);
peerIds.push(peerId);
}
}

return peers;
const peerIdentifiers = peerIds.map((peerId) => {
return {
kind: PeerIdentifierKind.PeerId,
identifier: peerId,
};
});

const addrInfos =
process.env["SYNC_HEALTH_ADDR_INFOS"]?.split(",").map((addrInfo) => {
return { kind: PeerIdentifierKind.AddrInfo, identifier: addrInfo };
}) ?? [];

return [...peerIdentifiers, ...addrInfos];
}

unixTimestampFromMessage(message: Message) {
Expand Down Expand Up @@ -139,24 +160,37 @@ export class MeasureSyncHealthJobScheduler {
return { numSuccesses, numErrors, numAlreadyMerged };
}

async getRpcClient(peer: PeerIdentifier) {
if (peer.kind === PeerIdentifierKind.PeerId) {
const contactInfo = this._metadataRetriever._syncEngine.getContactInfoForPeerId(peer.identifier);

if (!contactInfo) {
log.info({ peerId: peer.identifier }, "Couldn't get contact info for peer");
return undefined;
}

return this._hub.getRPCClientForPeer(peerIdFromString(peer.identifier), contactInfo.contactInfo);
} else {
return this._hub.getHubRpcClient(peer.identifier);
}
}

async doJobs() {
if (!this._hub.performedFirstSync) {
log.info("Skipping SyncHealth job because we haven't performed our first sync yet");
return;
}

log.info({}, "Starting compute SyncHealth job");

const startTime = Date.now() - this._startSecondsAgo * 1000;
const stopTime = startTime + this._spanSeconds * 1000;

for (const peerId of this._peersInScope) {
const contactInfo = this._metadataRetriever._syncEngine.getContactInfoForPeerId(peerId);

if (!contactInfo) {
log.info({ peerId }, "Couldn't get contact info, skipping peer");
continue;
}

const rpcClient = await this._hub.getRPCClientForPeer(peerIdFromString(peerId), contactInfo.contactInfo);
for (const peer of this._peersInScope) {
const rpcClient = await this.getRpcClient(peer);

if (rpcClient === undefined) {
log.info({ peerId, contactInfo }, "Couldn't get rpc client, skipping peer");
log.info({ peerId: peer.identifier }, "Couldn't get rpc client, skipping peer");
continue;
}

Expand All @@ -171,7 +205,7 @@ export class MeasureSyncHealthJobScheduler {

if (syncHealthMessageStats.isErr()) {
log.info(
{ peerId, err: syncHealthMessageStats.error, contactInfo },
{ peerId: peer.identifier, err: syncHealthMessageStats.error },
`Error computing SyncHealth: ${syncHealthMessageStats.error}`,
);
continue;
Expand All @@ -185,13 +219,18 @@ export class MeasureSyncHealthJobScheduler {

if (resultsPushingToUs.isErr()) {
log.info(
{ peerId, err: resultsPushingToUs.error },
{ peerId: peer.identifier, err: resultsPushingToUs.error },
`Error pushing new messages to ourself ${resultsPushingToUs.error}`,
);
continue;
}

const processedResults = await this.processSumbitResults(resultsPushingToUs.value, peerId, startTime, stopTime);
const processedResults = await this.processSumbitResults(
resultsPushingToUs.value,
peer.identifier,
startTime,
stopTime,
);

log.info(
{
Expand All @@ -200,7 +239,7 @@ export class MeasureSyncHealthJobScheduler {
syncHealth: syncHealthMessageStats.value.computeDiff(),
syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(),
resultsPushingToUs: processedResults,
peerId,
peerId: peer.identifier,
startTime,
stopTime,
},
Expand Down
6 changes: 6 additions & 0 deletions apps/hubble/src/test/mocks.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ClientOptions,
FarcasterNetwork,
HubAsyncResult,
HubError,
Expand Down Expand Up @@ -36,6 +37,7 @@ export class MockHub implements HubInterface {
public engine: Engine;
public gossipNode: GossipNode | undefined;
public gossipCount = 0;
public performedFirstSync = false;

constructor(db: RocksDB, engine?: Engine, gossipNode?: GossipNode) {
this.db = db;
Expand Down Expand Up @@ -110,6 +112,10 @@ export class MockHub implements HubInterface {
return undefined;
}

async getHubRpcClient(_address: string, _options?: Partial<ClientOptions>): Promise<HubRpcClient | undefined> {
return undefined;
}

async updateApplicationPeerScore(_peerId: String, _score: number) {
return ok(undefined);
}
Expand Down

0 comments on commit 4009400

Please sign in to comment.