Skip to content

Commit

Permalink
Implement graceful shutdown in Fluffy (#2645)
Browse files Browse the repository at this point in the history
* Make stop functions wait for completion before return.

* Implement graceful shutdown.

* Shutdown rpc and metric servers if enabled.

* Move metrics and rpc servers out of PortalNode.
  • Loading branch information
bhartnett authored Sep 20, 2024
1 parent 7a15aa2 commit 0719396
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 81 deletions.
146 changes: 106 additions & 40 deletions fluffy/fluffy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func optionToOpt[T](o: Option[T]): Opt[T] =
else:
Opt.none(T)

proc run(config: PortalConf) {.raises: [CatchableError].} =
proc run(
config: PortalConf
): (PortalNode, Opt[MetricsHttpServerRef], Opt[RpcHttpServer], Opt[RpcWebSocketServer]) {.
raises: [CatchableError]
.} =
setupLogging(config.logLevel, config.logStdout, none(OutFile))

notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()
Expand Down Expand Up @@ -185,26 +189,30 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
quit 1

## Start metrics HTTP server
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
let metricsServer =
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

## Start discovery v5 protocol and the Portal node.
d.start()
Opt.some(server)
else:
Opt.none(MetricsHttpServerRef)

## Start the Portal node.
node.start()

## Start the JSON-RPC APIs
Expand Down Expand Up @@ -235,24 +243,32 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =

rpcServer.start()

if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)

setupRpcServer(rpcHttpServer)

if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)

setupRpcServer(rpcWsServer)

runForever()
let rpcHttpServer =
if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)
setupRpcServer(rpcHttpServer)

Opt.some(rpcHttpServer)
else:
Opt.none(RpcHttpServer)

let rpcWsServer =
if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)
setupRpcServer(rpcWsServer)

Opt.some(rpcWsServer)
else:
Opt.none(RpcWebSocketServer)

return (node, metricsServer, rpcHttpServer, rpcWsServer)

when isMainModule:
{.pop.}
Expand All @@ -262,6 +278,56 @@ when isMainModule:
)
{.push raises: [].}

case config.cmd
of PortalCmd.noCommand:
run(config)
let (node, metricsServer, rpcHttpServer, rpcWsServer) =
case config.cmd
of PortalCmd.noCommand:
run(config)

# Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
try:
setupForeignThreadGc()
except Exception as exc:
raiseAssert exc.msg # shouldn't happen

notice "Shutting down after having received SIGINT"
node.state = PortalNodeState.Stopping

try:
setControlCHook(controlCHandler)
except Exception as exc: # TODO Exception
warn "Cannot set ctrl-c handler", msg = exc.msg

while node.state == PortalNodeState.Running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg

if rpcWsServer.isSome():
let server = rpcWsServer.get()
try:
server.stop()
waitFor server.closeWait()
except CatchableError as e:
warn "Failed to stop rpc WS server", exc = e.name, err = e.msg

if rpcHttpServer.isSome():
let server = rpcHttpServer.get()
try:
waitFor server.stop()
waitFor server.closeWait()
except CatchableError as e:
warn "Failed to stop rpc HTTP server", exc = e.name, err = e.msg

if metricsServer.isSome():
let server = metricsServer.get()
try:
waitFor server.stop()
waitFor server.close()
except CatchableError as e:
warn "Failed to stop metrics HTTP server", exc = e.name, err = e.msg

waitFor node.stop()
5 changes: 3 additions & 2 deletions fluffy/network/beacon/beacon_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ proc start*(lightClient: LightClient) =
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()

proc stop*(lightClient: LightClient) =
proc stop*(lightClient: LightClient) {.async: (raises: []).} =
info "Stopping beacon light client"
discard lightClient.manager.stop()

await lightClient.manager.stop()

proc resetToFinalizedHeader*(
lightClient: LightClient,
Expand Down
10 changes: 5 additions & 5 deletions fluffy/network/beacon/beacon_light_client_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type
GetBoolCallback* = proc(): bool {.gcsafe, raises: [].}
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].}

LightClientManager* = object
LightClientManager* = ref object
network: BeaconNetwork
rng: ref HmacDrbgContext
getTrustedBlockRoot: GetTrustedBlockRootCallback
Expand Down Expand Up @@ -315,13 +315,13 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
didLatestSyncTaskProgress = didProgress,
)

proc start*(self: var LightClientManager) =
proc start*(self: LightClientManager) =
## Start light client manager's loop.
doAssert self.loopFuture == nil
self.loopFuture = self.loop()

proc stop*(self: var LightClientManager) {.async: (raises: []).} =
proc stop*(self: LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop.
if self.loopFuture != nil:
await noCancel self.loopFuture.cancelAndWait()
if not self.loopFuture.isNil():
await noCancel(self.loopFuture.cancelAndWait())
self.loopFuture = nil
16 changes: 11 additions & 5 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,19 @@ proc start*(n: BeaconNetwork) =
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)

proc stop*(n: BeaconNetwork) =
proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
info "Stopping Portal beacon chain network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()
if not n.processContentLoop.isNil():
futures.add(n.processContentLoop.cancelAndWait())

