From fe69f3293623936d3d1d8d6a91440178e390b731 Mon Sep 17 00:00:00 2001 From: flywind Date: Fri, 12 Mar 2021 20:33:47 +0800 Subject: [PATCH] New channels implementation for ORC (#17305) * Update lib/std/channels.nim * Rename tchannel_pthread.nim to tchannels_pthread.nim * Rename tchannel_simple.nim to tchannels_simple.nim Co-authored-by: Mamy Ratsimbazafy --- changelog.md | 4 +- lib/std/channels.nim | 510 +++++++++++++++++++++++++++++ tests/stdlib/tchannels_pthread.nim | 321 ++++++++++++++++++ tests/stdlib/tchannels_simple.nim | 68 ++++ 4 files changed, 902 insertions(+), 1 deletion(-) create mode 100644 lib/std/channels.nim create mode 100644 tests/stdlib/tchannels_pthread.nim create mode 100644 tests/stdlib/tchannels_simple.nim diff --git a/changelog.md b/changelog.md index 5c2756ea610a6..8e354134a09ea 100644 --- a/changelog.md +++ b/changelog.md @@ -223,6 +223,9 @@ - Added `jscore.debugger` to [call any available debugging functionality, such as breakpoints.](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/debugger). + +- Added `std/channels`. + - Added `htmlgen.portal` for [making "SPA style" pages using HTML only](https://web.dev/hands-on-portals). - Added `ZZZ` and `ZZZZ` patterns to `times.nim` `DateTime` parsing, to match time @@ -230,7 +233,6 @@ - ## Language changes - `nimscript` now handles `except Exception as e`. diff --git a/lib/std/channels.nim b/lib/std/channels.nim new file mode 100644 index 0000000000000..a212af0d33aed --- /dev/null +++ b/lib/std/channels.nim @@ -0,0 +1,510 @@ +# +# +# The Nim Compiler +# (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + + +# Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim +# Those are translations of @aprell (Andreas Prell) original channels from C to Nim +# (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c) +# And in turn they are an implementation of Michael & Scott lock-based queues +# (note the paper has 2 channels: lock-free and lock-based) with additional caching: +# Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms +# Maged M. Michael, Michael L. Scott, 1996 +# https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf + +## This module only works with `--gc:arc` or `--gc:orc`. +## +## .. warning:: This module is experimental and its interface may change. +## +## The following is a simple example of two different ways to use channels: +## blocking and non-blocking. +## + +runnableExamples("--threads:on --gc:orc"): + import std/os + + # In this example a channel is declared at module scope. + # Channels are generic, and they include support for passing objects between + # threads. + # Note that isolated data passed through channels is moved around. + var chan = newChannel[string]() + + # This proc will be run in another thread using the threads module. + proc firstWorker() = + chan.send("Hello World!") + + # This is another proc to run in a background thread. This proc takes a while + # to send the message since it sleeps for 2 seconds (or 2000 milliseconds). + proc secondWorker() = + sleep(2000) + chan.send("Another message") + + # Launch the worker. + var worker1: Thread[void] + createThread(worker1, firstWorker) + + # Block until the message arrives, then print it out. + var dest = "" + chan.recv(dest) + assert dest == "Hello World!" + + # Wait for the thread to exit before moving on to the next example. + worker1.joinThread() + + # Launch the other worker. + var worker2: Thread[void] + createThread(worker2, secondWorker) + # This time, use a non-blocking approach with tryRecv. + # Since the main thread is not blocked, it could be used to perform other + # useful work while it waits for data to arrive on the channel. + var messages: seq[string] + while true: + var msg = "" + if chan.tryRecv(msg): + messages.add msg # "Another message" + break + + messages.add "Pretend I'm doing useful work..." + # For this example, sleep in order not to flood stdout with the above + # message. + sleep(400) + + # Wait for the second thread to exit before cleaning up the channel. + worker2.joinThread() + + # Clean up the channel. + assert chan.close() + + assert messages[^1] == "Another message" + assert messages.len >= 2 + + +when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc): + {.error: "This channel implementation requires --gc:arc or --gc:orc".} + +import std/[locks, atomics, isolation] +import system/ansi_c + +# Channel (Shared memory channels) +# ---------------------------------------------------------------------------------- + +const + cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line + nimChannelCacheSize* {.intdefine.} = 100 + +type + ChannelRaw = ptr ChannelObj + ChannelObj = object + headLock, tailLock: Lock + notFullCond, notEmptyCond: Cond + closed: Atomic[bool] + size: int + itemsize: int # up to itemsize bytes can be exchanged over this channel + head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail + tail: int + buffer: ptr UncheckedArray[byte] + atomicCounter: Atomic[int] + + ChannelCache = ptr ChannelCacheObj + ChannelCacheObj = object + next: ChannelCache + chanSize: int + chanN: int + numCached: int + cache: array[nimChannelCacheSize, ChannelRaw] + +# ---------------------------------------------------------------------------------- + +proc numItems(chan: ChannelRaw): int {.inline.} = + result = chan.tail - chan.head + if result < 0: + inc(result, 2 * chan.size) + + assert result <= chan.size + +template isFull(chan: ChannelRaw): bool = + abs(chan.tail - chan.head) == chan.size + +template isEmpty(chan: ChannelRaw): bool = + chan.head == chan.tail + +# Unbuffered / synchronous channels +# ---------------------------------------------------------------------------------- + +template numItemsUnbuf(chan: ChannelRaw): int = + chan.head + +template isFullUnbuf(chan: ChannelRaw): bool = + chan.head == 1 + +template isEmptyUnbuf(chan: ChannelRaw): bool = + chan.head == 0 + +# ChannelRaw kinds +# ---------------------------------------------------------------------------------- + +func isUnbuffered(chan: ChannelRaw): bool = + chan.size - 1 == 0 + +# ChannelRaw status and properties +# ---------------------------------------------------------------------------------- + +proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed) + +proc peek(chan: ChannelRaw): int {.inline.} = + (if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan)) + +# Per-thread channel cache +# ---------------------------------------------------------------------------------- + +var channelCache {.threadvar.}: ChannelCache +var channelCacheLen {.threadvar.}: int + +proc allocChannelCache(size, n: int): bool = + ## Allocate a free list for storing channels of a given type + var p = channelCache + + # Avoid multiple free lists for the exact same type of channel + while not p.isNil: + if size == p.chanSize and n == p.chanN: + return false + p = p.next + + p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj))) + if p.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + p.chanSize = size + p.chanN = n + p.numCached = 0 + + p.next = channelCache + channelCache = p + inc channelCacheLen + result = true + +proc freeChannelCache*() = + ## Frees the entire channel cache, including all channels + var p = channelCache + var q: ChannelCache + + while not p.isNil: + q = p.next + for i in 0 ..< p.numCached: + let chan = p.cache[i] + if not chan.buffer.isNil: + c_free(chan.buffer) + deinitLock(chan.headLock) + deinitLock(chan.tailLock) + deinitCond(chan.notFullCond) + deinitCond(chan.notEmptyCond) + c_free(chan) + c_free(p) + dec channelCacheLen + p = q + + assert(channelCacheLen == 0) + channelCache = nil + +# Channels memory ops +# ---------------------------------------------------------------------------------- + +proc allocChannel(size, n: int): ChannelRaw = + when nimChannelCacheSize > 0: + var p = channelCache + + while not p.isNil: + if size == p.chanSize and n == p.chanN: + # Check if free list contains channel + if p.numCached > 0: + dec p.numCached + result = p.cache[p.numCached] + assert(result.isEmpty) + return + else: + # All the other lists in cache won't match + break + p = p.next + + result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj))) + if result.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + # To buffer n items, we allocate for n + result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size)) + if result.buffer.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + initLock(result.headLock) + initLock(result.tailLock) + initCond(result.notFullCond) + initCond(result.notEmptyCond) + + result.closed.store(false, moRelaxed) # We don't need atomic here, how to? + result.size = n + result.itemsize = size + result.head = 0 + result.tail = 0 + result.atomicCounter.store(0, moRelaxed) + + when nimChannelCacheSize > 0: + # Allocate a cache as well if one of the proper size doesn't exist + discard allocChannelCache(size, n) + +proc freeChannel(chan: ChannelRaw) = + if chan.isNil: + return + + when nimChannelCacheSize > 0: + var p = channelCache + while not p.isNil: + if chan.itemsize == p.chanSize and + chan.size == p.chanN: + if p.numCached < nimChannelCacheSize: + # If space left in cache, cache it + p.cache[p.numCached] = chan + inc p.numCached + return + else: + # All the other lists in cache won't match + break + p = p.next + + if not chan.buffer.isNil: + c_free(chan.buffer) + + deinitLock(chan.headLock) + deinitLock(chan.tailLock) + deinitCond(chan.notFullCond) + deinitCond(chan.notEmptyCond) + + c_free(chan) + +# MPMC Channels (Multi-Producer Multi-Consumer) +# ---------------------------------------------------------------------------------- + +proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool = + if nonBlocking and chan.isFullUnbuf: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isFullUnbuf: + # Another thread was faster + release(chan.headLock) + return false + + while chan.isFullUnbuf: + wait(chan.notFullcond, chan.headLock) + + assert chan.isEmptyUnbuf + assert size <= chan.itemsize + copyMem(chan.buffer, data, size) + + chan.head = 1 + + release(chan.headLock) + signal(chan.notEmptyCond) + result = true + +proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool = + assert not chan.isNil + assert not data.isNil + + if isUnbuffered(chan): + return sendUnbufferedMpmc(chan, data, size, nonBlocking) + + if nonBlocking and chan.isFull: + return false + + acquire(chan.tailLock) + + if nonBlocking and chan.isFull: + # Another thread was faster + release(chan.tailLock) + return false + + while chan.isFull: + wait(chan.notFullcond, chan.tailLock) + + assert not chan.isFull + assert size <= chan.itemsize + + let writeIdx = if chan.tail < chan.size: chan.tail + else: chan.tail - chan.size + + copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size) + + inc chan.tail + if chan.tail == 2 * chan.size: + chan.tail = 0 + + release(chan.tailLock) + signal(chan.notEmptyCond) + result = true + +proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool = + if nonBlocking and chan.isEmptyUnbuf: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isEmptyUnbuf: + # Another thread was faster + release(chan.headLock) + return false + + while chan.isEmptyUnbuf: + wait(chan.notEmptyCond, chan.headLock) + + assert chan.isFullUnbuf + assert size <= chan.itemsize + + copyMem(data, chan.buffer, size) + + chan.head = 0 + + release(chan.headLock) + signal(chan.notFullCond) + result = true + +proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool = + assert not chan.isNil + assert not data.isNil + + if isUnbuffered(chan): + return recvUnbufferedMpmc(chan, data, size, nonBlocking) + + if nonBlocking and chan.isEmpty: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isEmpty: + # Another thread took the last data + release(chan.headLock) + return false + + while chan.isEmpty: + wait(chan.notEmptyCond, chan.headLock) + + assert not chan.isEmpty + assert size <= chan.itemsize + + let readIdx = if chan.head < chan.size: chan.head + else: chan.head - chan.size + + copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size) + + inc chan.head + if chan.head == 2 * chan.size: + chan.head = 0 + + release(chan.headLock) + signal(chan.notFullCond) + result = true + +proc channelCloseMpmc(chan: ChannelRaw): bool = + # Unsynchronized + + if chan.isClosed: + # ChannelRaw already closed + return false + + store(chan.closed, true, moRelaxed) + result = true + +proc channelOpenMpmc(chan: ChannelRaw): bool = + # Unsynchronized + + if not chan.isClosed: + # ChannelRaw already open + return false + + store(chan.closed, false, moRelaxed) + result = true + +# Public API +# ---------------------------------------------------------------------------------- + +type + Channel*[T] = object ## Typed channels + d: ChannelRaw + +proc `=destroy`*[T](c: var Channel[T]) = + if c.d != nil: + if load(c.d.atomicCounter, moAcquire) == 0: + if c.d.buffer != nil: + freeChannel(c.d) + else: + atomicDec(c.d.atomicCounter) + +proc `=`*[T](dest: var Channel[T], src: Channel[T]) = + ## Shares `Channel` by reference counting. + if src.d != nil: + atomicInc(src.d.atomicCounter) + + if dest.d != nil: + `=destroy`(dest) + dest.d = src.d + +proc channelSend[T](chan: Channel[T], data: sink T, size: int, nonBlocking: bool): bool {.inline.} = + ## Send item to the channel (FIFO queue) + ## (Insert at last) + sendMpmc(chan.d, data.unsafeAddr, size, nonBlocking) + +proc channelReceive[T](chan: Channel[T], data: ptr T, size: int, nonBlocking: bool): bool {.inline.} = + ## Receive an item from the channel + ## (Remove the first item) + recvMpmc(chan.d, data, size, nonBlocking) + +func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} = + ## Sends item to the channel(non blocking). + var data = src.extract + result = channelSend(c, data, sizeof(data), true) + if result: + wasMoved(data) + +template trySend*[T](c: Channel[T], src: T): bool = + ## Helper templates for `trySend`. + trySend(c, isolate(src)) + +func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} = + ## Receives item from the channel(non blocking). + channelReceive(c, dst.addr, sizeof(dst), true) + +func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} = + ## Sends item to the channel(blocking). + var data = src.extract + discard channelSend(c, data, sizeof(data), false) + wasMoved(data) + +template send*[T](c: var Channel[T]; src: T) = + ## Helper templates for `send`. + send(c, isolate(src)) + +func recv*[T](c: Channel[T], dst: var T) {.inline.} = + ## Receives item from the channel(blocking). + discard channelReceive(c, dst.addr, sizeof(dst), false) + +func recvIso*[T](c: Channel[T]): Isolated[T] {.inline.} = + var dst: T + discard channelReceive(c, dst.addr, sizeof(dst), false) + result = isolate(dst) + +func open*[T](c: Channel[T]): bool {.inline.} = + result = c.d.channelOpenMpmc() + +func close*[T](c: Channel[T]): bool {.inline.} = + result = c.d.channelCloseMpmc() + +func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d) + +proc newChannel*[T](elements = 30): Channel[T] = + assert elements >= 1, "Elements must be positive!" + result = Channel[T](d: allocChannel(sizeof(T), elements)) diff --git a/tests/stdlib/tchannels_pthread.nim b/tests/stdlib/tchannels_pthread.nim new file mode 100644 index 0000000000000..a9708d8fb6815 --- /dev/null +++ b/tests/stdlib/tchannels_pthread.nim @@ -0,0 +1,321 @@ +discard """ + targets: "c cpp" + matrix: "--gc:orc --threads:on; --gc:orc --threads:on -d:blockingTest" + disabled: "windows" + disabled: "bsd" + disabled: "osx" +""" + +include std/channels + +import std/unittest + + +type + ChannelBufKind = enum + Unbuffered # Unbuffered (blocking) channel + Buffered # Buffered (non-blocking channel) + + +proc capacity(chan: ChannelRaw): int {.inline.} = chan.size +func isBuffered(chan: ChannelRaw): bool = + chan.size - 1 > 0 + +when defined(blockingTest): + const nonBlocking = false +else: + const nonBlocking = true + +type + Pthread {.importc: "pthread_t", header: "".} = distinct culong + PthreadAttr* {.byref, importc: "pthread_attr_t", header: "".} = object + Errno* = distinct cint + +proc pthread_create[T]( + thread: var Pthread, + attr: ptr PthreadAttr, # In Nim this is a var and how Nim sets a custom stack + fn: proc (x: ptr T): pointer {.thread, noconv.}, + arg: ptr T + ): Errno {.header: "".} + +proc pthread_join( + thread: Pthread, + thread_exit_status: ptr pointer + ): Errno {.header: "".} + +template channel_send_loop(chan: ChannelRaw, + data: sink pointer, + size: int, + body: untyped): untyped = + while not sendMpmc(chan, data, size, nonBlocking): + body + +template channel_receive_loop(chan: ChannelRaw, + data: pointer, + size: int, + body: untyped): untyped = + while not recvMpmc(chan, data, size, nonBlocking): + body + + +# Without threads:on or release, +# worker threads will crash on popFrame + +import std/unittest + +type ThreadArgs = object + ID: int + chan: ChannelRaw + +template Worker(id: int, body: untyped): untyped {.dirty.} = + if args.ID == id: + body + + +const Sender = 1 +const Receiver = 0 + +proc runSuite( + name: string, + fn: proc(args: ptr ThreadArgs): pointer {.noconv, gcsafe.} + ) = + var chan: ChannelRaw + + for i in Unbuffered .. Buffered: + if i == Unbuffered: + chan = allocChannel(size = 32, n = 1) + check: + peek(chan) == 0 + capacity(chan) == 1 + isBuffered(chan) == false + isUnbuffered(chan) == true + else: + chan = allocChannel(size = int.sizeof.int, n = 7) + check: + peek(chan) == 0 + capacity(chan) == 7 + isBuffered(chan) == true + isUnbuffered(chan) == false + + var threads: array[2, Pthread] + var args = [ + ThreadArgs(ID: 0, chan: chan), + ThreadArgs(ID: 1, chan: chan) + ] + + discard pthread_create(threads[0], nil, fn, args[0].addr) + discard pthread_create(threads[1], nil, fn, args[1].addr) + + discard pthread_join(threads[0], nil) + discard pthread_join(threads[1], nil) + + freeChannel(chan) + +# ---------------------------------------------------------------------------------- + +proc thread_func(args: ptr ThreadArgs): pointer {.noconv.} = + + # Worker RECEIVER: + # --------- + # <- chan + # <- chan + # <- chan + # + # Worker SENDER: + # --------- + # chan <- 42 + # chan <- 53 + # chan <- 64 + # + + Worker(Receiver): + var val: int + for j in 0 ..< 3: + channel_receive_loop(args.chan, val.addr, val.sizeof.int): + # Busy loop, normally it should yield + discard + check: val == 42 + j*11 + + Worker(Sender): + var val: int + check: peek(args.chan) == 0 + for j in 0 ..< 3: + val = 42 + j*11 + channel_send_loop(args.chan, val.addr, val.sizeof.int): + # Busy loop, normally it should yield + discard + + return nil + +runSuite("[ChannelRaw] 2 threads can send data", thread_func) + +# ---------------------------------------------------------------------------------- + +iterator pairs(chan: ChannelRaw, T: typedesc): (int, T) = + var i = 0 + var x: T + while not isClosed(chan) or peek(chan) > 0: + let r = recvMpmc(chan, x.addr, x.sizeof.int, true) + # printf("x: %d, r: %d\n", x, r) + if r: + yield (i, x) + inc i + +proc thread_func_2(args: ptr ThreadArgs): pointer {.noconv.} = + # Worker RECEIVER: + # --------- + # <- chan until closed and empty + # + # Worker SENDER: + # --------- + # chan <- 42, 53, 64, ... + + const N = 100 + + Worker(Receiver): + for j, val in pairs(args.chan, int): + # TODO: Need special handling that doesn't allocate + # in thread with no GC + # when check fails + # + check: val == 42 + j*11 + + Worker(Sender): + var val: int + check: peek(args.chan) == 0 + for j in 0 ..< N: + val = 42 + j*11 + channel_send_loop(args.chan, val.addr, int.sizeof.int): + discard + discard channelCloseMpmc(args.chan) + + return nil + +runSuite("[ChannelRaw] channel_close, freeChannel, channelCache", thread_func_2) + +# ---------------------------------------------------------------------------------- + +proc isCached(chan: ChannelRaw): bool = + assert not chan.isNil + + var p = channelCache + while not p.isNil: + if chan.itemsize == p.chanSize and + chan.size == p.chanN: + for i in 0 ..< p.numCached: + if chan == p.cache[i]: + return true + # No more channel in cache can match + return false + p = p.next + return false + +block: # [ChannelRaw] ChannelRaw caching implementation + + # Start from clean cache slate + freeChannelCache() + + block: # Explicit caches allocation + check: + allocChannelCache(int sizeof(char), 4) + allocChannelCache(int sizeof(int), 8) + allocChannelCache(int sizeof(ptr float64), 16) + + # Don't create existing channel cache + not allocChannelCache(int sizeof(char), 4) + not allocChannelCache(int sizeof(int), 8) + not allocChannelCache(int sizeof(ptr float64), 16) + + check: + channelCacheLen == 3 + + # --------------------------------- + var chan, stash: array[10, ChannelRaw] + + block: # Implicit caches allocation + + chan[0] = allocChannel(sizeof(char), 4) + chan[1] = allocChannel(sizeof(int32), 8) + chan[2] = allocChannel(sizeof(ptr float64), 16) + + chan[3] = allocChannel(sizeof(char), 5) + chan[4] = allocChannel(sizeof(int64), 8) + chan[5] = allocChannel(sizeof(ptr float32), 24) + + # We have caches ready to store specific channel kinds + check: channelCacheLen == 6 # Cumulated with previous test + # But they are not in cache while in use + check: + not chan[0].isCached + not chan[1].isCached + not chan[2].isCached + not chan[3].isCached + not chan[4].isCached + not chan[5].isCached + + block: # Freed channels are returned to cache + stash[0..5] = chan.toOpenArray(0, 5) + for i in 0 .. 5: + # Free the channels + freeChannel(chan[i]) + + check: + stash[0].isCached + stash[1].isCached + stash[2].isCached + stash[3].isCached + stash[4].isCached + stash[5].isCached + + block: # Cached channels are being reused + + chan[6] = allocChannel(sizeof(char), 4) + chan[7] = allocChannel(sizeof(int32), 8) + chan[8] = allocChannel(sizeof(ptr float32), 16) + chan[9] = allocChannel(sizeof(ptr float64), 16) + + # All (itemsize, queue size, implementation) were already allocated + check: channelCacheLen == 6 + + # We reused old channels from cache + check: + chan[6] == stash[0] + chan[7] == stash[1] + chan[8] == stash[2] + # chan[9] - required a fresh alloc + + block: # Clearing the cache + + stash[6..9] = chan.toOpenArray(6, 9) + + for i in 6 .. 9: + freeChannel(chan[i]) + + check: + stash[6].isCached + stash[7].isCached + stash[8].isCached + stash[9].isCached + + freeChannelCache() + + # Check that nothing is cached anymore + for i in 0 .. 9: + check: not stash[i].isCached + # And length is reset to 0 + check: channelCacheLen == 0 + + # Cache can grow again + chan[0] = allocChannel(sizeof((int, float, int32, uint)), 1) + chan[1] = allocChannel(sizeof(int32), 0) + chan[2] = allocChannel(sizeof(int32), 0) + + check: channelCacheLen == 2 + + # Interleave cache clear and channel free + freeChannelCache() + check: channelCacheLen == 0 + + freeChannel(chan[0]) + freeChannel(chan[1]) + freeChannel(chan[2]) diff --git a/tests/stdlib/tchannels_simple.nim b/tests/stdlib/tchannels_simple.nim new file mode 100644 index 0000000000000..dc4857c3e853d --- /dev/null +++ b/tests/stdlib/tchannels_simple.nim @@ -0,0 +1,68 @@ +discard """ + matrix: "--threads:on --gc:orc; --threads:on --gc:arc" + disabled: "freebsd" +""" + +import std/channels +import std/os + +var chan = newChannel[string]() + +# This proc will be run in another thread using the threads module. +proc firstWorker() = + chan.send("Hello World!") + +# This is another proc to run in a background thread. This proc takes a while +# to send the message since it sleeps for 2 seconds (or 2000 milliseconds). +proc secondWorker() = + sleep(2000) + chan.send("Another message") + + +# Launch the worker. +var worker1: Thread[void] +createThread(worker1, firstWorker) + +# Block until the message arrives, then print it out. +var dest = "" +chan.recv(dest) +doAssert dest == "Hello World!" + +# Wait for the thread to exit before moving on to the next example. +worker1.joinThread() + +# Launch the other worker. +var worker2: Thread[void] +createThread(worker2, secondWorker) +# This time, use a non-blocking approach with tryRecv. +# Since the main thread is not blocked, it could be used to perform other +# useful work while it waits for data to arrive on the channel. + +var messages: seq[string] +var msg = "" +while true: + let tried = chan.tryRecv(msg) + if tried: + messages.add move(msg) + break + + messages.add "Pretend I'm doing useful work..." + # For this example, sleep in order not to flood stdout with the above + # message. + sleep(400) + +# Wait for the second thread to exit before cleaning up the channel. +worker2.joinThread() + +# Clean up the channel. +doAssert chan.close() +doAssert messages[^1] == "Another message" +doAssert messages.len >= 2 + + +block: + let chan0 = newChannel[int]() + let chan1 = chan0 + block: + let chan3 = chan0 + let chan4 = chan0