Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #23212; Asyncdispatch leaks under --mm:arc #24556

Merged
merged 8 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 54 additions & 43 deletions lib/pure/asyncmacro.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,39 @@ proc newCallWithLineInfo(fromNode: NimNode; theProc: NimNode, args: varargs[NimN
result = newCall(theProc, args)
result.copyLineInfo(fromNode)

template createCb(retFutureSym, iteratorNameSym,
strName, identName, futureVarCompletions: untyped) =
type
ClosureIt[T] = iterator(f: Future[T]): owned(FutureBase)

template createCb(futTyp, strName, identName, futureVarCompletions: untyped) =
bind finished
var nameIterVar = iteratorNameSym
proc identName {.closure, stackTrace: off.} =
{.push stackTrace: off.}
proc identName(fut: Future[futTyp], it: ClosureIt[futTyp]) {.effectsOf: it.} =
try:
if not nameIterVar.finished:
var next = nameIterVar()
if not it.finished:
var next = it(fut)
# Continue while the yielded future is already finished.
while (not next.isNil) and next.finished:
next = nameIterVar()
if nameIterVar.finished:
next = it(fut)
if it.finished:
break

if next == nil:
if not retFutureSym.finished:
if not fut.finished:
let msg = "Async procedure ($1) yielded `nil`, are you await'ing a `nil` Future?"
raise newException(AssertionDefect, msg % strName)
else:
{.gcsafe.}:
next.addCallback cast[proc() {.closure, gcsafe.}](identName)
next.addCallback(cast[proc() {.closure, gcsafe.}](proc =
identName(fut, it)))
except:
futureVarCompletions
if retFutureSym.finished:
if fut.finished:
# Take a look at tasyncexceptions for the bug which this fixes.
# That test explains it better than I can here.
raise
else:
retFutureSym.fail(getCurrentException())
identName()
fut.fail(getCurrentException())
{.pop.}

proc createFutureVarCompletions(futureVarIdents: seq[NimNode], fromNode: NimNode): NimNode =
result = newNimNode(nnkStmtList, fromNode)
Expand All @@ -68,7 +71,7 @@ proc createFutureVarCompletions(futureVarIdents: seq[NimNode], fromNode: NimNode
)
)

proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode, futureVarIdents: seq[NimNode]): NimNode =
proc processBody(ctx: Context; node, needsCompletionSym, retFutParamSym: NimNode, futureVarIdents: seq[NimNode]): NimNode =
result = node
case node.kind
of nnkReturnStmt:
Expand All @@ -80,14 +83,14 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
ctx.hasRet = true
if node[0].kind == nnkEmpty:
if ctx.inTry == 0:
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym, newIdentNode("result"))
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym, newIdentNode("result"))
else:
result.add newAssignment(needsCompletionSym, newLit(true))
else:
let x = processBody(ctx, node[0], needsCompletionSym, retFutureSym, futureVarIdents)
let x = processBody(ctx, node[0], needsCompletionSym, retFutParamSym, futureVarIdents)
if x.kind == nnkYieldStmt: result.add x
elif ctx.inTry == 0:
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym, x)
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym, x)
else:
result.add newAssignment(newIdentNode("result"), x)
result.add newAssignment(needsCompletionSym, newLit(true))
Expand All @@ -100,18 +103,18 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
of nnkTryStmt:
if result[^1].kind == nnkFinally:
inc ctx.inTry
result[0] = processBody(ctx, result[0], needsCompletionSym, retFutureSym, futureVarIdents)
result[0] = processBody(ctx, result[0], needsCompletionSym, retFutParamSym, futureVarIdents)
dec ctx.inTry
for i in 1 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)
if ctx.inTry == 0 and ctx.hasRet:
let finallyNode = copyNimNode(result[^1])
let stmtNode = newNimNode(nnkStmtList)
for child in result[^1]:
stmtNode.add child
stmtNode.add newIfStmt(
( needsCompletionSym,
newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym,
newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym,
newIdentNode("result")
)
)
Expand All @@ -120,10 +123,10 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
result[^1] = finallyNode
else:
for i in 0 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)
else:
for i in 0 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)

# echo result.repr

Expand Down Expand Up @@ -229,29 +232,21 @@ proc asyncSingleProc(prc: NimNode): NimNode =
# transformation even more complex.
let body2 = extractDocCommentsAndRunnables(prc.body)

