Skip to content

Commit

Permalink
Some more Options to Opt and similar changes (#1611)
Browse files Browse the repository at this point in the history
Includes nim-eth bump.
  • Loading branch information
kdeme committed Jun 27, 2023
1 parent 7dbcf94 commit 8ebac4c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 87 deletions.
32 changes: 14 additions & 18 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -344,21 +344,21 @@ proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Opt[T] =

proc get(db: ContentDB, T: type BlockBody, contentId: ContentId,
header: BlockHeader): Opt[T] =
let encoded = db.get(contentId)
if encoded.isNone():
let encoded = db.get(contentId).valueOr:
return Opt.none(T)

let timestamp = Moment.init(header.timestamp.toUnix(), Second)
let body =
if isShanghai(chainConfig, timestamp):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyShanghai))
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
else:
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
let
timestamp = Moment.init(header.timestamp.toUnix(), Second)
body =
if isShanghai(chainConfig, timestamp):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyShanghai))
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))
else:
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))

Opt.some(body)

Expand Down Expand Up @@ -733,17 +733,13 @@ proc validateContent(
for i, contentItem in contentItems:
let contentKey = contentKeys[i]
if await n.validateContent(contentItem, contentKey):
let contentIdOpt = n.portalProtocol.toContentId(contentKey)
if contentIdOpt.isNone():
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
error "Received offered content with invalid content key", contentKey
return false

let contentId = contentIdOpt.get()

n.portalProtocol.storeContent(contentKey, contentId, contentItem)

info "Received offered content validated successfully", contentKey

else:
error "Received offered content failed validation", contentKey
return false
Expand Down
111 changes: 43 additions & 68 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ proc addNode*(p: PortalProtocol, r: Record): bool =
else:
false

proc getNode*(p: PortalProtocol, id: NodeId): Option[Node] =
proc getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
p.routingTable.getNode(id)

func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
Expand Down Expand Up @@ -316,15 +316,13 @@ proc handleFindContent(
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead

let contentIdResult = p.toContentId(fc.contentKey)

if contentIdResult.isErr:
let contentId = p.toContentId(fc.contentKey).valueOr:
# Return empty response when content key validation fails
# TODO: Better would be to return no message at all? Needs changes on
# discv5 layer.
return @[]

let contentResult = p.dbGet(fc.contentKey, contentIdResult.get())
let contentResult = p.dbGet(fc.contentKey, contentId)

if contentResult.isOk():
let content = contentResult.get()
Expand All @@ -340,7 +338,7 @@ proc handleFindContent(
# Don't have the content, send closest neighbours to content id.
let
closestNodes = p.routingTable.neighbours(
NodeId(contentIdResult.get()), seenOnly = true)
NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64)

Expand Down Expand Up @@ -438,8 +436,8 @@ proc getInitialRadius(rc: RadiusConfig): UInt256 =
# In case of a dynamic radius we start from the maximum value to quickly
# gather as much data as possible, and also make sure each data piece in
# the database is in our range after a node restart.
# Alternative would be to store node the radius in database, and initialize it
# from database after a restart
# Alternative would be to store node the radius in database, and initialize
# it from database after a restart
return UInt256.high()

proc new*(T: type PortalProtocol,
Expand Down Expand Up @@ -609,25 +607,20 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
return err("Trying to connect to node with unknown address")

# uTP protocol uses BE for all values in the header, incl. connection id
let connectionResult =
await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)

if connectionResult.isErr():
debug "uTP connection error while trying to find content",
error = connectionResult.error
let socket =
(await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)).valueOr:
debug "uTP connection error for find content", error
return err("Error connecting uTP socket")

let socket = connectionResult.get()

try:
# Read all bytes from the socket
# This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete. Further
# validation is required, using a length prefix here might be beneficial for
# this.
# A FIN does not necessarily mean that the data read is complete.
# Further validation is required, using a length prefix here might be
# beneficial for this.
let readFut = socket.read()

readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
Expand All @@ -649,7 +642,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
else :
debug "Socket read time-out",
socketKey = socket.socketKey
# Note: This might look a bit strange, be not doing a socket.close()
# Note: This might look a bit strange, but not doing a socket.close()
# here as this is already done internally. utp_socket `checkTimeouts`
# already does a socket.destroy() on timeout. Might want to change the
# API on this later though.
Expand Down Expand Up @@ -771,19 +764,14 @@ proc offer(p: PortalProtocol, o: OfferRequest):
id = o.dst.id
return err("Trying to connect to node with unknown address")

let connectionResult =
await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)

