Skip to content

Commit

Permalink
Merge pull request #57 from guzba/ryan
Browse files Browse the repository at this point in the history
rm
  • Loading branch information
guzba committed May 8, 2023
2 parents aa780d4 + fad1335 commit e9fd54a
Showing 1 changed file with 4 additions and 92 deletions.
96 changes: 4 additions & 92 deletions src/mummy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@ when defined(linux):
let SOCK_NONBLOCK
{.importc: "SOCK_NONBLOCK", header: "<sys/socket.h>".}: cint

# const useLockAndCond = (not defined(linux)) or defined(mummyUseLockAndCond)

# when useLockAndCond:
import std/locks
# else:
# proc eventfd(count: cuint, flags: cint): cint
# {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}

export Port, common, httpheaders

Expand Down Expand Up @@ -81,20 +75,14 @@ type
logHandler: LogHandler
maxHeadersLen, maxBodyLen, maxMessageLen: int
rand: Rand
workerThreads: seq[Thread[(Server, int)]]
workerThreads: seq[Thread[Server]]
serving, destroyCalled: Atomic[bool]
socket: SocketHandle
selector: Selector[DataEntry]
responseQueued, sendQueued, shutdown: SelectEvent
clientSockets: HashSet[SocketHandle]
# when useLockAndCond:
taskQueueLock: Lock
taskQueueCond: Cond
# else:
# taskQueueLock: Atomic[bool]
# workerEventFds: seq[cint]
# destroyCalledFd: cint
# workersAwake: int
taskQueue: Deque[WorkerTask]
responseQueue: Deque[OutgoingBuffer]
responseQueueLock: Atomic[bool]
Expand Down Expand Up @@ -245,17 +233,6 @@ proc trigger(
"Error triggering event ", $err, " ", osErrorMsg(err)
)

# when not useLockAndCond:
# proc trigger(server: Server, efd: cint) {.raises: [].} =
# var v: uint64 = 1
# let ret = write(efd, v.addr, sizeof(uint64))
# if ret != sizeof(uint64):
# let err = osLastError()
# server.log(
# ErrorLevel,
# "Error writing to eventfd ", $err, " ", osErrorMsg(err)
# )

proc send*(
websocket: WebSocket,
data: sink string,
Expand Down Expand Up @@ -432,11 +409,9 @@ proc upgradeToWebSocket*(

request.respond(101, headers, "")

proc workerProc(params: (Server, int)) {.raises: [].} =
proc workerProc(server: Server) {.raises: [].} =
# The worker threads run the task queue here
let
server = params[0]
threadIdx = params[1]
let server = server

proc runTask(task: WorkerTask) =
if task.request != nil:
Expand Down Expand Up @@ -491,7 +466,6 @@ proc workerProc(params: (Server, int)) {.raises: [].} =
if update.event == CloseEvent:
break

# when useLockAndCond:
while true:
acquire(server.taskQueueLock)

Expand All @@ -507,55 +481,11 @@ proc workerProc(params: (Server, int)) {.raises: [].} =
release(server.taskQueueLock)

runTask(task)
# else:
# var pollFds: array[2, TPollfd]
# pollFds[0].fd = server.workerEventFds[threadIdx]
# pollFds[0].events = POLLIN
# pollFds[1].fd = server.destroyCalledFd
# pollFds[1].events = POLLIN

# while true:
# if server.destroyCalled.load(moRelaxed):
# break
# var
# task: WorkerTask
# poppedTask: bool
# withLock server.taskQueueLock:
# if server.taskQueue.len > 0:
# task = server.taskQueue.popFirst()
# poppedTask = true
# if poppedTask:
# runTask(task)
# else:
# # Go to sleep if there are no tasks to run
# discard poll(pollFds[0].addr, 2, -1)
# if pollFds[0].revents != 0:
# var data: uint64 = 0
# let ret = posix.read(pollFds[0].fd, data.addr, sizeof(uint64))
# if ret != sizeof(uint64):
# let err = osLastError()
# server.log(
# ErrorLevel,
# "Error reading eventfd ", $err, " ", osErrorMsg(err)
# )

proc postTask(server: Server, task: WorkerTask) {.raises: [].} =
# when useLockAndCond:
withLock server.taskQueueLock:
server.taskQueue.addLast(task)
signal(server.taskQueueCond)
# else:
# withLock server.taskQueueLock:
# # If the task queue is not empty, no threads could have fallen asleep
# # If the task queue is empty, any number could have fallen asleep
# if server.taskQueue.len == 0:
# server.workersAwake = 0
# server.taskQueue.addLast(task)

# if server.workersAwake < server.workerThreads.len:
# # Wake up a worker
# server.trigger(server.workerEventFds[server.workersAwake])
# inc server.workersAwake

proc postWebSocketUpdate(
websocket: WebSocket,
Expand Down Expand Up @@ -1102,19 +1032,11 @@ proc destroy(server: Server, joinThreads: bool) {.raises: [].} =
server.socket.close()
for clientSocket in server.clientSockets:
clientSocket.close()
# when useLockAndCond:
broadcast(server.taskQueueCond)
# else:
# server.trigger(server.destroyCalledFd)
if joinThreads:
joinThreads(server.workerThreads)
# when useLockAndCond:
deinitLock(server.taskQueueLock)
deinitCond(server.taskQueueCond)
# else:
# for workerEventFd in server.workerEventFds:
# discard workerEventFd.close()
# discard server.destroyCalledFd.close()
try:
server.responseQueued.close()
except:
Expand Down Expand Up @@ -1517,21 +1439,11 @@ proc newServer*(
shutdownData.event = result.shutdown
result.selector.registerEvent(result.shutdown, shutdownData)

# when useLockAndCond:
initLock(result.taskQueueLock)
initCond(result.taskQueueCond)
# else:
# result.workerEventFds.setLen(workerThreads)

# for i in 0 ..< workerThreads:
# result.workerEventFds[i] = eventfd(0, O_CLOEXEC or O_NONBLOCK)
# if result.workerEventFds[i] == -1:
# raiseOSError(osLastError())

# result.destroyCalledFd = eventfd(0, O_CLOEXEC or O_NONBLOCK)

for i in 0 ..< workerThreads:
createThread(result.workerThreads[i], workerProc, (result, i))
createThread(result.workerThreads[i], workerProc, result)
except:
result.destroy(true)
raise currentExceptionAsMummyError()
Expand Down

0 comments on commit e9fd54a

Please sign in to comment.