if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
14 changes: 9 additions & 5 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -722,13 +722,17 @@ proc start*(n: HistoryNetwork) =
n.statusLogLoop = statusLogLoop(n)
pruneDeprecatedAccumulatorRecords(n.accumulator, n.contentDB)

proc stop*(n: HistoryNetwork) =
proc stop*(n: HistoryNetwork) {.async: (raises: []).} =
info "Stopping Portal execution history network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil:
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())
await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
15 changes: 10 additions & 5 deletions fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,18 @@ proc start*(n: StateNetwork) =
n.processContentLoop = processContentLoop(n)
n.statusLogLoop = statusLogLoop(n)

proc stop*(n: StateNetwork) =
proc stop*(n: StateNetwork) {.async: (raises: []).} =
info "Stopping Portal execution state network"

n.portalProtocol.stop()
var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

if not n.processContentLoop.isNil():
n.processContentLoop.cancelSoon()

futures.add(n.processContentLoop.cancelAndWait())
if not n.statusLogLoop.isNil():
n.statusLogLoop.cancelSoon()
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

n.processContentLoop = nil
n.statusLogLoop = nil
19 changes: 13 additions & 6 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1695,14 +1695,21 @@ proc start*(p: PortalProtocol) =
for i in 0 ..< concurrentOffers:
p.offerWorkers.add(offerWorker(p))

proc stop*(p: PortalProtocol) =
if not p.revalidateLoop.isNil:
p.revalidateLoop.cancelSoon()
if not p.refreshLoop.isNil:
p.refreshLoop.cancelSoon()
proc stop*(p: PortalProtocol) {.async: (raises: []).} =
var futures: seq[Future[void]]

if not p.revalidateLoop.isNil():
futures.add(p.revalidateLoop.cancelAndWait())
if not p.refreshLoop.isNil():
futures.add(p.refreshLoop.cancelAndWait())

for worker in p.offerWorkers:
worker.cancelSoon()
futures.add(worker.cancelAndWait())

await noCancel(allFutures(futures))

p.revalidateLoop = nil
p.refreshLoop = nil
p.offerWorkers = @[]

proc resolve*(
Expand Down
32 changes: 24 additions & 8 deletions fluffy/portal_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export
beacon_light_client, history_network, state_network, portal_protocol_config, forks

type
PortalNodeState* = enum
Starting
Running
Stopping

PortalNodeConfig* = object
accumulatorFile*: Opt[string]
disableStateRootValidation*: bool
Expand All @@ -33,6 +38,7 @@ type
storageCapacity*: uint64

PortalNode* = ref object
state*: PortalNodeState
discovery: protocol.Protocol
contentDB: ContentDB
streamManager: StreamManager
Expand Down Expand Up @@ -202,6 +208,8 @@ proc statusLogLoop(n: PortalNode) {.async: (raises: []).} =
proc start*(n: PortalNode) =
debug "Starting Portal node"

n.discovery.start()

if n.beaconNetwork.isSome():
n.beaconNetwork.value.start()
if n.historyNetwork.isSome():
Expand All @@ -214,18 +222,26 @@ proc start*(n: PortalNode) =

n.statusLogLoop = statusLogLoop(n)

proc stop*(n: PortalNode) =
n.state = PortalNodeState.Running

proc stop*(n: PortalNode) {.async: (raises: []).} =
debug "Stopping Portal node"

var futures: seq[Future[void]]

if n.beaconNetwork.isSome():
n.beaconNetwork.value.stop()
futures.add(n.beaconNetwork.value.stop())
if n.historyNetwork.isSome():
n.historyNetwork.value.stop()
futures.add(n.historyNetwork.value.stop())
if n.stateNetwork.isSome():
n.stateNetwork.value.stop()

futures.add(n.stateNetwork.value.stop())
if n.beaconLightClient.isSome():
n.beaconLightClient.value.stop()
futures.add(n.beaconLightClient.value.stop())
if not n.statusLogLoop.isNil():
futures.add(n.statusLogLoop.cancelAndWait())

await noCancel(allFutures(futures))

if not n.statusLogLoop.isNil:
n.statusLogLoop.cancelSoon()
await n.discovery.closeWait()
n.contentDB.close()
n.statusLogLoop = nil
2 changes: 1 addition & 1 deletion fluffy/tests/beacon_network_tests/beacon_test_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ proc start*(n: BeaconNode) =
n.beaconNetwork.start()

proc stop*(n: BeaconNode) {.async.} =
n.beaconNetwork.stop()
discard n.beaconNetwork.stop()
await n.discoveryProtocol.closeWait()

proc containsId*(n: BeaconNode, contentId: ContentId): bool =
Expand Down
2 changes: 1 addition & 1 deletion fluffy/tests/state_network_tests/state_test_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ proc start*(sn: StateNode) =
sn.stateNetwork.start()

proc stop*(sn: StateNode) {.async.} =
sn.stateNetwork.stop()
discard sn.stateNetwork.stop()
await sn.discoveryProtocol.closeWait()

proc containsId*(sn: StateNode, contentId: ContentId): bool {.inline.} =
Expand Down
Loading

0 comments on commit 0719396

Please sign in to comment.