Simple optimizations to block protocol #1260


Draft
wants to merge 10 commits into base: master
2 changes: 2 additions & 0 deletions codex/blockexchange/engine/discovery.nim
@@ -78,6 +78,8 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
trace "Discovery request already in progress", cid
continue

trace "Running discovery task for cid", cid

let haves = b.peers.peersHave(cid)

if haves.len < b.minPeersPerBlock:
238 changes: 156 additions & 82 deletions codex/blockexchange/engine/engine.nim

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions codex/blockexchange/engine/pendingblocks.nim
@@ -34,7 +34,7 @@ declareGauge(

const
DefaultBlockRetries* = 3000
DefaultRetryInterval* = 500.millis
DefaultRetryInterval* = 180.seconds

type
RetriesExhaustedError* = object of CatchableError
@@ -50,6 +50,7 @@ type
blockRetries*: int = DefaultBlockRetries
retryInterval*: Duration = DefaultRetryInterval
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
lastInclusion*: Moment # time at which we last included a block into our wantlist

proc updatePendingBlockGauge(p: PendingBlocksManager) =
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
@@ -70,6 +71,8 @@ proc getWantHandle*(
startTime: getMonoTime().ticks,
)
self.blocks[address] = blk
self.lastInclusion = Moment.now()

let handle = blk.handle

proc cleanUpBlock(data: pointer) {.raises: [].} =
@@ -108,9 +111,6 @@ proc resolve*(
blockReq.handle.complete(bd.blk)

codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)

if retrievalDurationUs > 500000:
warn "High block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address

28 changes: 21 additions & 7 deletions codex/blockexchange/peers/peercontext.nim
@@ -28,20 +28,34 @@ export payments, nitro
type BlockExcPeerCtx* = ref object of RootObj
id*: PeerId
blocks*: Table[BlockAddress, Presence] # remote peer have list including price
peerWants*: seq[WantListEntry] # remote peers want lists
wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id
blocksInFlight*: HashSet[BlockAddress] # blocks in flight towards peer

proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
toSeq(self.blocks.keys)
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
self.lastRefresh + 5.minutes < Moment.now()

proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
proc isInFlight*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocksInFlight

proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
proc addInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksInFlight.incl(address)

proc removeInFlight*(self: BlockExcPeerCtx, address: BlockAddress) =
self.blocksInFlight.excl(address)

proc refreshed*(self: BlockExcPeerCtx) =
self.lastRefresh = Moment.now()

proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
# XXX: this is ugly and inefficient, but since those will typically
# be used in "joins", it's better to pay the price here and have
# a linear join than to not do it and have a quadratic join.
toHashSet(self.blocks.keys.toSeq)

proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
address in self.blocks
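
For context on the XXX note above, here is a minimal, self-contained Nim sketch (illustrative values, not from the codebase) of the trade-off: paying once to materialize a HashSet turns the membership "join" from quadratic into linear.

import std/[sets, sequtils]

let
  have = @["a", "b", "c", "d"] # stand-ins for BlockAddress values
  want = @["b", "d", "x"]

# Quadratic: `in` on a seq is a scan, so this is O(|want| * |have|).
let slow = want.filterIt(it in have)

# Linear (expected): build the set once in O(|have|), then each lookup is O(1).
let haveSet = have.toHashSet
let fast = want.filterIt(it in haveSet)

assert slow == fast # both yield @["b", "d"]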
10 changes: 6 additions & 4 deletions codex/blockexchange/peers/peerctxstore.nim
@@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int =
self.peers.len

func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))
toSeq(self.peers.values).filterIt(address in it.peerHave)

func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
# FIXME: this is much slower and can lead to unexpected performance loss.
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))

func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))
toSeq(self.peers.values).filterIt(address in it.wantedBlocks)

func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
# FIXME: this is much slower and can lead to unexpected performance loss.
toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid))

proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res: PeersForBlock = (@[], @[])
for peer in self:
if peer.peerHave.anyIt(it == address):
if address in peer.peerHave:
res.with.add(peer)
else:
res.without.add(peer)
8 changes: 0 additions & 8 deletions codex/blockexchange/protobuf/blockexc.nim
@@ -9,7 +9,6 @@

import std/hashes
import std/sequtils
import pkg/stew/endians2

import message

@@ -20,13 +19,6 @@ export Wantlist, WantType, WantListEntry
export BlockDelivery, BlockPresenceType, BlockPresence
export AccountMessage, StateChannelUpdate

proc hash*(a: BlockAddress): Hash =
if a.leaf:
let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
hash(data)
else:
hash(a.cid.data.buffer)

proc hash*(e: WantListEntry): Hash =
hash(e.address)

6 changes: 5 additions & 1 deletion codex/blockexchange/protobuf/message.nim
@@ -25,11 +25,15 @@ type

WantListEntry* = object
address*: BlockAddress
# XXX: I think an explicit priority is pointless: the peer requests blocks
# in the order it wants to receive them, and all we have to do is serve
# them back in that same order. It also complicates things for no reason
# at the moment, as the priority is always set to 0.
priority*: int32 # The priority (normalized). Defaults to 1.
cancel*: bool # Whether this revokes an entry
wantType*: WantType # Note: defaults to enum 0, ie Block
sendDontHave*: bool # Note: defaults to false
inFlight*: bool # Whether block sending is in progress. Not serialized.

WantList* = object
entries*: seq[WantListEntry] # A list of wantList entries
10 changes: 9 additions & 1 deletion codex/blocktype.nim
@@ -9,6 +9,7 @@

import std/tables
import std/sugar
import std/hashes

export tables

@@ -18,7 +19,7 @@ push:
{.upraises: [].}

import pkg/libp2p/[cid, multicodec, multihash]
import pkg/stew/byteutils
import pkg/stew/[byteutils, endians2]
import pkg/questionable
import pkg/questionable/results

@@ -67,6 +68,13 @@ proc `$`*(a: BlockAddress): string =
else:
"cid: " & $a.cid

proc hash*(a: BlockAddress): Hash =
if a.leaf:
let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
hash(data)
else:
hash(a.cid.data.buffer)

proc cidOrTreeCid*(a: BlockAddress): Cid =
if a.leaf: a.treeCid else: a.cid

19 changes: 7 additions & 12 deletions codex/node.nim
@@ -52,7 +52,7 @@ export logutils
logScope:
topics = "codex node"

const DefaultFetchBatch = 10
const DefaultFetchBatch = 1_000_000

type
Contracts* =
@@ -187,23 +187,18 @@ proc fetchBatched*(
# )

while not iter.finished:
let blockFutures = collect:
let addresses = collect:
for i in 0 ..< batchSize:
if not iter.finished:
let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
address

if blockFutures.len == 0:
continue
let
blockResults = await self.networkStore.getBlocks(addresses)
blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
numOfFailedBlocks = blockResults.len - blocks.len

without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)

let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)

let numOfFailedBlocks = blockResults.len - blocks.len
if numOfFailedBlocks > 0:
return
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
19 changes: 19 additions & 0 deletions codex/stores/networkstore.nim
@@ -31,6 +31,25 @@ type NetworkStore* = ref object of BlockStore
engine*: BlockExcEngine # blockexc decision engine
localStore*: BlockStore # local block store

proc getBlocks*(
self: NetworkStore, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
var
localBlocks: seq[?!Block]
remoteAddresses: seq[BlockAddress]

# We can resolve local blocks sequentially since, for now, those lookups are blocking anyway. Still:
# TODO: implement getBlocks for local store so we can delegate it here.
for address in addresses:
if not (await address in self.localStore):
remoteAddresses.add(address)
else:
localBlocks.add(await self.localStore.getBlock(address))

let remoteBlocks = await self.engine.requestBlocks(remoteAddresses)

return localBlocks.concat(remoteBlocks)
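
A hypothetical caller-side sketch (not part of this diff; fetchSome is an illustrative helper, and the module's existing imports such as chronos, chronicles and questionable are assumed to be in scope). getBlocks answers locally stored addresses from localStore and delegates the rest to the engine, returning one ?!Block per requested address; note that the result groups local blocks before remote ones rather than preserving the input order.

proc fetchSome(
    store: NetworkStore, addresses: seq[BlockAddress]
) {.async: (raises: [CancelledError]).} =
  # Illustrative only: log the outcome of each block request.
  let results = await store.getBlocks(addresses)
  for res in results:
    if res.isSuccess:
      trace "Fetched block", cid = res.value.cid
    else:
      trace "Failed to fetch block", err = res.error.msg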

method getBlock*(
self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
25 changes: 25 additions & 0 deletions codex/utils/safeasynciter.nim
@@ -232,3 +232,28 @@ proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] =
true

SafeAsyncIter[T].new(genNext, isFinished)

proc chain*[T](iters: seq[SafeAsyncIter[T]]): SafeAsyncIter[T] =
if iters.len == 0:
return SafeAsyncIter[T].empty

var curIdx = 0

proc ensureNext(): void =
while curIdx < iters.len and iters[curIdx].finished:
inc(curIdx)

proc isFinished(): bool =
curIdx == iters.len

proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} =
let item = await iters[curIdx].next()
ensureNext()
return item

ensureNext()

return SafeAsyncIter[T].new(genNext, isFinished)

proc chain*[T](iters: varargs[SafeAsyncIter[T]]): SafeAsyncIter[T] =
chain(iters.toSeq)
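
A hypothetical usage sketch (not part of this diff; single is an illustrative helper, not an existing API, and the module's existing imports such as chronos and questionable are assumed to be in scope), built with the same SafeAsyncIter[T].new(genNext, isFinished) pattern used above:

proc single[T](value: T): SafeAsyncIter[T] =
  # One-shot iterator: yields `value` once, then reports finished.
  var done = false

  proc isFinished(): bool =
    done

  proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} =
    done = true
    return success(value)

  SafeAsyncIter[T].new(genNext, isFinished)

# Draining this yields 1, then 2, then finishes; the already-finished empty
# iterator at the end is skipped by ensureNext().
let combined = chain(single(1), single(2), SafeAsyncIter[int].empty)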