Skip to content

Commit

Permalink
Refactor unpi broker to expose Reading and Writing functions, causes …
Browse files Browse the repository at this point in the history
…a breaking change that means a Broker must be Start()'d.
  • Loading branch information
pwood committed May 25, 2021
1 parent 862f129 commit 7ede275
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
14 changes: 11 additions & 3 deletions broker/broker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package broker

import (
"github.com/shimmeringbee/unpi"
. "github.com/shimmeringbee/unpi/library"
"io"
"sync"
)

type FrameReader func(r io.Reader) (unpi.Frame, error)
type FrameWriter func(w io.Writer, frame unpi.Frame) error

type Broker struct {
reader io.Reader
writer io.Writer

FrameReader FrameReader
FrameWriter FrameWriter

sendingChannel chan outgoingFrame
sendingEnd chan bool

Expand All @@ -30,6 +37,9 @@ func NewBroker(reader io.Reader, writer io.Writer, ml *Library) *Broker {
reader: reader,
writer: writer,

FrameReader: unpi.Read,
FrameWriter: unpi.Write,

sendingChannel: make(chan outgoingFrame, PermittedQueuedRequests),
sendingEnd: make(chan bool),

Expand All @@ -44,12 +54,10 @@ func NewBroker(reader io.Reader, writer io.Writer, ml *Library) *Broker {
messageLibrary: ml,
}

z.start()

return z
}

func (b *Broker) start() {
func (b *Broker) Start() {
go b.handleSending()
go b.handleReceiving()
}
Expand Down
2 changes: 2 additions & 0 deletions broker/listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestBroker_listen(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

awaitOneMatch := false
Expand Down Expand Up @@ -46,6 +47,7 @@ func TestBroker_listen(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

awaitOneMatch := false
Expand Down
3 changes: 1 addition & 2 deletions broker/receiving.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package broker

import (
"errors"
. "github.com/shimmeringbee/unpi"
"io"
"log"
"syscall"
)

func (b *Broker) handleReceiving() {
for {
frame, err := Read(b.reader)
frame, err := b.FrameReader(b.reader)

if err != nil {
switch e := err.(type) {
Expand Down
3 changes: 3 additions & 0 deletions broker/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestRequest(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

m.On(AREQ, SYS, 0x01)
Expand All @@ -44,6 +45,7 @@ func TestRequest(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

request := Request{}
Expand All @@ -62,6 +64,7 @@ func TestRequest(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

request := Request{}
Expand Down
8 changes: 8 additions & 0 deletions broker/requestresponse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestRequestResponse(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

expectedResponse := Response{Value: 42}
Expand Down Expand Up @@ -70,6 +71,7 @@ func TestRequestResponse(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

expectedResponse := Response{Value: 42}
Expand Down Expand Up @@ -111,6 +113,7 @@ func TestRequestResponse(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

m.On(SREQ, SYS, 0x01)
Expand Down Expand Up @@ -142,6 +145,7 @@ func TestBroker_Await(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

expectedResponse := Response{Value: 0x42}
Expand Down Expand Up @@ -178,6 +182,7 @@ func TestBroker_Await(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand All @@ -203,6 +208,7 @@ func TestBroker_Await(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
Expand All @@ -228,6 +234,7 @@ func TestBroker_Subscribe(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

err, subCancel := b.Subscribe(&Message{}, func(v interface{}) {})
Expand All @@ -251,6 +258,7 @@ func TestBroker_Subscribe(t *testing.T) {
m := testunpi.NewMockAdapter()
defer m.Stop()
b := NewBroker(m, m, ml)
b.Start()
defer b.Stop()

called := 0
Expand Down
2 changes: 1 addition & 1 deletion broker/sending.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func (b *Broker) handleSending() {
for {
select {
case outgoing := <-b.sendingChannel:
outgoing.ErrorChannel <- Write(b.writer, outgoing.Frame)
outgoing.ErrorChannel <- b.FrameWriter(b.writer, outgoing.Frame)
case <-b.sendingEnd:
return
}
Expand Down

0 comments on commit 7ede275

Please sign in to comment.