# -> var retFuture = newFuture[T]()
var retFutureSym = genSym(nskVar, "retFuture")
var subRetType =
if returnType.kind == nnkEmpty: newIdentNode("void")
else: baseType
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr, prc.body).add(
newIdentNode("newFuture"),
subRetType),
newLit(prcName)))) # Get type from return type of this proc

# -> iterator nameIter(): FutureBase {.closure.} =
let retFutParamSym = genSym(nskParam, "retFutParamSym")

# -> iterator nameIter(retFutParam: Future[T]): FutureBase {.closure.} =
# -> {.push warning[resultshadowed]: off.}
# -> var result: T
# -> {.pop.}
# -> <proc_body>
# -> complete(retFuture, result)
# -> complete(retFutParam, result)
var iteratorNameSym = genSym(nskIterator, $prcName & " (Async)")
var needsCompletionSym = genSym(nskVar, "needsCompletion")
var ctx = Context()
var procBody = processBody(ctx, prc.body, needsCompletionSym, retFutureSym, futureVarIdents)
var procBody = processBody(ctx, prc.body, needsCompletionSym, retFutParamSym, futureVarIdents)
# don't do anything with forward bodies (empty)
if procBody.kind != nnkEmpty:
# fix #13899, defer should not escape its original scope
Expand All @@ -275,9 +270,11 @@ proc asyncSingleProc(prc: NimNode): NimNode =

var `needsCompletionSym` = false
procBody.add quote do:
complete(`retFutureSym`, `resultIdent`)
complete(`retFutParamSym`, `resultIdent`)

var closureIterator = newProc(iteratorNameSym, [quote do: owned(FutureBase)],
var retFutureTyp = newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(subRetType)
var retFutureParam = newNimNode(nnkIdentDefs, prc).add(retFutParamSym).add(retFutureTyp).add(newEmptyNode())
var closureIterator = newProc(iteratorNameSym, [quote do: owned(FutureBase), retFutureParam],
procBody, nnkIteratorDef)
closureIterator.pragma = newNimNode(nnkPragma, lineInfoFrom = prc.body)
closureIterator.addPragma(newIdentNode("closure"))
Expand All @@ -287,17 +284,31 @@ proc asyncSingleProc(prc: NimNode): NimNode =
closureIterator.addPragma(newIdentNode("gcsafe"))
outerProcBody.add(closureIterator)

# -> createCb(retFuture)
# -> createCb()
# NOTE: The NimAsyncContinueSuffix is checked for in asyncfutures.nim to produce
# friendlier stack traces:
var cbName = genSym(nskProc, prcName & NimAsyncContinueSuffix)
var procCb = getAst createCb(retFutureSym, iteratorNameSym,
newStrLitNode(prcName),
cbName,
createFutureVarCompletions(futureVarIdents, nil)
)
var procCb = getAst createCb(
subRetType,
newStrLitNode(prcName),
cbName,
createFutureVarCompletions(futureVarIdents, nil)
)
outerProcBody.add procCb

# -> var retFuture = newFuture[T]()
let retFutureSym = genSym(nskVar, "retFuture")
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr, prc.body).add(
newIdentNode("newFuture"),
subRetType),
newLit(prcName)))) # Get type from return type of this proc

# -> cb(retFuture, nameIter)
outerProcBody.add newCall(cbName, retFutureSym, iteratorNameSym)

# -> return retFuture
outerProcBody.add newNimNode(nnkReturnStmt, prc.body[^1]).add(retFutureSym)

Expand Down
2 changes: 1 addition & 1 deletion tests/arc/tasyncleak.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
discard """
outputsub: "(allocCount: 4050, deallocCount: 4048)"
outputsub: "(allocCount: 4011, deallocCount: 4009)"
cmd: "nim c --gc:orc -d:nimAllocStats $file"
"""

Expand Down
28 changes: 28 additions & 0 deletions tests/async/t23212.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
discard """
valgrind: true
cmd: '''nim c --mm:arc -d:nimAllocStats -d:useMalloc $file'''
output: '''1000'''
"""

import std/asyncdispatch

var count: int

proc stuff() {.async.} =
#echo count, 1
await sleepAsync(1)
#echo count, 2
count.inc

for _ in 0..<1000:
asyncCheck stuff()

while hasPendingOperations(): poll()

echo count

setGlobalDispatcher(nil)

import std/importutils
privateAccess(AllocStats)
doAssert getAllocStats().allocCount - getAllocStats().deallocCount < 10