-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(transport): add experimental QUIC Transport (not production read…
…y) (#725) Our quic effort is blocked by bearssl not supporting TLS1.3, but since Mark did most of the work to implement Quic here: #563 and on nim-quic, this PR is going to bring encryption-less Quic into nim-libp2p This allows us to test it, and make sure it doesn't bitrot. Heavily WiP: - [X] Extract code from #563 - [X] Create custom muxer & upgrader - [X] Basic E2E switch test working - [x] Update nim-quic to get address informations in libp2p (for `observed address` and port 0 resolving) - [ ] More tests - [ ] Cleanup Since encryption is not yet supported, we're not compatible with any other libp2ps, and have to rely on home made protocols to retrieve the peer's id --------- Co-authored-by: markspanbroek <[email protected]> Co-authored-by: Diego <[email protected]>
- Loading branch information
1 parent
3e3df07
commit b37133c
Showing
9 changed files
with
318 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
import std/sequtils | ||
import pkg/chronos | ||
import pkg/chronicles | ||
import pkg/quic | ||
import ../multiaddress | ||
import ../multicodec | ||
import ../stream/connection | ||
import ../wire | ||
import ../muxers/muxer | ||
import ../upgrademngrs/upgrade | ||
import ./transport | ||
|
||
export multiaddress | ||
export multicodec | ||
export connection | ||
export transport | ||
|
||
logScope: | ||
topics = "libp2p quictransport" | ||
|
||
type | ||
P2PConnection = connection.Connection | ||
QuicConnection = quic.Connection | ||
|
||
# Stream | ||
type QuicStream* = ref object of P2PConnection | ||
stream: Stream | ||
cached: seq[byte] | ||
|
||
proc new( | ||
_: type QuicStream, stream: Stream, oaddr: Opt[MultiAddress], peerId: PeerId | ||
): QuicStream = | ||
let quicstream = QuicStream(stream: stream, observedAddr: oaddr, peerId: peerId) | ||
procCall P2PConnection(quicstream).initStream() | ||
quicstream | ||
|
||
template mapExceptions(body: untyped) = | ||
try: | ||
body | ||
except QuicError: | ||
raise newLPStreamEOFError() | ||
except CatchableError: | ||
raise newLPStreamEOFError() | ||
|
||
method readOnce*( | ||
stream: QuicStream, pbytes: pointer, nbytes: int | ||
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = | ||
try: | ||
if stream.cached.len == 0: | ||
stream.cached = await stream.stream.read() | ||
result = min(nbytes, stream.cached.len) | ||
copyMem(pbytes, addr stream.cached[0], result) | ||
stream.cached = stream.cached[result ..^ 1] | ||
except CatchableError as exc: | ||
raise newLPStreamEOFError() | ||
|
||
{.push warning[LockLevel]: off.} | ||
method write*( | ||
stream: QuicStream, bytes: seq[byte] | ||
) {.async: (raises: [CancelledError, LPStreamError]).} = | ||
mapExceptions(await stream.stream.write(bytes)) | ||
|
||
{.pop.} | ||
|
||
method closeImpl*(stream: QuicStream) {.async: (raises: []).} = | ||
try: | ||
await stream.stream.close() | ||
except CatchableError as exc: | ||
discard | ||
await procCall P2PConnection(stream).closeImpl() | ||
|
||
# Session | ||
type QuicSession* = ref object of P2PConnection | ||
connection: QuicConnection | ||
|
||
method close*(session: QuicSession) {.async, base.} = | ||
await session.connection.close() | ||
await procCall P2PConnection(session).close() | ||
|
||
proc getStream*( | ||
session: QuicSession, direction = Direction.In | ||
): Future[QuicStream] {.async.} = | ||
var stream: Stream | ||
case direction | ||
of Direction.In: | ||
stream = await session.connection.incomingStream() | ||
of Direction.Out: | ||
stream = await session.connection.openStream() | ||
await stream.write(@[]) # QUIC streams do not exist until data is sent | ||
return QuicStream.new(stream, session.observedAddr, session.peerId) | ||
|
||
method getWrapped*(self: QuicSession): P2PConnection = | ||
nil | ||
|
||
# Muxer | ||
type QuicMuxer = ref object of Muxer | ||
quicSession: QuicSession | ||
handleFut: Future[void] | ||
|
||
method newStream*( | ||
m: QuicMuxer, name: string = "", lazy: bool = false | ||
): Future[P2PConnection] {. | ||
async: (raises: [CancelledError, LPStreamError, MuxerError]) | ||
.} = | ||
try: | ||
return await m.quicSession.getStream(Direction.Out) | ||
except CatchableError as exc: | ||
raise newException(MuxerError, exc.msg, exc) | ||
|
||
proc handleStream(m: QuicMuxer, chann: QuicStream) {.async.} = | ||
## call the muxer stream handler for this channel | ||
## | ||
try: | ||
await m.streamHandler(chann) | ||
trace "finished handling stream" | ||
doAssert(chann.closed, "connection not closed by handler!") | ||
except CatchableError as exc: | ||
trace "Exception in mplex stream handler", msg = exc.msg | ||
await chann.close() | ||
|
||
method handle*(m: QuicMuxer): Future[void] {.async: (raises: []).} = | ||
try: | ||
while not m.quicSession.atEof: | ||
let incomingStream = await m.quicSession.getStream(Direction.In) | ||
asyncSpawn m.handleStream(incomingStream) | ||
except CatchableError as exc: | ||
trace "Exception in mplex handler", msg = exc.msg | ||
|
||
method close*(m: QuicMuxer) {.async: (raises: []).} = | ||
try: | ||
await m.quicSession.close() | ||
m.handleFut.cancel() | ||
except CatchableError as exc: | ||
discard | ||
|
||
# Transport | ||
type QuicUpgrade = ref object of Upgrade | ||
|
||
type QuicTransport* = ref object of Transport | ||
listener: Listener | ||
connections: seq[P2PConnection] | ||
|
||
func new*(_: type QuicTransport, u: Upgrade): QuicTransport = | ||
QuicTransport(upgrader: QuicUpgrade(ms: u.ms)) | ||
|
||
method handles*(transport: QuicTransport, address: MultiAddress): bool = | ||
if not procCall Transport(transport).handles(address): | ||
return false | ||
QUIC_V1.match(address) | ||
|
||
method start*(transport: QuicTransport, addrs: seq[MultiAddress]) {.async.} = | ||
doAssert transport.listener.isNil, "start() already called" | ||
#TODO handle multiple addr | ||
transport.listener = listen(initTAddress(addrs[0]).tryGet) | ||
await procCall Transport(transport).start(addrs) | ||
transport.addrs[0] = | ||
MultiAddress.init(transport.listener.localAddress(), IPPROTO_UDP).tryGet() & | ||
MultiAddress.init("/quic-v1").get() | ||
transport.running = true | ||
|
||
method stop*(transport: QuicTransport) {.async.} = | ||
if transport.running: | ||
for c in transport.connections: | ||
await c.close() | ||
await procCall Transport(transport).stop() | ||
await transport.listener.stop() | ||
transport.running = false | ||
transport.listener = nil | ||
|
||
proc wrapConnection( | ||
transport: QuicTransport, connection: QuicConnection | ||
): P2PConnection {.raises: [Defect, TransportOsError, LPError].} = | ||
let | ||
remoteAddr = connection.remoteAddress() | ||
observedAddr = | ||
MultiAddress.init(remoteAddr, IPPROTO_UDP).get() & | ||
MultiAddress.init("/quic-v1").get() | ||
conres = QuicSession(connection: connection, observedAddr: Opt.some(observedAddr)) | ||
conres.initStream() | ||
|
||
transport.connections.add(conres) | ||
proc onClose() {.async.} = | ||
await conres.join() | ||
transport.connections.keepItIf(it != conres) | ||
trace "Cleaned up client" | ||
|
||
asyncSpawn onClose() | ||
return conres | ||
|
||
method accept*(transport: QuicTransport): Future[P2PConnection] {.async.} = | ||
doAssert not transport.listener.isNil, "call start() before calling accept()" | ||
let connection = await transport.listener.accept() | ||
return transport.wrapConnection(connection) | ||
|
||
method dial*( | ||
transport: QuicTransport, | ||
hostname: string, | ||
address: MultiAddress, | ||
peerId: Opt[PeerId] = Opt.none(PeerId), | ||
): Future[P2PConnection] {.async, gcsafe.} = | ||
let connection = await dial(initTAddress(address).tryGet) | ||
return transport.wrapConnection(connection) | ||
|
||
method upgrade*( | ||
self: QuicTransport, conn: P2PConnection, peerId: Opt[PeerId] | ||
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = | ||
let qs = QuicSession(conn) | ||
if peerId.isSome: | ||
qs.peerId = peerId.get() | ||
|
||
let muxer = QuicMuxer(quicSession: qs, connection: conn) | ||
muxer.streamHandler = proc(conn: P2PConnection) {.async: (raises: []).} = | ||
trace "Starting stream handler" | ||
try: | ||
await self.upgrader.ms.handle(conn) # handle incoming connection | ||
except CancelledError as exc: | ||
return | ||
except CatchableError as exc: | ||
trace "exception in stream handler", conn, msg = exc.msg | ||
finally: | ||
await conn.closeWithEOF() | ||
trace "Stream handler done", conn | ||
muxer.handleFut = muxer.handle() | ||
return muxer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
{.used.} | ||
|
||
import sequtils | ||
import chronos, stew/byteutils | ||
import | ||
../libp2p/[ | ||
stream/connection, | ||
transports/transport, | ||
transports/quictransport, | ||
upgrademngrs/upgrade, | ||
multiaddress, | ||
errors, | ||
wire, | ||
] | ||
|
||
import ./helpers, ./commontransport | ||
|
||
suite "Quic transport": | ||
asyncTest "can handle local address": | ||
let ma = @[MultiAddress.init("/ip4/127.0.0.1/udp/0/quic-v1").tryGet()] | ||
let transport1 = QuicTransport.new() | ||
await transport1.start(ma) | ||
check transport1.handles(transport1.addrs[0]) | ||
await transport1.stop() |
Oops, something went wrong.