if connectionResult.isErr():
debug "Utp connection error while trying to offer content",
error = connectionResult.error
let socket =
(await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)).valueOr:
debug "uTP connection error for offer content", error
return err("Error connecting uTP socket")

let socket = connectionResult.get()

template lenu32(x: untyped): untyped =
uint32(len(x))

Expand All @@ -797,10 +785,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)

let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data",
error = dataWritten.error
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
Expand All @@ -825,10 +811,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())

let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data",
error = dataWritten.error
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
Expand Down Expand Up @@ -1143,12 +1127,9 @@ proc neighborhoodGossip*(

# Just taking the first content item as target id.
# TODO: come up with something better?
let contentIdOpt = p.toContentId(contentList[0].contentKey)
if contentIdOpt.isNone():
let contentId = p.toContentId(contentList[0].contentKey).valueOr:
return 0

let contentId = contentIdOpt.get()

# For selecting the closest nodes to whom to gossip the content a mixed
# approach is taken:
# 1. Select the closest neighbours in the routing table
Expand All @@ -1163,7 +1144,6 @@ proc neighborhoodGossip*(
# in its propagation than when looking only for nodes in the own routing
# table, but at the same time avoid unnecessary node lookups.
# It might still cause issues in data getting propagated in a wider id range.

const maxGossipNodes = 8

let closestLocalNodes = p.routingTable.neighbours(
Expand Down Expand Up @@ -1308,34 +1288,35 @@ proc stop*(p: PortalProtocol) =
worker.cancel()
p.offerWorkers = @[]

proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} =
## Resolve a `Node` based on provided `NodeId`.
##
## This will first look in the own routing table. If the node is known, it
## will try to contact if for newer information. If node is not known or it
## does not reply, a lookup is done to see if it can find a (newer) record of
## the node on the network.
if id == p.localNode.id:
return some(p.localNode)
return Opt.some(p.localNode)

let node = p.routingTable.getNode(id)
let node = p.getNode(id)
if node.isSome():
let nodesMessage = await p.findNodes(node.get(), @[0'u16])
# TODO: Handle failures better. E.g. stop on different failures than timeout
if nodesMessage.isOk() and nodesMessage[].len > 0:
return some(nodesMessage[][0])
return Opt.some(nodesMessage[][0])

let discovered = await p.lookup(id)
for n in discovered:
if n.id == id:
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
return node
else:
return some(n)
return Opt.some(n)

return node

proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UInt256)]] {.async.} =
proc resolveWithRadius*(
p: PortalProtocol, id: NodeId): Future[Opt[(Node, UInt256)]] {.async.} =
## Resolve a `Node` based on provided `NodeId`, also try to establish what
## is known radius of found node.
##
Expand All @@ -1349,30 +1330,24 @@ proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UIn
##

let n = await p.resolve(id)

if n.isNone():
return none((Node, UInt256))
return Opt.none((Node, UInt256))

let node = n.unsafeGet()

let r = p.radiusCache.get(id)

if r.isSome():
return some((node, r.unsafeGet()))
return Opt.some((node, r.unsafeGet()))

let pongResult = await p.ping(node)

if pongResult.isOk():
let maybeRadius = p.radiusCache.get(id)

# After successful ping radius should already be in cache, but for the unlikely
# case that it is not, check it just to be sure.
# TODO: rafactor ping to return node radius.
# After successful ping radius should already be in cache, but for the
# unlikely case that it is not, check it just to be sure.
# TODO: refactor ping to return node radius.
if maybeRadius.isNone():
return none((Node, UInt256))

# If pong is successful, radius of the node should definitly be in local
# radius cache
return some((node, maybeRadius.unsafeGet()))
return Opt.none((Node, UInt256))
else:
return Opt.some((node, maybeRadius.unsafeGet()))
else:
return none((Node, UInt256))
return Opt.none((Node, UInt256))
2 changes: 1 addition & 1 deletion vendor/nim-eth

0 comments on commit 8ebac4c

Please sign in to comment.