Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some more Options to Opt and similar changes #1611

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -344,21 +344,21 @@ proc get(db: ContentDB, T: type BlockHeader, contentId: ContentId): Opt[T] =

proc get(db: ContentDB, T: type BlockBody, contentId: ContentId,
header: BlockHeader): Opt[T] =
let encoded = db.get(contentId)
if encoded.isNone():
let encoded = db.get(contentId).valueOr:
return Opt.none(T)

let timestamp = Moment.init(header.timestamp.toUnix(), Second)
let body =
if isShanghai(chainConfig, timestamp):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyShanghai))
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
else:
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded.get(), PortalBlockBodyLegacy))
let
timestamp = Moment.init(header.timestamp.toUnix(), Second)
body =
if isShanghai(chainConfig, timestamp):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyShanghai))
elif isPoSBlock(chainConfig, header.blockNumber.truncate(uint64)):
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))
else:
BlockBody.fromPortalBlockBodyOrRaise(
decodeSszOrRaise(encoded, PortalBlockBodyLegacy))

Opt.some(body)

Expand Down Expand Up @@ -733,17 +733,13 @@ proc validateContent(
for i, contentItem in contentItems:
let contentKey = contentKeys[i]
if await n.validateContent(contentItem, contentKey):
let contentIdOpt = n.portalProtocol.toContentId(contentKey)
if contentIdOpt.isNone():
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
error "Received offered content with invalid content key", contentKey
return false

let contentId = contentIdOpt.get()

n.portalProtocol.storeContent(contentKey, contentId, contentItem)

info "Received offered content validated successfully", contentKey

else:
error "Received offered content failed validation", contentKey
return false
Expand Down
111 changes: 43 additions & 68 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ proc addNode*(p: PortalProtocol, r: Record): bool =
else:
false

proc getNode*(p: PortalProtocol, id: NodeId): Option[Node] =
proc getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
p.routingTable.getNode(id)

func localNode*(p: PortalProtocol): Node = p.baseProtocol.localNode
Expand Down Expand Up @@ -316,15 +316,13 @@ proc handleFindContent(
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead

let contentIdResult = p.toContentId(fc.contentKey)

if contentIdResult.isErr:
let contentId = p.toContentId(fc.contentKey).valueOr:
# Return empty response when content key validation fails
# TODO: Better would be to return no message at all? Needs changes on
# discv5 layer.
return @[]

let contentResult = p.dbGet(fc.contentKey, contentIdResult.get())
let contentResult = p.dbGet(fc.contentKey, contentId)

if contentResult.isOk():
let content = contentResult.get()
Expand All @@ -340,7 +338,7 @@ proc handleFindContent(
# Don't have the content, send closest neighbours to content id.
let
closestNodes = p.routingTable.neighbours(
NodeId(contentIdResult.get()), seenOnly = true)
NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64)

Expand Down Expand Up @@ -438,8 +436,8 @@ proc getInitialRadius(rc: RadiusConfig): UInt256 =
# In case of a dynamic radius we start from the maximum value to quickly
# gather as much data as possible, and also make sure each data piece in
# the database is in our range after a node restart.
# Alternative would be to store node the radius in database, and initialize it
# from database after a restart
# Alternative would be to store node the radius in database, and initialize
# it from database after a restart
return UInt256.high()

proc new*(T: type PortalProtocol,
Expand Down Expand Up @@ -609,25 +607,20 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
return err("Trying to connect to node with unknown address")

# uTP protocol uses BE for all values in the header, incl. connection id
let connectionResult =
await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)

if connectionResult.isErr():
debug "uTP connection error while trying to find content",
error = connectionResult.error
let socket =
(await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)).valueOr:
debug "uTP connection error for find content", error
return err("Error connecting uTP socket")

let socket = connectionResult.get()

try:
# Read all bytes from the socket
# This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete. Further
# validation is required, using a length prefix here might be beneficial for
# this.
# A FIN does not necessarily mean that the data read is complete.
# Further validation is required, using a length prefix here might be
# beneficial for this.
let readFut = socket.read()

readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
Expand All @@ -649,7 +642,7 @@ proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList):
else :
debug "Socket read time-out",
socketKey = socket.socketKey
# Note: This might look a bit strange, be not doing a socket.close()
# Note: This might look a bit strange, but not doing a socket.close()
# here as this is already done internally. utp_socket `checkTimeouts`
# already does a socket.destroy() on timeout. Might want to change the
# API on this later though.
Expand Down Expand Up @@ -771,19 +764,14 @@ proc offer(p: PortalProtocol, o: OfferRequest):
id = o.dst.id
return err("Trying to connect to node with unknown address")

