diff --git a/src/mummy.nim b/src/mummy.nim index 1dff846..cb632cb 100644 --- a/src/mummy.nim +++ b/src/mummy.nim @@ -19,13 +19,7 @@ when defined(linux): let SOCK_NONBLOCK {.importc: "SOCK_NONBLOCK", header: "".}: cint -# const useLockAndCond = (not defined(linux)) or defined(mummyUseLockAndCond) - -# when useLockAndCond: import std/locks -# else: -# proc eventfd(count: cuint, flags: cint): cint -# {.cdecl, importc: "eventfd", header: "".} export Port, common, httpheaders @@ -81,20 +75,14 @@ type logHandler: LogHandler maxHeadersLen, maxBodyLen, maxMessageLen: int rand: Rand - workerThreads: seq[Thread[(Server, int)]] + workerThreads: seq[Thread[Server]] serving, destroyCalled: Atomic[bool] socket: SocketHandle selector: Selector[DataEntry] responseQueued, sendQueued, shutdown: SelectEvent clientSockets: HashSet[SocketHandle] - # when useLockAndCond: taskQueueLock: Lock taskQueueCond: Cond - # else: - # taskQueueLock: Atomic[bool] - # workerEventFds: seq[cint] - # destroyCalledFd: cint - # workersAwake: int taskQueue: Deque[WorkerTask] responseQueue: Deque[OutgoingBuffer] responseQueueLock: Atomic[bool] @@ -245,17 +233,6 @@ proc trigger( "Error triggering event ", $err, " ", osErrorMsg(err) ) -# when not useLockAndCond: -# proc trigger(server: Server, efd: cint) {.raises: [].} = -# var v: uint64 = 1 -# let ret = write(efd, v.addr, sizeof(uint64)) -# if ret != sizeof(uint64): -# let err = osLastError() -# server.log( -# ErrorLevel, -# "Error writing to eventfd ", $err, " ", osErrorMsg(err) -# ) - proc send*( websocket: WebSocket, data: sink string, @@ -432,11 +409,9 @@ proc upgradeToWebSocket*( request.respond(101, headers, "") -proc workerProc(params: (Server, int)) {.raises: [].} = +proc workerProc(server: Server) {.raises: [].} = # The worker threads run the task queue here - let - server = params[0] - threadIdx = params[1] + let server = server proc runTask(task: WorkerTask) = if task.request != nil: @@ -491,7 +466,6 @@ proc workerProc(params: (Server, int)) {.raises: [].} = if update.event == CloseEvent: break - # when useLockAndCond: while true: acquire(server.taskQueueLock) @@ -507,55 +481,11 @@ proc workerProc(params: (Server, int)) {.raises: [].} = release(server.taskQueueLock) runTask(task) - # else: - # var pollFds: array[2, TPollfd] - # pollFds[0].fd = server.workerEventFds[threadIdx] - # pollFds[0].events = POLLIN - # pollFds[1].fd = server.destroyCalledFd - # pollFds[1].events = POLLIN - - # while true: - # if server.destroyCalled.load(moRelaxed): - # break - # var - # task: WorkerTask - # poppedTask: bool - # withLock server.taskQueueLock: - # if server.taskQueue.len > 0: - # task = server.taskQueue.popFirst() - # poppedTask = true - # if poppedTask: - # runTask(task) - # else: - # # Go to sleep if there are no tasks to run - # discard poll(pollFds[0].addr, 2, -1) - # if pollFds[0].revents != 0: - # var data: uint64 = 0 - # let ret = posix.read(pollFds[0].fd, data.addr, sizeof(uint64)) - # if ret != sizeof(uint64): - # let err = osLastError() - # server.log( - # ErrorLevel, - # "Error reading eventfd ", $err, " ", osErrorMsg(err) - # ) proc postTask(server: Server, task: WorkerTask) {.raises: [].} = - # when useLockAndCond: withLock server.taskQueueLock: server.taskQueue.addLast(task) signal(server.taskQueueCond) - # else: - # withLock server.taskQueueLock: - # # If the task queue is not empty, no threads could have fallen asleep - # # If the task queue is empty, any number could have fallen asleep - # if server.taskQueue.len == 0: - # server.workersAwake = 0 - # server.taskQueue.addLast(task) - - # if server.workersAwake < server.workerThreads.len: - # # Wake up a worker - # server.trigger(server.workerEventFds[server.workersAwake]) - # inc server.workersAwake proc postWebSocketUpdate( websocket: WebSocket, @@ -1102,19 +1032,11 @@ proc destroy(server: Server, joinThreads: bool) {.raises: [].} = server.socket.close() for clientSocket in server.clientSockets: clientSocket.close() - # when useLockAndCond: broadcast(server.taskQueueCond) - # else: - # server.trigger(server.destroyCalledFd) if joinThreads: joinThreads(server.workerThreads) - # when useLockAndCond: deinitLock(server.taskQueueLock) deinitCond(server.taskQueueCond) - # else: - # for workerEventFd in server.workerEventFds: - # discard workerEventFd.close() - # discard server.destroyCalledFd.close() try: server.responseQueued.close() except: @@ -1517,21 +1439,11 @@ proc newServer*( shutdownData.event = result.shutdown result.selector.registerEvent(result.shutdown, shutdownData) - # when useLockAndCond: initLock(result.taskQueueLock) initCond(result.taskQueueCond) - # else: - # result.workerEventFds.setLen(workerThreads) - - # for i in 0 ..< workerThreads: - # result.workerEventFds[i] = eventfd(0, O_CLOEXEC or O_NONBLOCK) - # if result.workerEventFds[i] == -1: - # raiseOSError(osLastError()) - - # result.destroyCalledFd = eventfd(0, O_CLOEXEC or O_NONBLOCK) for i in 0 ..< workerThreads: - createThread(result.workerThreads[i], workerProc, (result, i)) + createThread(result.workerThreads[i], workerProc, result) except: result.destroy(true) raise currentExceptionAsMummyError()