diff --git a/src/cse-machine/utils.ts b/src/cse-machine/utils.ts index c6f5d6ef5..ddcbcd40a 100644 --- a/src/cse-machine/utils.ts +++ b/src/cse-machine/utils.ts @@ -77,6 +77,21 @@ export class Stack implements IStack { return this.storage[this.size() - 1] } + /** + * Returns the top n items in the stack. + * + * @param n amount of items to peek from the top of the stack + * @returns an array of the top n items in the stack + * the first item in the array is the top of the stack + */ + public peekN(n: number): T[] | undefined { + if (this.isEmpty()) { + return undefined + } + + return this.storage.slice(-n).reverse() + } + public size(): number { return this.storage.length } diff --git a/src/go-slang/goroutine.ts b/src/go-slang/goroutine.ts index 984e52d84..e572b4d15 100644 --- a/src/go-slang/goroutine.ts +++ b/src/go-slang/goroutine.ts @@ -12,7 +12,7 @@ import { AstMap } from './lib/astMap' import { evaluateBinaryOp } from './lib/binaryOp' import { Environment } from './lib/env' import { Heap, HeapAddress } from './lib/heap' -import { Result, isAny } from './lib/utils' +import { Counter, Result, isAny } from './lib/utils' import { AssignOp, Assignment, @@ -61,7 +61,7 @@ import { } from './types' import { Scheduler } from './scheduler' import { PredeclaredFuncT } from './lib/predeclared' -import { BufferedChannel } from './lib/channel' +import { BufferedChannel, UnbufferedChannel } from './lib/channel' export type Control = Stack export type Stash = Stack @@ -77,12 +77,16 @@ export interface Context { } export class GoRoutine { + static idCounter: Counter = new Counter() + + private id: number private context: Context private scheduler: Scheduler public isMain: boolean constructor(context: Context, scheduler: Scheduler, isMain: boolean = false) { + this.id = GoRoutine.idCounter.next() this.context = context this.scheduler = scheduler this.isMain = isMain @@ -100,7 +104,7 @@ export class GoRoutine { return Result.fail(new UnknownInstructionError(inst.type)) } - const runtimeError = Interpreter[inst.type](inst, this.context, this.scheduler) + const runtimeError = Interpreter[inst.type](inst, this.context, this.scheduler, this.id) return runtimeError ? Result.fail(runtimeError) : Result.ok() } } @@ -109,7 +113,8 @@ const Interpreter: { [key: string]: ( inst: Instruction, context: Context, - sched: Scheduler + sched: Scheduler, + routineId: number ) => RuntimeSourceError | void } = { SourceFile: ({ topLevelDecls }: SourceFile, { C, H }) => C.pushR(...H.allocM(topLevelDecls)), @@ -308,26 +313,46 @@ const Interpreter: { return sched.schedule(new GoRoutine({ C: _C, S: _S, E: _E, B, H, A }, sched)) }, - ChanRecvOp: (_inst, { C, S, H }) => { - const chanAddr = S.peek() - const chan = H.resolve(chanAddr) as BufferedChannel + ChanRecvOp: (_inst, { C, S, H }, _sched, routineId: number) => { + const chan = H.resolve(S.peek()) as BufferedChannel | UnbufferedChannel + + if (chan instanceof BufferedChannel) { + // if the channel is empty, we retry the receive operation + if (chan.isBufferEmpty()) { return C.push(ChanRecv) } // prettier-ignore - // if the channel is empty, we retry the receive operation - if (chan.isBufferEmpty()) { return C.push(ChanRecv) } // prettier-ignore + S.pop() // pop the channel address + return S.push(H.alloc(chan.recv())) + } - S.pop() - S.push(H.alloc(chan.recv())) + if (chan instanceof UnbufferedChannel) { + const recvValue = chan.recv(routineId) + // if we cannot receive, we retry the receive operation + if (recvValue === null) { return C.push(ChanRecv) } // prettier-ignore + + S.pop() // pop the channel address + return S.push(H.alloc(recvValue)) + } }, - ChanSendOp: (_inst, { C, S, H }) => { - const chanAddr = S.peek() - const chan = H.resolve(chanAddr) as BufferedChannel + ChanSendOp: (_inst, { C, S, H }, _sched, routineId: number) => { + const [chan, sendValue] = H.resolveM(S.peekN(2)!) as [BufferedChannel | UnbufferedChannel, any] - // if the channel is full, we retry the send operation - if (chan.isBufferFull()) { return C.push(ChanSend) } // prettier-ignore + if (chan instanceof BufferedChannel) { + // if the channel is full, we retry the send operation + if (chan.isBufferFull()) { return C.push(ChanSend) } // prettier-ignore - const [_, valueAddr] = S.popN(2) - chan.send(H.resolve(valueAddr)) + S.popN(2) // pop the channel address and the value address + chan.send(sendValue) + return + } + + if (chan instanceof UnbufferedChannel) { + // if we cannot send, we retry the send operation + if (!chan.send(routineId, sendValue)) { return C.push(ChanSend) } // prettier-ignore + + S.popN(2) // pop the channel address and the value address + return + } }, BranchOp: ({ cons, alt }: BranchOp, { S, C, H }) => diff --git a/src/go-slang/lib/channel.ts b/src/go-slang/lib/channel.ts index 93e3f8951..5099063f9 100644 --- a/src/go-slang/lib/channel.ts +++ b/src/go-slang/lib/channel.ts @@ -15,6 +15,131 @@ class Channel { protected setSlotValue(slotIdx: number, value: number): void { this.memory.setFloat64(this.getSlotAddr(slotIdx), value) } } +export class UnbufferedChannel extends Channel { + static RECV_ID_OFFSET = 1 + static SEND_ID_OFFSET = 3 + static SYNCED_OFFSET = 7 + + static NULL_ID = -1 + + constructor(memory: DataView) { super(memory) } // prettier-ignore + + public send(routineId: number, value: any): boolean { + const isSender = this.sendId === routineId + if (isSender) { + if (this.synced) { + // there is a receiver that has already taken the value + // that the current routine was trying to send previously + // therefore, the current routine can continue + + // reset the channel state + this.reset() + return true + } else { + // the value hasn't been taken by a receiver + // so the routine needs to continue waiting + return false + } + } + + if (this.hasSender() || this.synced) { + // if there is another sender or another sync is happening, the current routine + // cannot send the value and needs to wait + return false + } + + // there is no sender and no sync happening + // the current routine can try to send the value + + this.setSlotValue(0, value) // set the value to be sent + this.sendId = routineId // claim the channel as the sender + + if (this.hasReceiver()) { + // if there is a receiver waiting, the current routine can proceed to start + // the sync process + this.synced = true + return true + } else { + // no receiver waiting, the current routine needs to wait + // we hand the responsibility of syncing to the receiver + return false + } + } + + public recv(routineId: number): number | null { + const isReciever = this.recvId === routineId + if (isReciever) { + if (this.synced) { + // there is a value to be recieved by the current routine + // that was trying to recieve previously + // therefore, the current routine can continue + + // extract the value from the channel and reset the channel state + const value = this.getSlotValue(0) + this.reset() + return value + } else { + // there is no value to be recieved so the routine needs to continue waiting + return null + } + } + + if (this.hasReceiver() || this.synced) { + // if there is another reciever or another sync is happening, the current routine + // cannot try to recieve a value and needs to wait + return null + } + + // there is no reciever and no sync happening + // the current routine can try to recieve a value + + if (this.hasSender()) { + // if there is a sender waiting, the current routine can proceed to start + // the recieve the value and start the sync process + this.synced = true + return this.getSlotValue(0) + } else { + // no sender waiting, the current routine needs to wait + // we hand the responsibility of syncing to the sender + return null + } + } + + private hasSender(): boolean { return this.sendId !== UnbufferedChannel.NULL_ID } // prettier-ignore + + private hasReceiver(): boolean { return this.recvId !== UnbufferedChannel.NULL_ID } // prettier-ignore + + private get recvId(): number { + return this.memory.getInt16(UnbufferedChannel.RECV_ID_OFFSET) + } + + private set recvId(newRecvId: number) { + this.memory.setInt16(UnbufferedChannel.RECV_ID_OFFSET, newRecvId) + } + + private get sendId(): number { + return this.memory.getInt16(UnbufferedChannel.SEND_ID_OFFSET) + } + + private set sendId(newSendId: number) { + this.memory.setInt16(UnbufferedChannel.SEND_ID_OFFSET, newSendId) + } + + private get synced(): boolean { + return this.memory.getUint8(UnbufferedChannel.SYNCED_OFFSET) === 1 + } + + private set synced(hasSynced: boolean) { + this.memory.setUint8(UnbufferedChannel.SYNCED_OFFSET, hasSynced ? 1 : 0) + } + + private reset(): void { + this.sendId = UnbufferedChannel.NULL_ID + this.recvId = UnbufferedChannel.NULL_ID + this.synced = false + } +} + export class BufferedChannel extends Channel { static READ_IDX_OFFSET = 1 static WRITE_IDX_OFFSET = 2 diff --git a/src/go-slang/lib/heap/index.ts b/src/go-slang/lib/heap/index.ts index 842a86242..2f02d494d 100644 --- a/src/go-slang/lib/heap/index.ts +++ b/src/go-slang/lib/heap/index.ts @@ -18,7 +18,7 @@ import { isNode } from '../../types' import { AstMap } from '../astMap' -import { BufferedChannel } from '../channel' +import { BufferedChannel, UnbufferedChannel } from '../channel' import { DEFAULT_HEAP_SIZE, WORD_SIZE } from './config' import { PointerTag } from './tags' @@ -172,6 +172,8 @@ export class Heap { } as EnvOp case PointerTag.PopSOp: return PopS + case PointerTag.UnbufferedChannel: + return new UnbufferedChannel(new DataView(this.memory.buffer, mem_addr, WORD_SIZE * 2)) case PointerTag.BufferedChannel: const chanMaxBufSize = this.size(heap_addr) const chanMemRegion = new DataView( @@ -298,8 +300,18 @@ export class Heap { return ptr_heap_addr } + /* Memory Layout of an BufferedChannel: + * [0:tag, 1-2:recvId, 3-4:sendId, 5-6:bufSize, 7:hasSynced] (2 words) + */ public allocateUnbufferedChan(): HeapAddress { - throw new Error('allocateUnbufferedChan not implemented.') + const ptr_heap_addr = this.allocateTaggedPtr(PointerTag.UnbufferedChannel, 2) + + const ptr_mem_addr = ptr_heap_addr * WORD_SIZE + this.memory.setInt16(ptr_mem_addr + 1, -1) // initialize recvId to -1 + this.memory.setInt16(ptr_mem_addr + 3, -1) // initialize sendId to -1 + this.memory.setUint8(ptr_mem_addr + 7, 0) // initialize hasSynced to false + + return ptr_heap_addr } /* Memory Layout of an BufferedChannel: diff --git a/src/go-slang/lib/heap/tags.ts b/src/go-slang/lib/heap/tags.ts index 7c7c5546b..7b6250e93 100644 --- a/src/go-slang/lib/heap/tags.ts +++ b/src/go-slang/lib/heap/tags.ts @@ -13,5 +13,6 @@ export enum PointerTag { ClosureOp, EnvOp, PopSOp, - BufferedChannel + BufferedChannel, + UnbufferedChannel }