let connectionResult =
await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)

if connectionResult.isErr():
debug "Utp connection error while trying to offer content",
error = connectionResult.error
let socket =
(await p.stream.connectTo(
nodeAddress.unsafeGet(),
uint16.fromBytesBE(m.connectionId)
)).valueOr:
debug "uTP connection error for offer content", error
return err("Error connecting uTP socket")

let socket = connectionResult.get()

template lenu32(x: untyped): untyped =
uint32(len(x))

Expand All @@ -797,10 +785,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)

let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data",
error = dataWritten.error
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
Expand All @@ -825,10 +811,8 @@ proc offer(p: PortalProtocol, o: OfferRequest):
# When data turns out missing, add a 0 size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())

let dataWritten = await socket.write(output.getOutput)
if dataWritten.isErr:
debug "Error writing requested data",
error = dataWritten.error
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
Expand Down Expand Up @@ -1143,12 +1127,9 @@ proc neighborhoodGossip*(

# Just taking the first content item as target id.
# TODO: come up with something better?
let contentIdOpt = p.toContentId(contentList[0].contentKey)
if contentIdOpt.isNone():
let contentId = p.toContentId(contentList[0].contentKey).valueOr:
return 0

let contentId = contentIdOpt.get()

# For selecting the closest nodes to whom to gossip the content a mixed
# approach is taken:
# 1. Select the closest neighbours in the routing table
Expand All @@ -1163,7 +1144,6 @@ proc neighborhoodGossip*(
# in its propagation than when looking only for nodes in the own routing
# table, but at the same time avoid unnecessary node lookups.
# It might still cause issues in data getting propagated in a wider id range.

const maxGossipNodes = 8

let closestLocalNodes = p.routingTable.neighbours(
Expand Down Expand Up @@ -1308,34 +1288,35 @@ proc stop*(p: PortalProtocol) =
worker.cancel()
p.offerWorkers = @[]

proc resolve*(p: PortalProtocol, id: NodeId): Future[Option[Node]] {.async.} =
proc resolve*(p: PortalProtocol, id: NodeId): Future[Opt[Node]] {.async.} =
## Resolve a `Node` based on provided `NodeId`.
##
## This will first look in the own routing table. If the node is known, it
## will try to contact if for newer information. If node is not known or it
## does not reply, a lookup is done to see if it can find a (newer) record of
## the node on the network.
if id == p.localNode.id:
return some(p.localNode)
return Opt.some(p.localNode)

let node = p.routingTable.getNode(id)
let node = p.getNode(id)
if node.isSome():
let nodesMessage = await p.findNodes(node.get(), @[0'u16])
# TODO: Handle failures better. E.g. stop on different failures than timeout
if nodesMessage.isOk() and nodesMessage[].len > 0:
return some(nodesMessage[][0])
return Opt.some(nodesMessage[][0])

let discovered = await p.lookup(id)
for n in discovered:
if n.id == id:
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
return node
else:
return some(n)
return Opt.some(n)

return node

proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UInt256)]] {.async.} =
proc resolveWithRadius*(
p: PortalProtocol, id: NodeId): Future[Opt[(Node, UInt256)]] {.async.} =
## Resolve a `Node` based on provided `NodeId`, also try to establish what
## is known radius of found node.
##
Expand All @@ -1349,30 +1330,24 @@ proc resolveWithRadius*(p: PortalProtocol, id: NodeId): Future[Option[(Node, UIn
##

let n = await p.resolve(id)

if n.isNone():
return none((Node, UInt256))
return Opt.none((Node, UInt256))

let node = n.unsafeGet()

let r = p.radiusCache.get(id)

if r.isSome():
return some((node, r.unsafeGet()))
return Opt.some((node, r.unsafeGet()))

let pongResult = await p.ping(node)

if pongResult.isOk():
let maybeRadius = p.radiusCache.get(id)

# After successful ping radius should already be in cache, but for the unlikely
# case that it is not, check it just to be sure.
# TODO: rafactor ping to return node radius.
# After successful ping radius should already be in cache, but for the
# unlikely case that it is not, check it just to be sure.
# TODO: refactor ping to return node radius.
if maybeRadius.isNone():
return none((Node, UInt256))

# If pong is successful, radius of the node should definitly be in local
# radius cache
return some((node, maybeRadius.unsafeGet()))
return Opt.none((Node, UInt256))
else:
return Opt.some((node, maybeRadius.unsafeGet()))
else:
return none((Node, UInt256))
return Opt.none((Node, UInt256))
2 changes: 1 addition & 1 deletion vendor/nim-eth