Skip to content

Commit

Permalink
Merge dev into master (#45)
Browse files Browse the repository at this point in the history
* Stop the ticker when calling stop on SendRepeater

* replaced GGQ with RingBuffer implementation
  • Loading branch information
anthdm authored Aug 14, 2023
1 parent 011a4bc commit 57c621d
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 275 deletions.
1 change: 1 addition & 0 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (sr SendRepeater) start() {
case <-ticker.C:
sr.engine.SendWithSender(sr.target, sr.msg, sr.self)
case <-sr.cancelch:
ticker.Stop()
return
}
}
Expand Down
9 changes: 3 additions & 6 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,8 @@ func TestSendMsgRaceCon(t *testing.T) {

for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
e.Send(pid, []byte("f"))
wg.Done()
}()
e.Send(pid, []byte("f"))
wg.Done()
}
wg.Wait()
}
Expand Down Expand Up @@ -231,8 +229,7 @@ func TestRequestResponse(t *testing.T) {
// 56 ns/op
func BenchmarkSendMessageLocal(b *testing.B) {
e := NewEngine()
p := NewTestProducer(nil, func(_ *testing.T, _ *Context) {})
pid := e.Spawn(p, "bench", WithInboxSize(1024*8))
pid := e.SpawnFunc(func(_ *Context) {}, "bench", WithInboxSize(128))

b.ResetTimer()
b.Run("send_message_local", func(b *testing.B) {
Expand Down
95 changes: 66 additions & 29 deletions actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,36 @@ package actor

import (
"runtime"
"sync/atomic"

"github.com/anthdm/hollywood/ggq"
"github.com/anthdm/hollywood/log"
"github.com/anthdm/hollywood/ringbuffer"
)

var LOCK_OS_THREAD = true
const defaultThroughput = 300

const (
idle int32 = iota
running
)

type Scheduler interface {
Schedule(fn func())
Throughput() int
}

type goscheduler int

func (goscheduler) Schedule(fn func()) {
go fn()
}

func (sched goscheduler) Throughput() int {
return int(sched)
}

func NewScheduler(throughput int) Scheduler {
return goscheduler(throughput)
}

type Inboxer interface {
Send(Envelope)
Expand All @@ -16,43 +40,56 @@ type Inboxer interface {
}

type Inbox struct {
ggq *ggq.GGQ[Envelope]
proc Processer
rb *ringbuffer.RingBuffer[Envelope]
proc Processer
scheduler Scheduler
procStatus int32
}

func NewInbox(size int) *Inbox {
in := &Inbox{}
in.ggq = ggq.New[Envelope](uint32(size), in)
return in
return &Inbox{
rb: ringbuffer.New[Envelope](int64(size)),
scheduler: NewScheduler(defaultThroughput),
}
}

func (in *Inbox) Consume(msgs []Envelope) {
in.proc.Invoke(msgs)
func (in *Inbox) Send(msg Envelope) {
in.rb.Push(msg)
in.schedule()
}

func (in *Inbox) Start(proc Processer) {
in.proc = proc
var lockOSThread bool
// prevent race condition here be reassigning before go routine.
if LOCK_OS_THREAD {
lockOSThread = true
func (in *Inbox) schedule() {
if atomic.CompareAndSwapInt32(&in.procStatus, idle, running) {
in.scheduler.Schedule(in.process)
}
go func() {
if lockOSThread {
runtime.LockOSThread()
}

func (in *Inbox) process() {
in.run()
atomic.StoreInt32(&in.procStatus, idle)
}

func (in *Inbox) run() {
i, t := 0, in.scheduler.Throughput()
for {
if i > t {
i = 0
runtime.Gosched()
}
in.ggq.ReadN()
}()
log.Tracew("[INBOX] started", log.M{"pid": proc.PID()})
i++

if msg, ok := in.rb.Pop(); ok {
in.proc.Invoke([]Envelope{msg})
} else {
return
}
}
}

func (in *Inbox) Stop() error {
in.ggq.Close()
log.Tracew("[INBOX] closed", log.M{"pid": in.proc.PID()})
return nil
func (in *Inbox) Start(proc Processer) {
in.proc = proc
}

func (in *Inbox) Send(msg Envelope) {
in.ggq.Awake()
in.ggq.Write(msg)
func (in *Inbox) Stop() error {
return nil
}
16 changes: 10 additions & 6 deletions examples/helloworld/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"fmt"
"time"
"sync"

"github.com/anthdm/hollywood/actor"
)
Expand All @@ -20,17 +20,21 @@ func newFoo() actor.Receiver {
func (f *foo) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case actor.Started:
fmt.Println("foo started")
fmt.Println("actor started")
case actor.Stopped:
fmt.Println("actor stopped")
case *message:
fmt.Println("foo has received", msg.data)
fmt.Println("actor has received", msg.data)
}
}

func main() {
engine := actor.NewEngine()
pid := engine.Spawn(newFoo, "foo")
for i := 0; i < 99; i++ {
pid := engine.Spawn(newFoo, "my_actor")
for i := 0; i < 100; i++ {
engine.Send(pid, &message{data: "hello world!"})
}
time.Sleep(time.Second * 1)
wg := sync.WaitGroup{}
engine.Poison(pid, &wg)
wg.Wait()
}
28 changes: 28 additions & 0 deletions examples/ttt/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"fmt"
"sync"
"time"

"github.com/anthdm/hollywood/actor"
)

func main() {
e := actor.NewEngine()
pid := e.SpawnFunc(func(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
fmt.Println("started")
case actor.Stopped:
fmt.Println("stopped")
default:
_ = msg
}
}, "foobarbas")

wg := sync.WaitGroup{}
e.Poison(pid, &wg)
wg.Wait()
time.Sleep(time.Second * 2)
}
171 changes: 0 additions & 171 deletions ggq/ggq.go

This file was deleted.

Loading

0 comments on commit 57c621d

Please sign in to comment.