From f49bc03bfcb95bd2faf6d8f40dfbfbff61a713c4 Mon Sep 17 00:00:00 2001 From: Bryan Paluch Date: Wed, 14 Oct 2020 05:02:08 -0400 Subject: [PATCH] zmq4: implement Topics interface for xpub sockets Co-authored-by: Bryan Paluch --- pub.go | 21 +-------------------- socket.go | 23 +++++++++++++++++++++++ xpub.go | 4 ++++ zmq4_xpubsub_test.go | 5 +++++ 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/pub.go b/pub.go index 04a509b..7a7b789 100644 --- a/pub.go +++ b/pub.go @@ -7,7 +7,6 @@ package zmq4 import ( "context" "net" - "sort" "sync" "golang.org/x/xerrors" @@ -113,25 +112,7 @@ func (pub *pubSocket) SetOption(name string, value interface{}) error { // Topics returns the sorted list of topics a socket is subscribed to. func (pub *pubSocket) Topics() []string { - var ( - keys = make(map[string]struct{}) - topics []string - ) - pub.sck.mu.RLock() - for _, con := range pub.sck.conns { - con.mu.RLock() - for topic := range con.topics { - if _, dup := keys[topic]; dup { - continue - } - keys[topic] = struct{}{} - topics = append(topics, topic) - } - con.mu.RUnlock() - } - pub.sck.mu.RUnlock() - sort.Strings(topics) - return topics + return pub.sck.topics() } // pubQReader is a queued-message reader. diff --git a/socket.go b/socket.go index f81e171..849bc6f 100644 --- a/socket.go +++ b/socket.go @@ -9,6 +9,7 @@ import ( "log" "net" "os" + "sort" "strings" "sync" "time" @@ -91,6 +92,28 @@ func newSocket(ctx context.Context, sockType SocketType, opts ...Option) *socket return sck } +func (sck *socket) topics() []string { + var ( + keys = make(map[string]struct{}) + topics []string + ) + sck.mu.RLock() + for _, con := range sck.conns { + con.mu.RLock() + for topic := range con.topics { + if _, dup := keys[topic]; dup { + continue + } + keys[topic] = struct{}{} + topics = append(topics, topic) + } + con.mu.RUnlock() + } + sck.mu.RUnlock() + sort.Strings(topics) + return topics +} + // Close closes the open Socket func (sck *socket) Close() error { sck.cancel() diff --git a/xpub.go b/xpub.go index d8d7211..8bea95e 100644 --- a/xpub.go +++ b/xpub.go @@ -77,6 +77,10 @@ func (xpub *xpubSocket) SetOption(name string, value interface{}) error { return xpub.sck.SetOption(name, value) } +func (xpub *xpubSocket) Topics() []string { + return xpub.sck.topics() +} + var ( _ Socket = (*xpubSocket)(nil) ) diff --git a/zmq4_xpubsub_test.go b/zmq4_xpubsub_test.go index f33e2cf..5e92cab 100644 --- a/zmq4_xpubsub_test.go +++ b/zmq4_xpubsub_test.go @@ -113,6 +113,11 @@ func TestXPubSub(t *testing.T) { time.Sleep(1 * time.Second) + gotTopics := tc.xpub.(zmq4.Topics).Topics() + if !reflect.DeepEqual(gotTopics, topics) { + t.Fatalf("Missing or wrong topics.\ngot= %q\nwant=%q", gotTopics, topics) + } + for _, msg := range msgs[0] { err = tc.xpub.Send(msg) if err != nil {