Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(store): align protocol implementation to use BaseProtocolSDK #2019

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions packages/sdk/src/protocols/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
PageDirection,
type ProtocolCreateOptions
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
import { concat } from "@waku/utils/bytes";

Expand All @@ -18,16 +19,15 @@ import { BaseProtocolSDK } from "./base_protocol.js";

export const DefaultPageSize = 10;

const DEFAULT_NUM_PEERS = 1;
const DEFAULT_NUM_PEERS = 3;

const log = new Logger("waku:store:protocol");

export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
public readonly protocol: StoreCore;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
// TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685
super({ numPeersToUse: DEFAULT_NUM_PEERS });
super({ numPeersToUse: options?.numPeersToUse ?? DEFAULT_NUM_PEERS });

this.protocol = new StoreCore(libp2p, options);
}
Expand All @@ -52,7 +52,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
* @throws If no decoders are provided.
* @throws If no decoders are found for the provided pubsub topic.
*/
async *queryGenerator<T extends IDecodedMessage>(
public async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: waku_store.QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> {
Expand All @@ -65,23 +65,45 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
options
);

const peer = (
await this.protocol.getPeers({
numPeers: this.numPeers,
maxBootstrapPeers: 1
})
)[0];
const peers = await this.protocol.getPeers({
numPeers: this.numPeers,
maxBootstrapPeers: 1
});

if (!peer) throw new Error("No peers available to query");
if (peers.length === 0) {
throw new Error("No peers available to query");
}

const responseGenerator = this.protocol.queryPerPage(
queryOpts,
decodersAsMap,
peer
const peerGenerators = peers.map((peer) =>
this.protocol.queryPerPage(queryOpts, decodersAsMap, peer)
);

for await (const messages of responseGenerator) {
yield messages;
const seenHashes = new Set<string>();

while (peerGenerators.length > 0) {
const peerPages = await Promise.all(
peerGenerators.map((generator) => generator.next())
);

const uniqueMessagesForPage: Promise<T | undefined>[] = [];

for (const peerPage of peerPages) {
if (peerPage.done) {
peerGenerators.splice(peerGenerators.indexOf(peerPage.value), 1);
continue;
}

const pageMessages = await this.getUniqueMessages(
peerPage.value,
pubsubTopic,
seenHashes
);
uniqueMessagesForPage.push(...pageMessages);
}

if (uniqueMessagesForPage.length > 0) {
yield uniqueMessagesForPage;
}
}
}

Expand All @@ -102,7 +124,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithOrderedCallback<T extends IDecodedMessage>(
public async queryWithOrderedCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
Expand All @@ -129,7 +151,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryWithPromiseCallback<T extends IDecodedMessage>(
public async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
Expand All @@ -148,7 +170,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
}
}

createCursor(message: IDecodedMessage): Cursor {
public createCursor(message: IDecodedMessage): Cursor {
if (
!message ||
!message.timestamp ||
Expand Down Expand Up @@ -270,6 +292,27 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
return queryOpts;
}

private async getUniqueMessages<T extends IDecodedMessage>(
page: Promise<T | undefined>[],
pubsubTopic: string,
seenHashes: Set<string>
): Promise<Promise<T | undefined>[]> {
const uniqueMessages: Promise<T | undefined>[] = [];

for (const msgPromise of page) {
const message = await msgPromise;
if (message) {
const hash = messageHashStr(pubsubTopic, message);
if (!seenHashes.has(hash)) {
seenHashes.add(hash);
uniqueMessages.push(Promise.resolve(message));
}
}
}

return uniqueMessages;
}

/**
* Processes messages based on the provided callback and options.
* @private
Expand Down
7 changes: 6 additions & 1 deletion packages/tests/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ export class ServiceNodesFleet {
message: MessageRpcQuery,
pubsubTopic: string = DefaultPubsubTopic
): Promise<boolean> {
message = {
...message,
timestamp:
message.timestamp || BigInt(new Date().valueOf()) * BigInt(1_000_000)
};
const relayMessagePromises: Promise<boolean>[] = this.nodes.map((node) =>
node.sendMessage(message, pubsubTopic)
);
Expand Down Expand Up @@ -141,7 +146,7 @@ class MultipleNodesMessageCollector {
callback: (msg: DecodedMessage) => void = () => {};
messageList: Array<DecodedMessage> = [];
constructor(
private messageCollectors: MessageCollector[],
public messageCollectors: MessageCollector[],
private relayNodes?: ServiceNode[],
private strictChecking: boolean = false
) {
Expand Down
57 changes: 32 additions & 25 deletions packages/tests/tests/store/cursor.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,34 @@ import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
ServiceNode,
ServiceNodesFleet,
tearDownNodes
} from "../../src/index.js";
} from "../../src";

import {
runStoreNodes,
sendMessages,
startAndConnectLightNode,
TestDecoder,
TestDecoder2,
TestShardInfo,
totalMsgs
} from "./utils.js";
} from "./single_node/utils";
import {
runMultipleNodes,
sendMessagesToMultipleNodes,
startAndConnectLightNodeWithMultipleServiceNodes
} from "./utils";

describe("Waku Store, cursor", function () {
describe("Waku Store: Multiple Nodes: cursor", function () {
this.timeout(15000);
let waku: LightNode;
let waku2: LightNode;
let nwaku: ServiceNode;
let serviceNodesFleet: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo);
[serviceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
});

afterEachCustom(this, async () => {
await tearDownNodes(nwaku, [waku, waku2]);
await tearDownNodes(serviceNodesFleet.nodes, [waku, waku2]);
});

[
Expand All @@ -43,8 +45,8 @@ describe("Waku Store, cursor", function () {
[110, 120]
].forEach(([cursorIndex, messageCount]) => {
it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () {
await sendMessages(
nwaku,
await sendMessagesToMultipleNodes(
serviceNodesFleet.nodes,
messageCount,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic
Expand Down Expand Up @@ -91,13 +93,16 @@ describe("Waku Store, cursor", function () {
});

it("Reusing cursor across nodes", async function () {
await sendMessages(
nwaku,
await sendMessagesToMultipleNodes(
serviceNodesFleet.nodes,
totalMsgs,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic
);
waku2 = await startAndConnectLightNode(nwaku, TestShardInfo);
waku2 = await startAndConnectLightNodeWithMultipleServiceNodes(
serviceNodesFleet.nodes,
TestShardInfo
);

// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
Expand Down Expand Up @@ -133,8 +138,8 @@ describe("Waku Store, cursor", function () {
});

it("Passing cursor with wrong message digest", async function () {
await sendMessages(
nwaku,
await sendMessagesToMultipleNodes(
serviceNodesFleet.nodes,
totalMsgs,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic
Expand Down Expand Up @@ -165,12 +170,14 @@ describe("Waku Store, cursor", function () {
// Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117
expect(messagesAfterCursor.length).to.eql(0);
} catch (error) {
if (
nwaku.type === "go-waku" &&
typeof error === "string" &&
error.includes("History response contains an Error: INVALID_CURSOR")
) {
return;
for (const node of serviceNodesFleet.nodes) {
if (
node.type === "go-waku" &&
typeof error === "string" &&
error.includes("History response contains an Error: INVALID_CURSOR")
) {
return;
}
}
throw error instanceof Error
? new Error(`Unexpected error: ${error.message}`)
Expand All @@ -179,8 +186,8 @@ describe("Waku Store, cursor", function () {
});

it("Passing cursor with wrong pubsubTopic", async function () {
await sendMessages(
nwaku,
await sendMessagesToMultipleNodes(
serviceNodesFleet.nodes,
totalMsgs,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic
Expand Down
16 changes: 8 additions & 8 deletions packages/tests/tests/store/error_handling.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,30 @@ import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
ServiceNode,
ServiceNodesFleet,
tearDownNodes
} from "../../src/index.js";
} from "../../src";

import {
processQueriedMessages,
runStoreNodes,
TestContentTopic1,
TestDecoder,
TestDecoder2,
TestShardInfo
} from "./utils.js";
} from "./single_node/utils";
import { runMultipleNodes } from "./utils";

describe("Waku Store, error handling", function () {
describe("Waku Store: Multiple Peers: error handling", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: ServiceNode;
let ServiceNodesFleet: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo);
[ServiceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
});

afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
await tearDownNodes(ServiceNodesFleet.nodes, waku);
});

it("Query Generator, Wrong PubsubTopic", async function () {
Expand Down
Loading
Loading