diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 457e64af47..fa3b40acf6 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -344,21 +344,21 @@ proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Opt[T] = proc get(db: ContentDB, T: type BlockBody, contentId: ContentId, header: BlockHeader): Opt[T] = - let encoded = db.get(contentId) - if encoded.isNone(): + let encoded = db.get(contentId).valueOr: return Opt.none(T) - let timestamp = Moment.init(header.timestamp.toUnix(), Second) - let body = - if isShanghai(chainConfig, timestamp): - BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded.get(), PortalBlockBodyShanghai)) - elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)): - BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy)) - else: - BlockBody.fromPortalBlockBodyOrRaise( - decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy)) + let + timestamp = Moment.init(header.timestamp.toUnix(), Second) + body = + if isShanghai(chainConfig, timestamp): + BlockBody.fromPortalBlockBodyOrRaise( + decodeSszOrRaise(encoded, PortalBlockBodyShanghai)) + elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)): + BlockBody.fromPortalBlockBodyOrRaise( + decodeSszOrRaise(encoded, PortalBlockBodyLegacy)) + else: + BlockBody.fromPortalBlockBodyOrRaise( + decodeSszOrRaise(encoded, PortalBlockBodyLegacy)) Opt.some(body) @@ -733,17 +733,13 @@ proc validateContent( for i, contentItem in contentItems: let contentKey = contentKeys[i] if await n.validateContent(contentItem, contentKey): - let contentIdOpt = n.portalProtocol.toContentId(contentKey) - if contentIdOpt.isNone(): + let contentId = n.portalProtocol.toContentId(contentKey).valueOr: error "Received offered content with invalid content key", contentKey return false - let contentId = contentIdOpt.get() - n.portalProtocol.storeContent(contentKey, contentId, contentItem) info "Received offered content validated successfully", contentKey - else: error "Received offered content failed validation", contentKey return false diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim index 3b14d8b522..df6b0eb6cb 100644 --- a/fluffy/network/wire/portal_protocol.nim +++ b/fluffy/network/wire/portal_protocol.nim @@ -223,7 +223,7 @@ proc addNode*(p: PortalProtocol, r: Record): bool = else: false -proc getNode*(p: PortalProtocol, id: NodeId): Option[Node] = +proc getNode*(p: PortalProtocol, id: NodeId): Opt[Node] = p.routingTable.getNode(id) func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode @@ -316,15 +316,13 @@ proc handleFindContent( maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead enrOverhead = 4 # per added ENR, 4 bytes offset overhead - let contentIdResult = p.toContentId(fc.contentKey) - - if contentIdResult.isErr: + let contentId = p.toContentId(fc.contentKey).valueOr: # Return empty response when content key validation fails # TODO: Better would be to return no message at all? Needs changes on # discv5 layer. return @[] - let contentResult = p.dbGet(fc.contentKey, contentIdResult.get()) + let contentResult = p.dbGet(fc.contentKey, contentId) if contentResult.isOk(): let content = contentResult.get() @@ -340,7 +338,7 @@ proc handleFindContent( # Don't have the content, send closest neighbours to content id. let closestNodes = p.routingTable.neighbours( - NodeId(contentIdResult.get()), seenOnly = true) + NodeId(contentId), seenOnly = true) enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead) portal_content_enrs_packed.observe(enrs.len().int64) @@ -438,8 +436,8 @@ proc getInitialRadius(rc: RadiusConfig): UInt256 = # In case of a dynamic radius we start from the maximum value to quickly # gather as much data as possible, and also make sure each data piece in # the database is in our range after a node restart. - # Alternative would be to store node the radius in database, and initialize it - # from database after a restart + # Alternative would be to store node the radius in database, and initialize + # it from database after a restart return UInt256.high() proc new*(T: type PortalProtocol, @@ -609,25 +607,20 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): return err("Trying to connect to node with unknown address") # uTP protocol uses BE for all values in the header, incl. connection id - let connectionResult = - await p.stream.connectTo( - nodeAddress.unsafeGet(), - uint16.fromBytesBE(m.connectionId) - ) - - if connectionResult.isErr(): - debug "uTP connection error while trying to find content", - error = connectionResult.error + let socket = + (await p.stream.connectTo( + nodeAddress.unsafeGet(), + uint16.fromBytesBE(m.connectionId) + )).valueOr: + debug "uTP connection error for find content", error return err("Error connecting uTP socket") - let socket = connectionResult.get() - try: # Read all bytes from the socket # This will either end with a FIN, or because the read action times out. - # A FIN does not necessarily mean that the data read is complete. Further - # validation is required, using a length prefix here might be beneficial for - # this. + # A FIN does not necessarily mean that the data read is complete. + # Further validation is required, using a length prefix here might be + # beneficial for this. let readFut = socket.read() readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} = @@ -649,7 +642,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): else : debug "Socket read time-out", socketKey = socket.socketKey - # Note: This might look a bit strange, be not doing a socket.close() + # Note: This might look a bit strange, but not doing a socket.close() # here as this is already done internally. utp_socket `checkTimeouts` # already does a socket.destroy() on timeout. Might want to change the # API on this later though. @@ -771,19 +764,14 @@ proc offer(p: PortalProtocol, o: OfferRequest): id = o.dst.id return err("Trying to connect to node with unknown address") - let connectionResult = - await p.stream.connectTo( - nodeAddress.unsafeGet(), - uint16.fromBytesBE(m.connectionId) - ) - - if connectionResult.isErr(): - debug "Utp connection error while trying to offer content", - error = connectionResult.error + let socket = + (await p.stream.connectTo( + nodeAddress.unsafeGet(), + uint16.fromBytesBE(m.connectionId) + )).valueOr: + debug "uTP connection error for offer content", error return err("Error connecting uTP socket") - let socket = connectionResult.get() - template lenu32(x: untyped): untyped = uint32(len(x)) @@ -797,10 +785,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): output.write(toBytes(content.lenu32, Leb128).toOpenArray()) output.write(content) - let dataWritten = await socket.write(output.getOutput) - if dataWritten.isErr: - debug "Error writing requested data", - error = dataWritten.error + let dataWritten = (await socket.write(output.getOutput)).valueOr: + debug "Error writing requested data", error # No point in trying to continue writing data socket.close() return err("Error writing requested data") @@ -825,10 +811,8 @@ proc offer(p: PortalProtocol, o: OfferRequest): # When data turns out missing, add a 0 size varint output.write(toBytes(0'u8, Leb128).toOpenArray()) - let dataWritten = await socket.write(output.getOutput) - if dataWritten.isErr: - debug "Error writing requested data", - error = dataWritten.error + let dataWritten = (await socket.write(output.getOutput)).valueOr: + debug "Error writing requested data", error # No point in trying to continue writing data socket.close() return err("Error writing requested data") @@ -1143,12 +1127,9 @@ proc neighborhoodGossip*( # Just taking the first content item as target id. # TODO: come up with something better? - let contentIdOpt = p.toContentId(contentList[0].contentKey) - if contentIdOpt.isNone(): + let contentId = p.toContentId(contentList[0].contentKey).valueOr: return 0 - let contentId = contentIdOpt.get() - # For selecting the closest nodes to whom to gossip the content a mixed # approach is taken: # 1. Select the closest neighbours in the routing table @@ -1163,7 +1144,6 @@ proc neighborhoodGossip*( # in its propagation than when looking only for nodes in the own routing # table, but at the same time avoid unnecessary node lookups. # It might still cause issues in data getting propagated in a wider id range. - const maxGossipNodes = 8 let closestLocalNodes = p.routingTable.neighbours( @@ -1308,7 +1288,7 @@ proc stop*(p: PortalProtocol) = worker.cancel() p.offerWorkers = @[] -proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} = +proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} = ## Resolve a `Node` based on provided `NodeId`. ## ## This will first look in the own routing table. If the node is known, it @@ -1316,14 +1296,14 @@ proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} = ## does not reply, a lookup is done to see if it can find a (newer) record of ## the node on the network. if id == p.localNode.id: - return some(p.localNode) + return Opt.some(p.localNode) - let node = p.routingTable.getNode(id) + let node = p.getNode(id) if node.isSome(): let nodesMessage = await p.findNodes(node.get(), @[0'u16]) # TODO: Handle failures better. E.g. stop on different failures than timeout if nodesMessage.isOk() and nodesMessage[].len > 0: - return some(nodesMessage[][0]) + return Opt.some(nodesMessage[][0]) let discovered = await p.lookup(id) for n in discovered: @@ -1331,11 +1311,12 @@ proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} = if node.isSome() and node.get().record.seqNum >= n.record.seqNum: return node else: - return some(n) + return Opt.some(n) return node -proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UInt256)]] {.async.} = +proc resolveWithRadius*( + p: PortalProtocol, id: NodeId): Future[Opt[(Node, UInt256)]] {.async.} = ## Resolve a `Node` based on provided `NodeId`, also try to establish what ## is known radius of found node. ## @@ -1349,30 +1330,24 @@ proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UIn ## let n = await p.resolve(id) - if n.isNone(): - return none((Node, UInt256)) + return Opt.none((Node, UInt256)) let node = n.unsafeGet() let r = p.radiusCache.get(id) - if r.isSome(): - return some((node, r.unsafeGet())) + return Opt.some((node, r.unsafeGet())) let pongResult = await p.ping(node) - if pongResult.isOk(): let maybeRadius = p.radiusCache.get(id) - - # After successful ping radius should already be in cache, but for the unlikely - # case that it is not, check it just to be sure. - # TODO: rafactor ping to return node radius. + # After successful ping radius should already be in cache, but for the + # unlikely case that it is not, check it just to be sure. + # TODO: refactor ping to return node radius. if maybeRadius.isNone(): - return none((Node, UInt256)) - - # If pong is successful, radius of the node should definitly be in local - # radius cache - return some((node, maybeRadius.unsafeGet())) + return Opt.none((Node, UInt256)) + else: + return Opt.some((node, maybeRadius.unsafeGet())) else: - return none((Node, UInt256)) + return Opt.none((Node, UInt256)) diff --git a/vendor/nim-eth b/vendor/nim-eth index 6b8a7b009e..26ae539598 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 6b8a7b009eb94bfdac1410f243865984bdd7c4f2 +Subproject commit 26ae539598e31efbaa016f6694b9a60ea08fc4b6