Skip to content

Commit 3dd8ae7

Browse files
committed
fix worker issue
1 parent b63749d commit 3dd8ae7

File tree

2 files changed

+25
-49
lines changed

2 files changed

+25
-49
lines changed

nimbus_verified_proxy/lc/lc_manager.nim

Lines changed: 24 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -248,73 +248,48 @@ proc workerTask[E](
248248
proc query[E](
249249
self: LightClientManager, e: typedesc[E], key: E.K
250250
): Future[bool] {.async: (raises: [CancelledError]).} =
251-
const PARALLEL_REQUESTS = 2
252-
var workers: array[PARALLEL_REQUESTS, Future[bool]]
251+
const NUM_WORKERS = 2
252+
var workers: array[NUM_WORKERS, Future[bool]]
253253

254254
let
255255
progressFut = Future[void].Raising([CancelledError]).init("lcmanProgress")
256-
doneFut = Future[void].Raising([CancelledError]).init("lcmanDone")
257256
var
258257
numCompleted = 0
258+
success = false
259259
maxCompleted = workers.len
260260

261261
proc handleFinishedWorker(future: pointer) =
262262
try:
263263
let didProgress = cast[Future[bool]](future).read()
264264
if didProgress and not progressFut.finished:
265265
progressFut.complete()
266-
except CancelledError:
267-
if not progressFut.finished:
268-
progressFut.cancelSoon()
266+
success = true
269267
except CatchableError:
270268
discard
271269
finally:
272270
inc numCompleted
273271
if numCompleted == maxCompleted:
274-
doneFut.complete()
272+
progressFut.cancelSoon()
275273

276-
try:
277-
# Start concurrent workers
278-
for i in 0 ..< workers.len:
279-
try:
280-
workers[i] = self.workerTask(e, key)
281-
workers[i].addCallback(handleFinishedWorker)
282-
except CancelledError as exc:
283-
raise exc
284-
except CatchableError:
285-
workers[i] = newFuture[bool]()
286-
workers[i].complete(false)
287-
288-
# Wait for any worker to report progress, or for all workers to finish
274+
# Start concurrent workers
275+
for i in 0 ..< workers.len:
289276
try:
290-
discard await race(progressFut, doneFut)
291-
except ValueError:
292-
raiseAssert "race API invariant"
293-
finally:
294-
for i in 0 ..< maxCompleted:
295-
if workers[i] == nil:
296-
maxCompleted = i
297-
if numCompleted == maxCompleted:
298-
doneFut.complete()
299-
break
300-
if not workers[i].finished:
301-
workers[i].cancelSoon()
302-
while true:
303-
try:
304-
await allFutures(workers[0 ..< maxCompleted])
305-
break
306-
except CancelledError:
307-
continue
308-
while true:
309-
try:
310-
await doneFut
311-
break
312-
except CancelledError:
313-
continue
277+
workers[i] = self.workerTask(e, key)
278+
workers[i].addCallback(handleFinishedWorker)
279+
except CancelledError as exc:
280+
raise exc
281+
except CatchableError:
282+
workers[i] = newFuture[bool]()
283+
workers[i].complete(false)
284+
285+
# Wait for any worker to report progress, or for all workers to finish
286+
waitFor progressFut
287+
288+
# cancel all workers
289+
for i in 0 ..< NUM_WORKERS:
290+
workers[i].cancelSoon()
314291

315-
if not progressFut.finished:
316-
progressFut.cancelSoon()
317-
return progressFut.completed
292+
return success
318293

319294
template query[E](
320295
self: LightClientManager, e: typedesc[E]
@@ -358,7 +333,8 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
358333
if not didProgress:
359334
debug "Re-attempting bootstrap download"
360335
await sleepAsync(chronos.seconds(2))
361-
continue
336+
337+
continue
362338

363339
# check and download sync committee updates
364340
if finalizedPeriod == optimisticPeriod and not self.isNextSyncCommitteeKnown():

nimbus_verified_proxy/libverifproxy/verifproxy.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ proc runContext(ctx: ptr Context) {.thread.} =
4141
trustedBlockRoot: Eth2Digest.fromHex(jsonNode["TrustedBlockRoot"].getStr()),
4242
backendUrl: parseCmdArg(Web3Url, jsonNode["backendUrl"].getStr()),
4343
frontendUrl: parseCmdArg(Web3Url, jsonNode["frontendUrl"].getStr()),
44-
lcEndpoint: jsonNode["lcEndpoint"].getStr(),
44+
lcEndpoints: jsonNode["lcEndpoints"].getStr(),
4545
logLevel: jsonNode["LogLevel"].getStr(),
4646
logStdout: StdoutLogKind.Auto,
4747
dataDirFlag: none(OutDir),

0 commit comments

Comments
 (0)