Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service as object #17

Merged
merged 19 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 50 additions & 67 deletions reactor/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (

"github.com/amirylm/go-options"
"github.com/amirylm/lockfree/core"
"github.com/amirylm/lockfree/ringbuffer"
"github.com/amirylm/lockfree/queue"
)

// DemuxHandler is a function that handles events
type DemuxHandler[T any] func(T)

// Selector is a predicate that selects events for a given set of handlers
type Selector[T any] func(T) bool
type Service[T any] interface {
Select(T) bool
Handle(T)
}

// Demultiplexer provides a thread-safe, non-blocking, asynchronous event processing.
// Using lock-free queues for events and control messages, and atomic pointers to manage states and workers.
Expand All @@ -28,16 +27,15 @@ type Demultiplexer[T any] interface {
Enqueue(T)
// Register registers handlers. It accepts the event selector, amount of goroutine workers
// that will be used to process events, and the handlers that will be called.
Register(id string, s Selector[T], workers int, handlers ...DemuxHandler[T])
Register(id string, s Service[T], workers int)
// Unregister unregisters handlers
Unregister(id string)
}

type service[T any] struct {
id string
handlers []DemuxHandler[T]
selector Selector[T]
workers *atomic.Int32
type serviceWrapper[T any] struct {
id string
svc Service[T]
workers *atomic.Int32
}

type control int32
Expand All @@ -49,8 +47,9 @@ const (

type controlEvent[T any] struct {
control control

svc service[T]
id string
svc Service[T]
workers int32
}

func WithEventQueue[T any](q core.Queue[T]) options.Option[demultiplexer[T]] {
Expand Down Expand Up @@ -83,10 +82,10 @@ func NewDemux[T any](opts ...options.Option[demultiplexer[T]]) Demultiplexer[T]
el := options.Apply(nil, opts...)

if el.eventQ == nil {
el.eventQ = ringbuffer.New(ringbuffer.WithCapacity[T](1024), ringbuffer.WithOverride[T]())
el.eventQ = queue.New(queue.WithCapacity[T](1024))
}
if el.controlQ == nil {
el.controlQ = ringbuffer.New(ringbuffer.WithCapacity[controlEvent[T]](32))
el.controlQ = queue.New(queue.WithCapacity[controlEvent[T]](32))
}
el.done = atomic.Pointer[context.CancelFunc]{}

Expand All @@ -106,8 +105,7 @@ func (r *demultiplexer[T]) Close() error {
func (r *demultiplexer[T]) Start(pctx context.Context) error {
ctx, cancel := context.WithCancel(pctx)
r.done.Store(&cancel)

services := make([]service[T], 0)
services := make([]serviceWrapper[T], 0)
for ctx.Err() == nil {
c, ok := r.controlQ.Dequeue()
if ok {
Expand All @@ -128,98 +126,83 @@ func (r *demultiplexer[T]) Start(pctx context.Context) error {

// Register will add the given service (id, selector and handlers).
// Note that we filter existing IDs, one must use Unregister ID before trying to register.
func (r *demultiplexer[T]) Register(serviceID string, selector Selector[T], workers int, handlers ...DemuxHandler[T]) {
if len(handlers) == 0 {
return
}
aworkers := &atomic.Int32{}
aworkers.Store(int32(workers))
func (r *demultiplexer[T]) Register(serviceID string, service Service[T], workers int) {
r.controlQ.Enqueue(controlEvent[T]{
control: registerService,
svc: service[T]{
id: serviceID,
handlers: handlers,
selector: selector,
workers: aworkers,
},
id: serviceID,
svc: service,
workers: int32(workers),
})
}

func (r *demultiplexer[T]) Unregister(serviceID string) {
r.controlQ.Enqueue(controlEvent[T]{
control: unregisterService,
svc: service[T]{
id: serviceID,
},
id: serviceID,
})
}

func (r *demultiplexer[T]) Enqueue(t T) {
r.eventQ.Enqueue(t)
}

func (r *demultiplexer[T]) selectServices(t T, services ...service[T]) []service[T] {
var selected []service[T]
for _, svc := range services {
if svc.selector != nil && svc.selector(t) {
selected = append(selected, svc)
func (r *demultiplexer[T]) selectServices(t T, serviceWrappers ...serviceWrapper[T]) []serviceWrapper[T] {
var selected []serviceWrapper[T]
for _, s := range serviceWrappers {
if s.svc != nil && s.svc.Select(t) {
selected = append(selected, s)
}
}
return selected
}

// handleEvent handles an event by calling the appropriate handlers.
// Runs in an event thread, and might spawn worker threads.
func (r *demultiplexer[T]) handleEvent(t T, services ...service[T]) {
for _, svc := range services {
if svc.workers.Load() <= 0 {
func (r *demultiplexer[T]) handleEvent(t T, services ...serviceWrapper[T]) {
for _, s := range services {
if s.workers.Load() <= 0 {
// if there are no available workers, run on the event thread
r.invokeHandlers(r.clone(t), svc.handlers...)
s.svc.Handle(r.clone(t))
continue
}
svc.workers.Add(-1)
go func(t T, workers *atomic.Int32, handlers ...DemuxHandler[T]) {
s.workers.Add(-1)
go func(t T, workers *atomic.Int32, svc Service[T]) {
defer workers.Add(1)
r.invokeHandlers(t, handlers...)
}(r.clone(t), svc.workers, svc.handlers...)
svc.Handle(r.clone(t))
}(r.clone(t), s.workers, s.svc)
}
}

func (r *demultiplexer[T]) handleControl(services []service[T], ce *controlEvent[T]) []service[T] {
func (r *demultiplexer[T]) handleControl(serviceWrappers []serviceWrapper[T], ce *controlEvent[T]) []serviceWrapper[T] {
switch ce.control {
case registerService:
for _, svc := range services {
if svc.id == ce.svc.id {
return services
for _, s := range serviceWrappers {
if s.id == ce.id {
return serviceWrappers
}
}
return append(services, service[T]{
id: ce.svc.id,
handlers: ce.svc.handlers,
selector: ce.svc.selector,
workers: ce.svc.workers,
workers := &atomic.Int32{}
workers.Store(ce.workers)
return append(serviceWrappers, serviceWrapper[T]{
id: ce.id,
svc: ce.svc,
workers: workers,
})
case unregisterService:
updated := make([]service[T], len(services))
updated := make([]serviceWrapper[T], len(serviceWrappers))
i := 0
for _, svc := range services {
if svc.id != ce.svc.id {
updated[i] = svc
for _, s := range serviceWrappers {
if s.id != ce.id {
updated[i] = s
i++
}
}
if i == 0 {
return []service[T]{}
return []serviceWrapper[T]{}
}
return updated[:i]
default:
return services
}
}

func (r *demultiplexer[T]) invokeHandlers(t T, handlers ...DemuxHandler[T]) {
for _, h := range handlers {
h(t)
return serviceWrappers
}
}

Expand Down
Loading