Skip to content

Commit

Permalink
Merge pull request #133 from libp2p/0.5.4
Browse files Browse the repository at this point in the history
0.5.4 release
- Throw exception when multistream messages do not end with line ending (…
- Retrieve the value from seenMessages once to avoid NullPointerExcepti…
- Reduce log level from INFO to DEBUG on inbound invalid gossip message (…
- More flexible pubsub message fields handling (#132)
  • Loading branch information
Nashatyrev authored Aug 2, 2020
2 parents 2b2ce52 + 174d7f7 commit 8350a55
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 70 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ Builds are published to JCenter. Maven Central mirrors JCenter, but updates can
jcenter()
}
implementation 'io.libp2p:jvm-libp2p-minimal:0.5.0-RELEASE'
implementation 'io.libp2p:jvm-libp2p-minimal:0.5.4-RELEASE'
```
### Using Maven
```
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.5.0-RELEASE</version>
<version>0.5.4-RELEASE</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew bintrayUpload -PbintrayUser=<user> -PbintrayApiKey=<api-key>

group = "io.libp2p"
version = "0.5.3-RELEASE"
version = "0.5.4-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down
39 changes: 32 additions & 7 deletions src/main/kotlin/io/libp2p/core/pubsub/PubsubApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubRouter
import io.netty.buffer.ByteBuf
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.function.Function
import kotlin.random.Random.Default.nextLong
Expand Down Expand Up @@ -114,7 +115,15 @@ interface PubsubPublisherApi {
* The future completes normally when the message
* is transmitted to at least one peer
*/
fun publish(data: ByteBuf, vararg topics: Topic): CompletableFuture<Unit>
fun publish(data: ByteBuf, vararg topics: Topic): CompletableFuture<Unit> =
publishExt(data, null, null, *topics)

/**
* Extended [publish] method where `from` and `seqId` may be customized
* @param from If null the field is calculated based on the private key
* @param seqId If null the field is calculated from the internal id counter
*/
fun publishExt(data: ByteBuf, from: ByteArray?, seqId: Long?, vararg topics: Topic): CompletableFuture<Unit>
}

/**
Expand All @@ -124,13 +133,29 @@ interface PubsubApi : PubsubSubscriberApi {

/**
* Creates a Publisher instance for a single sender identified by [privKey]
* @param privKey The sender's private key for singing published messages
* @param seqId Initial sequence id for the sender. Since messages are
* uniquely identified by a pair of `sender + seqId` it is recommended to
* initialize the id with the `lastUsedId + 1`
* Initialized with random value by default
* @param privKey The sender's private key for singing published messages, the
* message `from` field is derived as [PeerId] from this parameter
* If this parameter is [null] then the message `signature` and `from` fields
* are omitted
* @param initialSeqId Initial sequence id for the sender. Since messages are
* uniquely identified by a pair of `sender + seqId` it is recommended to
* initialize the id with the `lastUsedId + 1`
* Initialized with random value by default
*/
fun createPublisher(privKey: PrivKey?, initialSeqId: Long = nextLong()): PubsubPublisherApi {
val idGenerator = AtomicLong(initialSeqId)
return createPublisher(privKey, idGenerator::incrementAndGet)
}

/**
* Creates a Publisher instance for a single sender identified by [privKey]
* @param privKey The sender's private key for singing published messages, the
* message `from` field is derived as [PeerId] from this parameter
* If this parameter is [null] then the message `signature` and `from` fields
* are omitted
* @param seqIdGenerator supplies `seqId` for published messages
*/
fun createPublisher(privKey: PrivKey, seqId: Long = nextLong()): PubsubPublisherApi
fun createPublisher(privKey: PrivKey?, seqIdGenerator: () -> Long): PubsubPublisherApi
}

/**
Expand Down
8 changes: 5 additions & 3 deletions src/main/kotlin/io/libp2p/etc/util/netty/StringSuffixCodec.kt
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package io.libp2p.etc.util.netty

import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.MessageToMessageCodec

/**
* Adds/removes trailing character from messages
*/
class StringSuffixCodec(val trainlingChar: Char) : MessageToMessageCodec<String, String>() {
class StringSuffixCodec(val trailingChar: Char) : MessageToMessageCodec<String, String>() {

override fun encode(ctx: ChannelHandlerContext?, msg: String, out: MutableList<Any>) {
out += (msg + trainlingChar)
out += (msg + trailingChar)
}

override fun decode(ctx: ChannelHandlerContext?, msg: String, out: MutableList<Any>) {
out += msg.trimEnd(trainlingChar)
if (!msg.endsWith(trailingChar)) throw DecoderException("Missing message end character")
out += msg.substring(0, msg.length - 1)
}
}
27 changes: 18 additions & 9 deletions src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
override var curTimeMillis: () -> Long by lazyVarInit { { System.currentTimeMillis() } }
override var random by lazyVarInit { Random() }
override var name: String = "router"
var messageIdGenerator: (Rpc.Message) -> MessageId = { it.from.toByteArray().toHex() + it.seqno.toByteArray().toHex() }
var messageIdGenerator: (Rpc.Message) -> MessageId =
{ it.from.toByteArray().toHex() + it.seqno.toByteArray().toHex() }

private val peerTopics = MultiSet<PeerHandler, String>()
private var msgHandler: (Rpc.Message) -> CompletableFuture<ValidationResult> = { RESULT_VALID }
Expand Down Expand Up @@ -181,19 +182,27 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
(msg.publishList - msgSubscribed).forEach { notifyNonSubscribedMessage(peer, it) }

val msgUnseen = msgSubscribed
.filter { !seenMessages.containsKey(getMessageId(it)) }

msgUnseen.forEach { seenMessages[getMessageId(it)] = Optional.empty() }

msgUnseen.forEach { notifyUnseenMessage(peer, it) }
(msgSubscribed - msgUnseen).forEach { notifySeenMessage(peer, it, seenMessages[getMessageId(it)]!!) }
.filter { subscribedMessage ->
val messageId = getMessageId(subscribedMessage)
val validationResult = seenMessages[messageId]
if (validationResult != null) {
// Message has been seen
notifySeenMessage(peer, subscribedMessage, validationResult)
false
} else {
// Message is unseen
seenMessages[messageId] = Optional.empty()
notifyUnseenMessage(peer, subscribedMessage)
true
}
}

val msgValid = msgUnseen.filter {
try {
validator.validate(it)
true
} catch (e: Exception) {
logger.info("Invalid pubsub message from peer $peer: $it", e)
logger.debug("Invalid pubsub message from peer $peer: $it", e)
seenMessages[getMessageId(it)] = Optional.of(ValidationResult.Invalid)
notifyUnseenInvalidMessage(peer, it)
false
Expand Down Expand Up @@ -230,7 +239,7 @@ abstract class AbstractRouter : P2PServiceSemiDuplex(), PubsubRouter, PubsubRout
it.second.whenCompleteAsync(BiConsumer { res, err ->
when {
err != null -> logger.warn("Exception while handling message from peer $peer: ${it.first}", err)
res == ValidationResult.Invalid -> logger.info("Invalid pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Invalid -> logger.debug("Invalid pubsub message from peer $peer: ${it.first}")
res == ValidationResult.Ignore -> logger.debug("Ingnoring pubsub message from peer $peer: ${it.first}")
else -> {
newValidatedMessages(singletonList(it.first), peer)
Expand Down
45 changes: 28 additions & 17 deletions src/main/kotlin/io/libp2p/pubsub/PubsubApiImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import io.libp2p.etc.types.toProtobuf
import io.netty.buffer.ByteBuf
import pubsub.pb.Rpc
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicLong

open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {

Expand All @@ -32,23 +31,32 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
}
}

protected open inner class PublisherImpl(val privKey: PrivKey, seqId: Long) : PubsubPublisherApi {
val from = PeerId.fromPubKey(privKey.publicKey()).bytes.toProtobuf()
val seqCounter = AtomicLong(seqId)
protected open inner class PublisherImpl(val privKey: PrivKey?, val seqIdGenerator: () -> Long) :
PubsubPublisherApi {

override fun publish(data: ByteBuf, vararg topics: Topic): CompletableFuture<Unit> {
val msgToSign = createMessageToSign(data, *topics)
val signedMsg = pubsubSign(msgToSign, privKey)
return router.publish(signedMsg)
}
val from = privKey?.let { PeerId.fromPubKey(it.publicKey()).bytes.toProtobuf() }

override fun publishExt(
data: ByteBuf,
from: ByteArray?,
seqId: Long?,
vararg topics: Topic
): CompletableFuture<Unit> {
val mFrom = from?.toProtobuf() ?: this.from
val mSeqId = seqId ?: seqIdGenerator()

protected open fun createMessageToSign(data: ByteBuf, vararg topics: Topic): Rpc.Message =
Rpc.Message.newBuilder()
.setFrom(from)
val msgToSign = Rpc.Message.newBuilder()
.addAllTopicIDs(topics.map { it.topic })
.setData(data.toByteArray().toProtobuf())
.setSeqno(seqCounter.incrementAndGet().toBytesBigEndian().toProtobuf())
.build()
.setSeqno(mSeqId.toBytesBigEndian().toProtobuf())
mFrom?.also {
msgToSign.setFrom(it)
}

return router.publish(sign(msgToSign.build()))
}

private fun sign(msg: Rpc.Message) = if (privKey != null) pubsubSign(msg, privKey) else msg
}

init {
Expand All @@ -71,7 +79,7 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
it.receiver.apply(rpc2Msg(msg))
}
return validationFuts.thenApplyAll {
if (it.isEmpty())ValidationResult.Ignore
if (it.isEmpty()) ValidationResult.Ignore
else it.reduce(validationResultReduce)
}
}
Expand All @@ -80,7 +88,9 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
return MessageImpl(
msg.data.toByteArray().toByteBuf(),
msg.from.toByteArray(),
msg.seqno.toByteArray().copyOfRange(0, 8).toLongBigEndian(),
if (msg.hasSeqno() && msg.seqno.size() >= 8)
msg.seqno.toByteArray().copyOfRange(0, 8).toLongBigEndian()
else 0,
msg.topicIDsList.map { Topic(it) }
)
}
Expand Down Expand Up @@ -128,7 +138,8 @@ open class PubsubApiImpl(val router: PubsubRouter) : PubsubApi {
router.unsubscribe(*routerToUnsubscribe.toTypedArray())
}

override fun createPublisher(privKey: PrivKey, seqId: Long): PubsubPublisherApi = PublisherImpl(privKey, seqId)
override fun createPublisher(privKey: PrivKey?, seqIdGenerator: () -> Long): PubsubPublisherApi =
PublisherImpl(privKey, seqIdGenerator)
}

class MessageImpl(
Expand Down
37 changes: 37 additions & 0 deletions src/test/kotlin/io/libp2p/etc/util/netty/StringSuffixCodecTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.libp2p.etc.util.netty

import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.DecoderException
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows

class StringSuffixCodecTest {
val channel = EmbeddedChannel(StringSuffixCodec('\n'))

@Test
fun encodeAppendTrailingChar() {
channel.writeOutbound("theMessage")
val result = channel.readOutbound<String>()
assertEquals(result, "theMessage\n")
}

@Test
fun decodeStripsTrailingChar() {
channel.writeInbound("theMessage\n")
val result = channel.readInbound<String>()
assertEquals(result, "theMessage")
}

@Test
fun decodeOnlyStripsSingleTrailingChar() {
channel.writeInbound("theMessage\n\n")
val result = channel.readInbound<String>()
assertEquals(result, "theMessage\n")
}

@Test
fun decodeThrowsWhenTrailingCharMissing() {
assertThrows<DecoderException> { channel.writeInbound("theMessage") }
}
}
Loading

0 comments on commit 8350a55

Please sign in to comment.