diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index 5c30d14667..058d2bedf4 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -31,14 +31,14 @@ suite "WakuNode2 - Validators": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) ) - # Protected topic and key to sign - let spamProtectedTopic = PubSubTopic("some-spam-protected-topic") + # Protected shard and key to sign + let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7) let secretKey = SkSecretKey .fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6") .expect("valid key") let publicKey = secretKey.toPublicKey() - let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable - let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable + let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable + let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable # Start all the nodes and mount relay with protected topic await allFutures(nodes.mapIt(it.start())) @@ -48,10 +48,12 @@ suite "WakuNode2 - Validators": # Add signed message validator to all nodes. They will only route signed messages for node in nodes: - var signedTopics: seq[ProtectedTopic] - for topic, publicKey in topicsPublicKeys: - signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) - node.wakuRelay.addSignedTopicsValidator(signedTopics) + var signedShards: seq[ProtectedShard] + for shard, publicKey in shardsPublicKeys: + signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey)) + node.wakuRelay.addSignedShardsValidator( + signedShards, spamProtectedShard.clusterId + ) # Connect the nodes in a full mesh for i in 0 ..< 5: @@ -72,7 +74,7 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe(spamProtectedTopic, handler) + discard node.wakuRelay.subscribe($spamProtectedShard, handler) await sleepAsync(500.millis) # Each node publishes 10 signed messages @@ -80,7 +82,7 @@ suite "WakuNode2 - Validators": for j in 0 ..< 10: var msg = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: now(), ephemeral: true, @@ -88,9 +90,9 @@ suite "WakuNode2 - Validators": # Include signature msg.meta = - secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0 .. 63] + secretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[0 .. 63] - discard await nodes[i].publish(some(spamProtectedTopic), msg) + discard await nodes[i].publish(some($spamProtectedShard), msg) # Wait for gossip await sleepAsync(2.seconds) @@ -103,7 +105,7 @@ suite "WakuNode2 - Validators": for i in 0 ..< 5: for k, v in nodes[i].wakuRelay.peerStats.mpairs: check: - v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0 + v.topicInfos[spamProtectedShard].invalidMessageDeliveries == 0.0 # Stop all nodes await allFutures(nodes.mapIt(it.stop())) @@ -114,14 +116,14 @@ suite "WakuNode2 - Validators": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) ) - # Protected topic and key to sign - let spamProtectedTopic = PubSubTopic("some-spam-protected-topic") + # Protected shard and key to sign + let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7) let secretKey = SkSecretKey .fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6") .expect("valid key") let publicKey = secretKey.toPublicKey() - let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable - let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable + let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable + let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable # Non whitelisted secret key let wrongSecretKey = SkSecretKey @@ -136,10 +138,12 @@ suite "WakuNode2 - Validators": # Add signed message validator to all nodes. They will only route signed messages for node in nodes: - var signedTopics: seq[ProtectedTopic] - for topic, publicKey in topicsPublicKeys: - signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) - node.wakuRelay.addSignedTopicsValidator(signedTopics) + var signedShards: seq[ProtectedShard] + for shard, publicKey in shardsPublicKeys: + signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey)) + node.wakuRelay.addSignedShardsValidator( + signedShards, spamProtectedShard.clusterId + ) # Connect the nodes in a full mesh for i in 0 ..< 5: @@ -160,7 +164,7 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe(spamProtectedTopic, handler) + discard node.wakuRelay.subscribe($spamProtectedShard, handler) await sleepAsync(500.millis) # Each node sends 5 messages, signed but with a non-whitelisted key (total = 25) @@ -168,42 +172,42 @@ suite "WakuNode2 - Validators": for j in 0 ..< 5: var msg = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: now(), ephemeral: true, ) # Sign the message with a wrong key - msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[ + msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[ 0 .. 63 ] - discard await nodes[i].publish(some(spamProtectedTopic), msg) + discard await nodes[i].publish(some($spamProtectedShard), msg) # Each node sends 5 messages that are not signed (total = 25) for i in 0 ..< 5: for j in 0 ..< 5: let unsignedMessage = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: now(), ephemeral: true, ) - discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) + discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage) # Each node sends 5 messages that dont contain timestamp (total = 25) for i in 0 ..< 5: for j in 0 ..< 5: let unsignedMessage = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: 0, ephemeral: true, ) - discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) + discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage) # Each node sends 5 messages way BEFORE than the current timestmap (total = 25) for i in 0 ..< 5: @@ -211,12 +215,12 @@ suite "WakuNode2 - Validators": let beforeTimestamp = now() - getNanosecondTime(6 * 60) let unsignedMessage = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: beforeTimestamp, ephemeral: true, ) - discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) + discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage) # Each node sends 5 messages way LATER than the current timestmap (total = 25) for i in 0 ..< 5: @@ -224,12 +228,12 @@ suite "WakuNode2 - Validators": let afterTimestamp = now() - getNanosecondTime(6 * 60) let unsignedMessage = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: afterTimestamp, ephemeral: true, ) - discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) + discard await nodes[i].publish(some($spamProtectedShard), unsignedMessage) # Since we have a full mesh with 5 nodes and each one publishes 25+25+25+25+25 msgs # there are 625 messages being sent. @@ -243,7 +247,7 @@ suite "WakuNode2 - Validators": msgRejected = 0 for i in 0 ..< 5: for k, v in nodes[i].wakuRelay.peerStats.mpairs: - msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int + msgRejected += v.topicInfos[spamProtectedShard].invalidMessageDeliveries.int if msgReceived == 125 and msgRejected == 500: break @@ -262,14 +266,14 @@ suite "WakuNode2 - Validators": newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) ) - # Protected topic and key to sign - let spamProtectedTopic = PubSubTopic("some-spam-protected-topic") + # Protected shard and key to sign + let spamProtectedShard = RelayShard(clusterId: 0, shardId: 7) let secretKey = SkSecretKey .fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6") .expect("valid key") let publicKey = secretKey.toPublicKey() - let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable - let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable + let shardsPrivateKeys = {spamProtectedShard: secretKey}.toTable + let shardsPublicKeys = {spamProtectedShard: publicKey}.toTable # Non whitelisted secret key let wrongSecretKey = SkSecretKey @@ -288,15 +292,17 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe(spamProtectedTopic, handler) + discard node.wakuRelay.subscribe($spamProtectedShard, handler) await sleepAsync(500.millis) # Add signed message validator to all nodes. They will only route signed messages for node in nodes: - var signedTopics: seq[ProtectedTopic] - for topic, publicKey in topicsPublicKeys: - signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) - node.wakuRelay.addSignedTopicsValidator(signedTopics) + var signedShards: seq[ProtectedShard] + for shard, publicKey in shardsPublicKeys: + signedShards.add(ProtectedShard(shard: shard.shardId, key: publicKey)) + node.wakuRelay.addSignedShardsValidator( + signedShards, spamProtectedShard.clusterId + ) # nodes[0] is connected only to nodes[1] let connOk1 = await nodes[0].peerManager.connectRelay( @@ -321,26 +327,26 @@ suite "WakuNode2 - Validators": for j in 0 ..< 50: let unsignedMessage = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: now(), ephemeral: true, ) - discard await nodes[0].publish(some(spamProtectedTopic), unsignedMessage) + discard await nodes[0].publish(some($spamProtectedShard), unsignedMessage) # nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1]) for j in 0 ..< 50: var msg = WakuMessage( payload: urandom(1 * (10 ^ 3)), - contentTopic: spamProtectedTopic, + contentTopic: spamProtectedShard, version: 2, timestamp: now(), ephemeral: true, ) # Sign the message with a wrong key msg.meta = - wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0 .. 63] - discard await nodes[0].publish(some(spamProtectedTopic), msg) + wrongSecretKey.sign(SkMessage(spamProtectedShard.msgHash(msg))).toRaw()[0 .. 63] + discard await nodes[0].publish(some($spamProtectedShard), msg) # Wait for gossip await sleepAsync(2.seconds) @@ -353,7 +359,7 @@ suite "WakuNode2 - Validators": # peer1 got invalid messages from peer0 let p0Id = nodes[0].peerInfo.peerId check: - nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedTopic].invalidMessageDeliveries == + nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedShard].invalidMessageDeliveries == 100.0 # peer1 did not gossip further, so no other node rx invalid messages @@ -362,7 +368,7 @@ suite "WakuNode2 - Validators": if k == p0Id and i == 1: continue check: - v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0 + v.topicInfos[spamProtectedShard].invalidMessageDeliveries == 0.0 # Stop all nodes await allFutures(nodes.mapIt(it.stop())) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 6995a9a8ea..41e66640ae 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -19,19 +19,21 @@ import ../common/confutils/envvar/std/net as confEnvvarNet, ../common/logging, ../waku_enr, - ../node/peer_manager + ../node/peer_manager, + ../waku_core/topics/pubsub_topic include ../waku_core/message/default_values export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet type ConfResult*[T] = Result[T, string] -type ProtectedTopic* = object - topic*: string - key*: secp256k1.SkPublicKey type EthRpcUrl* = distinct string +type ProtectedShard* = object + shard*: uint16 + key*: secp256k1.SkPublicKey + type StartUpCommand* = enum noCommand # default, runs waku generateRlnKeystore # generates a new RLN keystore @@ -134,10 +136,17 @@ type WakuNodeConf* = object ## Application-level configuration protectedTopics* {. desc: - "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", - defaultValue: newSeq[ProtectedTopic](0), + "Deprecated. Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", + defaultValue: newSeq[ProtectedShard](0), name: "protected-topic" - .}: seq[ProtectedTopic] + .}: seq[ProtectedShard] + + protectedShards* {. + desc: + "Shards and its public keys to be used for message validation, shard:pubkey. Argument may be repeated.", + defaultValue: newSeq[ProtectedShard](0), + name: "protected-shard" + .}: seq[ProtectedShard] ## General node config clusterId* {. @@ -694,20 +703,36 @@ proc parseCmdArg*[T](_: type seq[T], s: string): seq[T] {.raises: [ValueError].} proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = return @[] -proc parseCmdArg*(T: type ProtectedTopic, p: string): T = +# TODO: Remove when removing protected-topic configuration +proc isNumber(x: string): bool = + try: + discard parseInt(x) + result = true + except ValueError: + result = false + +proc parseCmdArg*(T: type ProtectedShard, p: string): T = let elements = p.split(":") if elements.len != 2: raise newException( - ValueError, "Invalid format for protected topic expected topic:publickey" + ValueError, "Invalid format for protected shard expected shard:publickey" ) - let publicKey = secp256k1.SkPublicKey.fromHex(elements[1]) if publicKey.isErr: raise newException(ValueError, "Invalid public key") - return ProtectedTopic(topic: elements[0], key: publicKey.get()) + if isNumber(elements[0]): + return ProtectedShard(shard: uint16.parseCmdArg(elements[0]), key: publicKey.get()) + + # TODO: Remove when removing protected-topic configuration + let shard = RelayShard.parse(elements[0]).valueOr: + raise newException( + ValueError, + "Invalid pubsub topic. Pubsub topics must be in the format /waku/2/rs//", + ) + return ProtectedShard(shard: shard.shardId, key: publicKey.get()) -proc completeCmdArg*(T: type ProtectedTopic, val: string): seq[string] = +proc completeCmdArg*(T: type ProtectedShard, val: string): seq[string] = return @[] proc completeCmdArg*(T: type IpAddress, val: string): seq[string] = @@ -769,18 +794,18 @@ proc readValue*( raise newException(SerializationError, getCurrentExceptionMsg()) proc readValue*( - r: var TomlReader, value: var ProtectedTopic + r: var TomlReader, value: var ProtectedShard ) {.raises: [SerializationError].} = try: - value = parseCmdArg(ProtectedTopic, r.readValue(string)) + value = parseCmdArg(ProtectedShard, r.readValue(string)) except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) proc readValue*( - r: var EnvvarReader, value: var ProtectedTopic + r: var EnvvarReader, value: var ProtectedShard ) {.raises: [SerializationError].} = try: - value = parseCmdArg(ProtectedTopic, r.readValue(string)) + value = parseCmdArg(ProtectedShard, r.readValue(string)) except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 91407e9d89..2624b949e3 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -175,16 +175,16 @@ proc setupProtocols( return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) # Add validation keys to protected topics - var subscribedProtectedTopics: seq[ProtectedTopic] - for topicKey in conf.protectedTopics: - if topicKey.topic notin pubsubTopics: - warn "protected topic not in subscribed pubsub topics, skipping adding validator", - protectedTopic = topicKey.topic, subscribedTopics = pubsubTopics + var subscribedProtectedShards: seq[ProtectedShard] + for shardKey in conf.protectedShards: + if shardKey.shard notin conf.shards: + warn "protected shard not in subscribed shards, skipping adding validator", + protectedShard = shardKey.shard, subscribedShards = shards continue - subscribedProtectedTopics.add(topicKey) + subscribedProtectedShards.add(shardKey) notice "routing only signed traffic", - protectedTopic = topicKey.topic, publicKey = topicKey.key - node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics) + protectedShard = shardKey.shard, publicKey = shardKey.key + node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) # Enable Rendezvous Discovery protocol when Relay is enabled try: diff --git a/waku/factory/validator_signed.nim b/waku/factory/validator_signed.nim index f4a9253adb..59ee384b1b 100644 --- a/waku/factory/validator_signed.nim +++ b/waku/factory/validator_signed.nim @@ -50,30 +50,34 @@ proc withinTimeWindow*(msg: WakuMessage): bool = return true return false -proc addSignedTopicsValidator*(w: WakuRelay, protectedTopics: seq[ProtectedTopic]) = - debug "adding validator to signed topics" +proc addSignedShardsValidator*( + w: WakuRelay, protectedShards: seq[ProtectedShard], clusterId: uint16 +) = + debug "adding validator to signed shards", protectedShards, clusterId proc validator( topic: string, msg: WakuMessage ): Future[errors.ValidationResult] {.async.} = var outcome = errors.ValidationResult.Reject - for protectedTopic in protectedTopics: - if (protectedTopic.topic == topic): + for protectedShard in protectedShards: + let topicString = + $RelayShard(clusterId: clusterId, shardId: uint16(protectedShard.shard)) + if (topicString == topic): if msg.timestamp != 0: if msg.withinTimeWindow(): let msgHash = SkMessage(topic.msgHash(msg)) let recoveredSignature = SkSignature.fromRaw(msg.meta) if recoveredSignature.isOk(): - if recoveredSignature.get.verify(msgHash, protectedTopic.key): + if recoveredSignature.get.verify(msgHash, protectedShard.key): outcome = errors.ValidationResult.Accept if outcome != errors.ValidationResult.Accept: debug "signed topic validation failed", - topic = topic, publicTopicKey = protectedTopic.key + topic = topic, publicShardKey = protectedShard.key waku_msg_validator_signed_outcome.inc(labelValues = [$outcome]) return outcome return errors.ValidationResult.Accept - w.addValidator(validator, "signed topic validation failed") + w.addValidator(validator, "signed shard validation failed")