Skip to content

Commit

Permalink
Enable GC Safe mode (#9)
Browse files Browse the repository at this point in the history
* Start working on making it gcsafe

* Remove generic restriction from most places

Issue was in `start` that since I had already restricted `T` to the two kinds of handlers (To make multisync work) it didn't allow the gcsafety to pass through

Might need to make my own multisync

* Remove generic restriction for `start`

By not restricting it we can make it extend to gcsafe procs also

* Add generic restriction back where it makes sense

* Write a `multisync` proc that applies generic restrictions instead of setting exact parameters

* Fix the custom comparator not being used

Needed to export the proc (maybe this is a good use of typebound ops?)

* `multisync` working with gcsafe

* Fix the default parameter on stable

Bug is fixed on devel so at some point we can remove the strange code and just bump the min version

* Move the logic for running the next task into a different function
  • Loading branch information
ire4ever1190 authored Feb 1, 2025
1 parent 81ef391 commit dd152ce
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 50 deletions.
153 changes: 103 additions & 50 deletions src/taskman.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type
##
## .. Note::When using `--threads:on` the proc must be gcsafe

TaskHandler* = proc ()
TaskHandler* = proc (): void
## Proc that runs in a normal scheduler
##
## .. Note::When using `--threads:on` the proc must be gcsafe
Expand Down Expand Up @@ -203,20 +203,28 @@ proc wakeUp[T](tasks: SchedulerBase[T]) =
tasks.timer.fut.complete()
# Disarm the timer
tasks.timer.fut = nil
else:
discard

func `<`(a, b: TaskBase[HandlerTypes]): bool {.inline.} = a.startTime < b.startTime
func `==`(a, b: TaskBase[HandlerTypes]): bool {.inline.} = a.handler == b.handler
func `<`*[T](a, b: TaskBase[T]): bool {.inline.} =
## Checks if one task is scheduled before another
a.startTime < b.startTime

func `==`*[T](a, b: TaskBase[T]): bool {.inline.} =
## Two handlers are considered equal if they have the same handler
a.handler == b.handler

proc defaultErrorHandler[T: HandlerTypes](tasks: SchedulerBase[T], task: TaskBase[T], exception: ref Exception) =
## Default error handler, just raises the error further up the stack
raise exception

proc newSchedulerBase[T: HandlerTypes](errorHandler: ErrorHandler[T]): SchedulerBase[T] =
proc newSchedulerBase*[T: HandlerTypes](errorHandler: ErrorHandler[T] = nil): SchedulerBase[T] =
## Creates a new scheduler that calls procs of `T`. Only use this if you want to add extra restriction to the proc type (exceptions, gcsafe, etc).
##
## * **errorHandler**: Proc that is called when exceptions are raised in tasks. By default it just reraises exceptions
SchedulerBase[T](
tasks: initHeapQueue[TaskBase[T]](),
errorHandler: errorHandler
# Generic bug that is fixed on devel.
# T isn't available in default params so we can't instantiate the default handler.
errorHandler: if errorHandler != nil: errorHandler else: defaultErrorHandler[T]
)

proc newScheduler*(errorHandler: ErrorHandler[TaskHandler] = defaultErrorHandler[TaskHandler]): Scheduler =
Expand Down Expand Up @@ -254,7 +262,7 @@ proc newTask*[T: HandlerTypes](interval: TimeInterval, handler: T, name = defaul
name: name
)

proc newTask*[T: HandlerTypes](time: DateTime, handler: T, name = defaultTaskName): TaskBase[T] =
proc newTask*[T](time: DateTime, handler: T, name = defaultTaskName): TaskBase[T] =
## Creates a new task which can be added to a scheduler.
## This task will only run once (will run at **time**)
TaskBase[T](
Expand All @@ -264,7 +272,7 @@ proc newTask*[T: HandlerTypes](time: DateTime, handler: T, name = defaultTaskNam
name: name
)

proc newTask*[T: HandlerTypes](cron: Cron, handler: T, name = defaultTaskName): TaskBase[T] =
proc newTask*[T](cron: Cron, handler: T, name = defaultTaskName): TaskBase[T] =
## Creates a new task which can be added to a scheduler.
TaskBase[T](
kind: Cron,
Expand All @@ -274,12 +282,12 @@ proc newTask*[T: HandlerTypes](cron: Cron, handler: T, name = defaultTaskName):
name: name
)

proc add*[T: HandlerTypes](scheduler: SchedulerBase[T], task: TaskBase[T]) {.inline.} =
proc add*[T](scheduler: SchedulerBase[T], task: TaskBase[T]) {.inline.} =
## Adds a task to the scheduler.
scheduler.tasks.push task
scheduler.wakeUp()

proc every*[T: HandlerTypes](scheduler: SchedulerBase[T]; interval: TimeInterval, handler: T, name = defaultTaskName) =
proc every*[T](scheduler: SchedulerBase[T]; interval: TimeInterval, handler: T, name = defaultTaskName) =
## Runs a task every time the interval occurs.
runnableExamples:
let tasks = newAsyncScheduler()
Expand All @@ -293,7 +301,7 @@ proc every*[T: HandlerTypes](scheduler: SchedulerBase[T]; interval: TimeInterval
scheduler &= newTask(interval, handler, name)


proc every*[T: HandlerTypes](scheduler: SchedulerBase[T], cron: Cron, handler: T, name = defaultTaskName) =
proc every*[T](scheduler: SchedulerBase[T], cron: Cron, handler: T, name = defaultTaskName) =
## Runs a task every time a cron timer is valid
runnableExamples:
let tasks = newAsyncScheduler()
Expand All @@ -303,11 +311,11 @@ proc every*[T: HandlerTypes](scheduler: SchedulerBase[T], cron: Cron, handler: T
#==#
scheduler &= newTask(cron, handler, name)

proc every*[T: HandlerTypes](scheduler: SchedulerBase[T], often: TimeInterval | Cron, name: string, handler: T) =
proc every*[T](scheduler: SchedulerBase[T], often: TimeInterval | Cron, name: string, handler: T) =
## Sugar that allows you to have name and lambda
scheduler.every(often, handler, name)

proc at*[T: HandlerTypes](scheduler: SchedulerBase[T], time: DateTime, handler: T, name = defaultTaskName) =
proc at*[T](scheduler: SchedulerBase[T], time: DateTime, handler: T, name = defaultTaskName) =
## Runs a task at a certain date/time (only runs once).
runnableExamples:
let tasks = newAsyncScheduler()
Expand All @@ -317,11 +325,11 @@ proc at*[T: HandlerTypes](scheduler: SchedulerBase[T], time: DateTime, handler:
#==#
scheduler &= newTask(time, handler, name)

proc at*[T: HandlerTypes](scheduler: SchedulerBase[T], interval: TimeInterval, name: string, handler: T) =
proc at*[T](scheduler: SchedulerBase[T], interval: TimeInterval, name: string, handler: T) =
## Sugar that allows you to have name and lambda
scheduler.at(interval, handler, name)

proc wait*[T: HandlerTypes](scheduler: SchedulerBase[T], interval: TimeInterval, handler: T, name = defaultTaskName) =
proc wait*[T](scheduler: SchedulerBase[T], interval: TimeInterval, handler: T, name = defaultTaskName) =
## Waits `interval` amount of time and then runs task (only runs once).
runnableExamples "--threads:off":
import std/httpclient
Expand All @@ -333,15 +341,15 @@ proc wait*[T: HandlerTypes](scheduler: SchedulerBase[T], interval: TimeInterval,
#==#
scheduler.at(now() + interval, handler, name)

proc wait*[T: HandlerTypes](scheduler: SchedulerBase[T], interval: TimeInterval, name: string, handler: T) =
proc wait*[T](scheduler: SchedulerBase[T], interval: TimeInterval, name: string, handler: T) =
## Sugar that allows you to have name and lambda
scheduler.wait(interval, handler, name)

proc milliSecondsLeft(task: TaskBase[HandlerTypes]): int =
proc milliSecondsLeft(task: TaskBase): int =
## Returns time different in milliseconds between now and the tasks start time
result = int((task.startTime - getTime()).inMilliseconds)

proc del*[T: HandlerTypes](scheduler: SchedulerBase[T], task: TaskBase[T]) =
proc del*[T](scheduler: SchedulerBase[T], task: TaskBase[T]) =
## Removes a task from the scheduler
runnableExamples:
import std/sugar
Expand All @@ -359,7 +367,7 @@ proc del*[T: HandlerTypes](scheduler: SchedulerBase[T], task: TaskBase[T]) =
let index = scheduler.tasks.find task
scheduler.tasks.del index

proc del*(scheduler: SchedulerBase[HandlerTypes], task: string) =
proc del*(scheduler: SchedulerBase, task: string) =
## Removes a task from the scheduler (via its name).
## If there are multiple tasks with the same name then the first task is deleted
runnableExamples:
Expand Down Expand Up @@ -400,6 +408,7 @@ proc next*(task: TaskBase): Time {.raises: [TooFarAheadCron].} =
of Cron:
result = now().next(task.cronFormat).toTime()

# TODO: Remove, this is a terrible helper
template onlyRun*(times: int) =
## Make task only run a certain number of times
runnableExamples:
Expand All @@ -414,55 +423,99 @@ template onlyRun*(times: int) =
else:
inc timesRan

proc start*(scheduler: AsyncScheduler | Scheduler, periodicCheck = 0) {.multisync.} =
## Starts running the tasks.
## Call with `asyncCheck` to make it run in the background
##
## * **periodicCheck**: This prevents the scheduler from fulling stopping and specifies how many milliseconds to poll for new tasks. Also use this with non async scheduler to allow you to add new tasks in that might be shorter than current running one
const isAsync = scheduler is AsyncScheduler
#[
Differencs are
- How it sleeps
- how it calls the handler
]#

macro multiGenericSync(arg: untyped, syncRestriction: untyped, asyncRestriction: untyped, prc: untyped) =
## Like `multisync` from the stdlib except this operates on generic restrictions instead of concrete parameters.
## Excepts the proc to already have a generic argument
# Find the generic parameter
let genericParamIdx = block:
var res = -1
for i, param in prc[2]:
if param[0].eqIdent(arg):
res = i
if res == -1:
"Couldn't find the generic parameter".error(arg)
res

let
syncProc = prc.copy()
asyncProc = prc.copy()

# Apply restriction, and add in an `await` template that just ignores await
syncProc[2][genericParamIdx][1] = syncRestriction
let awaitTempl = quote do:
template await(value: typed): untyped =
value
syncProc.body = newStmtList(awaitTempl, syncProc.body)

# Async version must have future type attached
asyncProc.params[0] = nnkBracketExpr.newTree(ident"Future", asyncProc.params[0] or ident"void")

# And async pragma
asyncProc.addPragma(ident"async")

# And generic restruction
asyncProc[2][genericParamIdx][1] = asyncRestriction

result = newStmtList(syncProc, asyncProc)

proc runNextTask[T](scheduler: SchedulerBase[T]) {.multiGenericSync(T, TaskHandler, AsyncTaskHandler).} =
## Runs the next scheduled task. If no task is scheduled to run then it does nothing
if scheduler.len == 0: return

var currTask = scheduler.tasks.pop()
if getTime() < currTask.startTime:
# Add task back so it can be waited on again
scheduler &= currTask
return

if currTask.kind != OneShot:
# Schedule task again
currTask.startTime = currTask.next()
scheduler &= currTask

try:
await currTask.handler()
except RemoveTaskException:
scheduler.del currTask
except CatchableError as e:
e.msg = "Error with task '" & currTask.name & "': " & e.msg
scheduler.errorHandler(scheduler, currTask, e)

proc start*[T](scheduler: SchedulerBase[T], periodicCheck = 0) {.multiGenericSync(T, TaskHandler, AsyncTaskHandler).} =
## Starts running tasks.
## * **periodicCheck**: This prevents the scheduler from fulling stopping and specifies how many milliseconds to poll for new tasks. Also needed if tasks are added midflight
scheduler.running = true
while scheduler.len > 0 or periodicCheck > 0:
if scheduler.len == 0:
when isAsync:
when T is AsyncTaskHandler:
await sleepAsync(periodicCheck)
else:
sleep periodicCheck
continue # Run loop again to check if stuff has been added while sleeping

let sleepTime = scheduler.tasks[0].milliSecondsLeft
when isAsync:
when T is AsyncTaskHandler:
# This is more or less copy and pasted from async dispatch.
# But we make a few modifications so that we can effectively cancel the sleeping.
# This enables us to force the scheduler to check for new tasks when they get added (Instead of needing to poll with periodicCheck)
var retFuture = newFuture[void]("start")
let
retFuture = newFuture[void]("start")
p = getGlobalDispatcher()
scheduler.timer = (getMonoTime() + initDuration(milliseconds = sleepTime), retFuture)
p.timers.push(scheduler.timer)
await retFuture
else:
# We can't do any fancy sleep cancelling so we need to do this
if periodicCheck == 0:
sleep sleepTime
else:
sleep periodicCheck
if scheduler.len > 0:
var currTask = scheduler.tasks.pop()
if getTime() >= currTask.startTime:
if currTask.kind != OneShot:
# Schedule task again
currTask.startTime = currTask.next()
scheduler &= currTask
try:
await currTask.handler()
except RemoveTaskException:
scheduler.del currTask
except CatchableError as e:
e.msg = "Error with task '" & currTask.name & "': " & e.msg
scheduler.errorHandler(scheduler, currTask, e)
else:
# Add task back so it can be waited on again
scheduler &= currTask
sleep if periodicCheck == 0: sleepTime
else: periodicCheck

await scheduler.runNextTask()

scheduler.running = false

Expand Down
12 changes: 12 additions & 0 deletions tests/testTaskman.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ test "Can use a closure":
tasks.every(1.seconds) do () {.async.}:
echo x

test "Can work gcsafe":
proc mustBeSafe() {.gcsafe.} =
let tasks = newSchedulerBase[proc () {.gcsafe.}]()
# let tasks = newScheduler()

tasks.every(1.seconds) do () {.gcsafe.}:
discard
if false:
tasks.start()

mustBeSafe()

when defined(testCron):
# Since it takes minimum 1 for a cron task to run we will put it behind a flag
suite "Cron":
Expand Down

0 comments on commit dd152ce

Please sign in to comment.