Skip to content

Commit

Permalink
0.0.7: critical fix
Browse files Browse the repository at this point in the history
  • Loading branch information
disruptek committed Jul 24, 2020
1 parent 157c184 commit f37322e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cps.nimble
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "0.0.6"
version = "0.0.7"
author = "disruptek"
description = "continuation-passing style"
license = "MIT"
Expand Down
64 changes: 39 additions & 25 deletions cps/eventqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import cps
import cps/semaphore

export Semaphore, `==`, `<`, hash, signal, wait, isReady, withReady
export Event

const
cpsDebug {.booldefine.} = false ## produce gratuitous output
Expand Down Expand Up @@ -57,7 +58,6 @@ const
wakeupId = Id(-1)
invalidId = Id(0)
invalidFd = Fd(-1)
bogusIds = wakeupId .. invalidId
oneMs = initDuration(milliseconds = 1)

var eq {.threadvar.}: EventQueue
Expand All @@ -72,7 +72,7 @@ proc `<`(a, b: Fd): bool {.borrow.}
proc `==`(a, b: Id): bool {.borrow.}
proc `==`(a, b: Fd): bool {.borrow.}

proc `[]=`(w: var WaitingIds; fd: int | Fd; id: Id) =
proc put(w: var WaitingIds; fd: int | Fd; id: Id) =
while fd.int > len(w):
setLen(w, len(w) * 2)
system.`[]=`(w, fd.int, id)
Expand All @@ -81,8 +81,9 @@ proc `[]=`(w: var WaitingIds; fd: int | Fd; id: Id) =
discard
else:
inc eq.waiters
assert eq.waiters > 0

proc pop(w: var WaitingIds; fd: int | Fd): Id =
proc get(w: var WaitingIds; fd: int | Fd): Id =
result = w[fd.int]
if result != wakeupId: # don't zap our wakeup id
if result != invalidId: # don't count invalid ids
Expand Down Expand Up @@ -119,7 +120,7 @@ proc init() {.inline.} =
trigger eq.wake
for ready in select(eq.selector, -1):
assert User in ready.events
eq.waiting[ready.fd] = wakeupId
eq.waiting.put(ready.fd, wakeupId)

eq.lastId = invalidId
eq.yields = initDeque[Cont]()
Expand Down Expand Up @@ -234,13 +235,12 @@ proc poll*() =
if eq.waiters > 0:
when cpsDebug:
let clock = now()
let ready = select(eq.selector, -1)

# ready holds the ready file descriptors and their events.

let ready = select(eq.selector, -1)
for event in items(ready):
# get the registration of the pending continuation
let id = eq.waiting.pop(event.fd)
let id = eq.waiting.get(event.fd)
# the id will be wakeupId if it's a wake-up event
assert id != invalidId
if id == wakeupId:
Expand All @@ -261,26 +261,27 @@ proc poll*() =
raise newException(KeyError, "missing registration " & $id)

if len(eq.yields) > 0:
# at this point, we've handled all timers and i/o so we can simply
# iterate over the yields and run them. to make sure we don't run any
# newly-added continuations in this poll, we'll process no more than
# the current number of queued continuations...

# run no more than the current number of ready continuations
for index in 1 .. len(eq.yields):
let cont = popFirst eq.yields
trampoline cont
elif eq.timer == invalidFd and len(eq) == 0:
# if there's no timer and we have no pending continuations,
stop()
else:
when cpsDebug:
echo "💈"
# else wait until the next polling interval or signal
for ready in eq.manager.select(-1):
# if we get any kind of error, all we can reasonably do is stop
if ready.errorCode.int != 0:
stop()
raiseOSError(ready.errorCode, "cps eventqueue error")

# if there are no pending continuations,
if len(eq) == 0:
# and there is no polling timer setup,
if eq.timer == invalidFd:
# then we'll stop the dispatcher now.
stop()
else:
when cpsDebug:
echo "💈"
# else wait until the next polling interval or signal
for ready in eq.manager.select(-1):
# if we get any kind of error, all we can reasonably do is stop
if ready.errorCode.int != 0:
stop()
raiseOSError(ready.errorCode, "cps eventqueue error")
break

proc run*(interval: Duration = DurationZero) =
## The dispatcher runs with a maximal polling interval; an `interval` of
Expand Down Expand Up @@ -316,7 +317,7 @@ proc cpsSleep*(interval: Duration): Cont {.cpsMagic.} =
let fd = registerTimer(eq.selector,
timeout = interval.inMilliseconds.int,
oneshot = true, data = id)
eq.waiting[fd] = id
eq.waiting.put(fd, id)
when cpsDebug:
echo "⏰timer ", fd.Fd

Expand Down Expand Up @@ -393,3 +394,16 @@ proc cpsSpawn*(c: Cont) =
## procedure.
wakeAfter:
addLast(eq.yields, c)

proc cpsIo*(file: int | SocketHandle; events: set[Event]): Cont {.cpsMagic.} =
## Continue upon any of `events` on the given file-descriptor or
## SocketHandle.
if len(events) == 0:
raise newException(ValueError, "no events supplied")
else:
wakeAfter:
let id = eq.add(c)
let fd = registerHandle(eq.selector, file, events = events, data = id)
eq.waiting.put(fd, id)
when cpsDebug:
echo "📂file ", fd.Fd
2 changes: 1 addition & 1 deletion tests/tyield.nim.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
--threads:on
--define:threadsafe
--gc:arc
--define:cpsDebug
#--define:cpsDebug

0 comments on commit f37322e

Please sign in to comment.