Skip to content

Commit

Permalink
Make stop functions wait for completion before return.
Browse files Browse the repository at this point in the history
  • Loading branch information
bhartnett committed Sep 19, 2024
1 parent a9ad10c commit a79b091
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 31 deletions.
5 changes: 3 additions & 2 deletions fluffy/network/beacon/beacon_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ proc start*(lightClient: LightClient) =
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()

proc stop*(lightClient: LightClient) =
proc stop*(lightClient: LightClient) {.async: (raises: []).} =
info "Stopping beacon light client"
discard lightClient.manager.stop()

await lightClient.manager.stop()

proc resetToFinalizedHeader*(
lightClient: LightClient,
Expand Down
4 changes: 2 additions & 2 deletions fluffy/network/beacon/beacon_light_client_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,6 @@ proc start*(self: var LightClientManager) =

proc stop*(self: var LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await noCancel self.loopFuture.cancelAndWait()
if not self.loopFuture.isNil():
await noCancel(self.loopFuture.cancelAndWait())
self.loopFuture = nil
16 changes: 11 additions & 5 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,19 @@ proc start*(n: BeaconNetwork) =
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)

proc stop*(n: BeaconNetwork) =
proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
info "Stopping Portal beacon chain network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()
if not n.processContentLoop.isNil():
futures.add(n.processContentLoop.cancelAndWait())

if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
14 changes: 9 additions & 5 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -722,13 +722,17 @@ proc start*(n: HistoryNetwork) =
n.statusLogLoop = statusLogLoop(n)
pruneDeprecatedAccumulatorRecords(n.accumulator, n.contentDB)

proc stop*(n: HistoryNetwork) =
proc stop*(n: HistoryNetwork) {.async: (raises: []).} =
info "Stopping Portal execution history network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())
await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
15 changes: 10 additions & 5 deletions fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,18 @@ proc start*(n: StateNetwork) =
n.processContentLoop = processContentLoop(n)
n.statusLogLoop = statusLogLoop(n)

proc stop*(n: StateNetwork) =
proc stop*(n: StateNetwork) {.async: (raises: []).} =
info "Stopping Portal execution state network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil():
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
19 changes: 13 additions & 6 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1708,14 +1708,21 @@ proc start*(p: PortalProtocol) =
for i in 0 ..< concurrentOffers:
p.offerWorkers.add(offerWorker(p))

proc stop*(p: PortalProtocol) =
if not p.revalidateLoop.isNil:
p.revalidateLoop.cancelSoon()
if not p.refreshLoop.isNil:
p.refreshLoop.cancelSoon()
proc stop*(p: PortalProtocol) {.async: (raises: []).} =
var futures: seq[Future[void]]

if not p.revalidateLoop.isNil():
futures.add(p.revalidateLoop.cancelAndWait())
if not p.refreshLoop.isNil():
futures.add(p.refreshLoop.cancelAndWait())

for worker in p.offerWorkers:
worker.cancelSoon()
futures.add(worker.cancelAndWait())

await noCancel(allFutures(futures))

p.revalidateLoop = nil
p.refreshLoop = nil
p.offerWorkers = @[]

proc resolve*(
Expand Down
21 changes: 15 additions & 6 deletions fluffy/portal_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,27 @@ proc start*(n: PortalNode) =

n.statusLogLoop = statusLogLoop(n)

proc stop*(n: PortalNode) =
proc stop*(n: PortalNode) {.async: (raises: []).} =
debug "Stopping Portal node"

var futures: seq[Future[void]]

if n.beaconNetwork.isSome():
n.beaconNetwork.value.stop()
futures.add(n.beaconNetwork.value.stop())
if n.historyNetwork.isSome():
n.historyNetwork.value.stop()
futures.add(n.historyNetwork.value.stop())
if n.stateNetwork.isSome():
n.stateNetwork.value.stop()
futures.add(n.stateNetwork.value.stop())

if n.beaconLightClient.isSome():
n.beaconLightClient.value.stop()
futures.add(n.beaconLightClient.value.stop())

if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

futures.add(n.discovery.closeWait())

await noCancel(allFutures(futures))

n.contentDB.close()
n.statusLogLoop = nil

0 comments on commit a79b091

Please sign in to comment.