From 7ede275a1033ae62dfaa6d7c5bdccbbfd1aa5d8f Mon Sep 17 00:00:00 2001 From: Peter Wood Date: Tue, 25 May 2021 16:13:28 +0100 Subject: [PATCH] Refactor unpi broker to expose Reading and Writing functions, causes a breaking change that means a Broker must be Start()'d. --- broker/broker.go | 14 +++++++++++--- broker/listen_test.go | 2 ++ broker/receiving.go | 3 +-- broker/request_test.go | 3 +++ broker/requestresponse_test.go | 8 ++++++++ broker/sending.go | 2 +- 6 files changed, 26 insertions(+), 6 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 0ad6516..3e12220 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,15 +1,22 @@ package broker import ( + "github.com/shimmeringbee/unpi" . "github.com/shimmeringbee/unpi/library" "io" "sync" ) +type FrameReader func(r io.Reader) (unpi.Frame, error) +type FrameWriter func(w io.Writer, frame unpi.Frame) error + type Broker struct { reader io.Reader writer io.Writer + FrameReader FrameReader + FrameWriter FrameWriter + sendingChannel chan outgoingFrame sendingEnd chan bool @@ -30,6 +37,9 @@ func NewBroker(reader io.Reader, writer io.Writer, ml *Library) *Broker { reader: reader, writer: writer, + FrameReader: unpi.Read, + FrameWriter: unpi.Write, + sendingChannel: make(chan outgoingFrame, PermittedQueuedRequests), sendingEnd: make(chan bool), @@ -44,12 +54,10 @@ func NewBroker(reader io.Reader, writer io.Writer, ml *Library) *Broker { messageLibrary: ml, } - z.start() - return z } -func (b *Broker) start() { +func (b *Broker) Start() { go b.handleSending() go b.handleReceiving() } diff --git a/broker/listen_test.go b/broker/listen_test.go index 71a63eb..86e39af 100644 --- a/broker/listen_test.go +++ b/broker/listen_test.go @@ -15,6 +15,7 @@ func TestBroker_listen(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() awaitOneMatch := false @@ -46,6 +47,7 @@ func TestBroker_listen(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() awaitOneMatch := false diff --git a/broker/receiving.go b/broker/receiving.go index 23891bb..8c13c93 100644 --- a/broker/receiving.go +++ b/broker/receiving.go @@ -2,7 +2,6 @@ package broker import ( "errors" - . "github.com/shimmeringbee/unpi" "io" "log" "syscall" @@ -10,7 +9,7 @@ import ( func (b *Broker) handleReceiving() { for { - frame, err := Read(b.reader) + frame, err := b.FrameReader(b.reader) if err != nil { switch e := err.(type) { diff --git a/broker/request_test.go b/broker/request_test.go index 5e2130b..dcd05d4 100644 --- a/broker/request_test.go +++ b/broker/request_test.go @@ -20,6 +20,7 @@ func TestRequest(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() m.On(AREQ, SYS, 0x01) @@ -44,6 +45,7 @@ func TestRequest(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() request := Request{} @@ -62,6 +64,7 @@ func TestRequest(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() request := Request{} diff --git a/broker/requestresponse_test.go b/broker/requestresponse_test.go index 6621a68..60e63c0 100644 --- a/broker/requestresponse_test.go +++ b/broker/requestresponse_test.go @@ -28,6 +28,7 @@ func TestRequestResponse(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() expectedResponse := Response{Value: 42} @@ -70,6 +71,7 @@ func TestRequestResponse(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() expectedResponse := Response{Value: 42} @@ -111,6 +113,7 @@ func TestRequestResponse(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() m.On(SREQ, SYS, 0x01) @@ -142,6 +145,7 @@ func TestBroker_Await(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() expectedResponse := Response{Value: 0x42} @@ -178,6 +182,7 @@ func TestBroker_Await(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) @@ -203,6 +208,7 @@ func TestBroker_Await(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) @@ -228,6 +234,7 @@ func TestBroker_Subscribe(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() err, subCancel := b.Subscribe(&Message{}, func(v interface{}) {}) @@ -251,6 +258,7 @@ func TestBroker_Subscribe(t *testing.T) { m := testunpi.NewMockAdapter() defer m.Stop() b := NewBroker(m, m, ml) + b.Start() defer b.Stop() called := 0 diff --git a/broker/sending.go b/broker/sending.go index dba4b09..7261416 100644 --- a/broker/sending.go +++ b/broker/sending.go @@ -11,7 +11,7 @@ func (b *Broker) handleSending() { for { select { case outgoing := <-b.sendingChannel: - outgoing.ErrorChannel <- Write(b.writer, outgoing.Frame) + outgoing.ErrorChannel <- b.FrameWriter(b.writer, outgoing.Frame) case <-b.sendingEnd: return }