Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unbuffered channel #21

Merged
merged 4 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/cse-machine/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ export class Stack<T> implements IStack<T> {
return this.storage[this.size() - 1]
}

/**
* Returns the top n items in the stack.
*
* @param n amount of items to peek from the top of the stack
* @returns an array of the top n items in the stack
* the first item in the array is the top of the stack
*/
public peekN(n: number): T[] | undefined {
if (this.isEmpty()) {
return undefined
}

return this.storage.slice(-n).reverse()
}

public size(): number {
return this.storage.length
}
Expand Down
61 changes: 43 additions & 18 deletions src/go-slang/goroutine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { AstMap } from './lib/astMap'
import { evaluateBinaryOp } from './lib/binaryOp'
import { Environment } from './lib/env'
import { Heap, HeapAddress } from './lib/heap'
import { Result, isAny } from './lib/utils'
import { Counter, Result, isAny } from './lib/utils'
import {
AssignOp,
Assignment,
Expand Down Expand Up @@ -61,7 +61,7 @@ import {
} from './types'
import { Scheduler } from './scheduler'
import { PredeclaredFuncT } from './lib/predeclared'
import { BufferedChannel } from './lib/channel'
import { BufferedChannel, UnbufferedChannel } from './lib/channel'

export type Control = Stack<Instruction | HeapAddress>
export type Stash = Stack<HeapAddress | any>
Expand All @@ -77,12 +77,16 @@ export interface Context {
}

export class GoRoutine {
static idCounter: Counter = new Counter()

private id: number
private context: Context
private scheduler: Scheduler

public isMain: boolean

constructor(context: Context, scheduler: Scheduler, isMain: boolean = false) {
this.id = GoRoutine.idCounter.next()
this.context = context
this.scheduler = scheduler
this.isMain = isMain
Expand All @@ -100,7 +104,7 @@ export class GoRoutine {
return Result.fail(new UnknownInstructionError(inst.type))
}

const runtimeError = Interpreter[inst.type](inst, this.context, this.scheduler)
const runtimeError = Interpreter[inst.type](inst, this.context, this.scheduler, this.id)
return runtimeError ? Result.fail(runtimeError) : Result.ok()
}
}
Expand All @@ -109,7 +113,8 @@ const Interpreter: {
[key: string]: (
inst: Instruction,
context: Context,
sched: Scheduler
sched: Scheduler,
routineId: number
) => RuntimeSourceError | void
} = {
SourceFile: ({ topLevelDecls }: SourceFile, { C, H }) => C.pushR(...H.allocM(topLevelDecls)),
Expand Down Expand Up @@ -308,26 +313,46 @@ const Interpreter: {
return sched.schedule(new GoRoutine({ C: _C, S: _S, E: _E, B, H, A }, sched))
},

ChanRecvOp: (_inst, { C, S, H }) => {
const chanAddr = S.peek()
const chan = H.resolve(chanAddr) as BufferedChannel
ChanRecvOp: (_inst, { C, S, H }, _sched, routineId: number) => {
const chan = H.resolve(S.peek()) as BufferedChannel | UnbufferedChannel

if (chan instanceof BufferedChannel) {
// if the channel is empty, we retry the receive operation
if (chan.isBufferEmpty()) { return C.push(ChanRecv) } // prettier-ignore

// if the channel is empty, we retry the receive operation
if (chan.isBufferEmpty()) { return C.push(ChanRecv) } // prettier-ignore
S.pop() // pop the channel address
return S.push(H.alloc(chan.recv()))
}

S.pop()
S.push(H.alloc(chan.recv()))
if (chan instanceof UnbufferedChannel) {
const recvValue = chan.recv(routineId)
// if we cannot receive, we retry the receive operation
if (recvValue === null) { return C.push(ChanRecv) } // prettier-ignore

S.pop() // pop the channel address
return S.push(H.alloc(recvValue))
}
},

ChanSendOp: (_inst, { C, S, H }) => {
const chanAddr = S.peek()
const chan = H.resolve(chanAddr) as BufferedChannel
ChanSendOp: (_inst, { C, S, H }, _sched, routineId: number) => {
const [chan, sendValue] = H.resolveM(S.peekN(2)!) as [BufferedChannel | UnbufferedChannel, any]

// if the channel is full, we retry the send operation
if (chan.isBufferFull()) { return C.push(ChanSend) } // prettier-ignore
if (chan instanceof BufferedChannel) {
// if the channel is full, we retry the send operation
if (chan.isBufferFull()) { return C.push(ChanSend) } // prettier-ignore

const [_, valueAddr] = S.popN(2)
chan.send(H.resolve(valueAddr))
S.popN(2) // pop the channel address and the value address
chan.send(sendValue)
return
}

if (chan instanceof UnbufferedChannel) {
// if we cannot send, we retry the send operation
if (!chan.send(routineId, sendValue)) { return C.push(ChanSend) } // prettier-ignore

S.popN(2) // pop the channel address and the value address
return
}
},

BranchOp: ({ cons, alt }: BranchOp, { S, C, H }) =>
Expand Down
125 changes: 125 additions & 0 deletions src/go-slang/lib/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,131 @@ class Channel {
protected setSlotValue(slotIdx: number, value: number): void { this.memory.setFloat64(this.getSlotAddr(slotIdx), value) }
}

export class UnbufferedChannel extends Channel {
static RECV_ID_OFFSET = 1
static SEND_ID_OFFSET = 3
static SYNCED_OFFSET = 7

static NULL_ID = -1

constructor(memory: DataView) { super(memory) } // prettier-ignore

public send(routineId: number, value: any): boolean {
const isSender = this.sendId === routineId
if (isSender) {
if (this.synced) {
// there is a receiver that has already taken the value
// that the current routine was trying to send previously
// therefore, the current routine can continue

// reset the channel state
this.reset()
return true
} else {
// the value hasn't been taken by a receiver
// so the routine needs to continue waiting
return false
}
}

if (this.hasSender() || this.synced) {
// if there is another sender or another sync is happening, the current routine
// cannot send the value and needs to wait
return false
}

// there is no sender and no sync happening
// the current routine can try to send the value

this.setSlotValue(0, value) // set the value to be sent
this.sendId = routineId // claim the channel as the sender

if (this.hasReceiver()) {
// if there is a receiver waiting, the current routine can proceed to start
// the sync process
this.synced = true
return true
} else {
// no receiver waiting, the current routine needs to wait
// we hand the responsibility of syncing to the receiver
return false
}
}

public recv(routineId: number): number | null {
const isReciever = this.recvId === routineId
if (isReciever) {
if (this.synced) {
// there is a value to be recieved by the current routine
// that was trying to recieve previously
// therefore, the current routine can continue

// extract the value from the channel and reset the channel state
const value = this.getSlotValue(0)
this.reset()
return value
} else {
// there is no value to be recieved so the routine needs to continue waiting
return null
}
}

if (this.hasReceiver() || this.synced) {
// if there is another reciever or another sync is happening, the current routine
// cannot try to recieve a value and needs to wait
return null
}

// there is no reciever and no sync happening
// the current routine can try to recieve a value

if (this.hasSender()) {
// if there is a sender waiting, the current routine can proceed to start
// the recieve the value and start the sync process
this.synced = true
return this.getSlotValue(0)
} else {
// no sender waiting, the current routine needs to wait
// we hand the responsibility of syncing to the sender
return null
}
}

private hasSender(): boolean { return this.sendId !== UnbufferedChannel.NULL_ID } // prettier-ignore

private hasReceiver(): boolean { return this.recvId !== UnbufferedChannel.NULL_ID } // prettier-ignore

private get recvId(): number {
return this.memory.getInt16(UnbufferedChannel.RECV_ID_OFFSET)
}

private set recvId(newRecvId: number) {
this.memory.setInt16(UnbufferedChannel.RECV_ID_OFFSET, newRecvId)
}

private get sendId(): number {
return this.memory.getInt16(UnbufferedChannel.SEND_ID_OFFSET)
}

private set sendId(newSendId: number) {
this.memory.setInt16(UnbufferedChannel.SEND_ID_OFFSET, newSendId)
}

private get synced(): boolean {
return this.memory.getUint8(UnbufferedChannel.SYNCED_OFFSET) === 1
}

private set synced(hasSynced: boolean) {
this.memory.setUint8(UnbufferedChannel.SYNCED_OFFSET, hasSynced ? 1 : 0)
}

private reset(): void {
this.sendId = UnbufferedChannel.NULL_ID
this.recvId = UnbufferedChannel.NULL_ID
this.synced = false
}
}

export class BufferedChannel extends Channel {
static READ_IDX_OFFSET = 1
static WRITE_IDX_OFFSET = 2
Expand Down
16 changes: 14 additions & 2 deletions src/go-slang/lib/heap/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
isNode
} from '../../types'
import { AstMap } from '../astMap'
import { BufferedChannel } from '../channel'
import { BufferedChannel, UnbufferedChannel } from '../channel'
import { DEFAULT_HEAP_SIZE, WORD_SIZE } from './config'
import { PointerTag } from './tags'

Expand Down Expand Up @@ -172,6 +172,8 @@ export class Heap {
} as EnvOp
case PointerTag.PopSOp:
return PopS
case PointerTag.UnbufferedChannel:
return new UnbufferedChannel(new DataView(this.memory.buffer, mem_addr, WORD_SIZE * 2))
case PointerTag.BufferedChannel:
const chanMaxBufSize = this.size(heap_addr)
const chanMemRegion = new DataView(
Expand Down Expand Up @@ -298,8 +300,18 @@ export class Heap {
return ptr_heap_addr
}

/* Memory Layout of an BufferedChannel:
* [0:tag, 1-2:recvId, 3-4:sendId, 5-6:bufSize, 7:hasSynced] (2 words)
*/
public allocateUnbufferedChan(): HeapAddress {
throw new Error('allocateUnbufferedChan not implemented.')
const ptr_heap_addr = this.allocateTaggedPtr(PointerTag.UnbufferedChannel, 2)

const ptr_mem_addr = ptr_heap_addr * WORD_SIZE
this.memory.setInt16(ptr_mem_addr + 1, -1) // initialize recvId to -1
this.memory.setInt16(ptr_mem_addr + 3, -1) // initialize sendId to -1
this.memory.setUint8(ptr_mem_addr + 7, 0) // initialize hasSynced to false

return ptr_heap_addr
}

/* Memory Layout of an BufferedChannel:
Expand Down
3 changes: 2 additions & 1 deletion src/go-slang/lib/heap/tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ export enum PointerTag {
ClosureOp,
EnvOp,
PopSOp,
BufferedChannel
BufferedChannel,
UnbufferedChannel
}
Loading