Skip to content

Commit

Permalink
zmq4: implement Topics interface for xpub sockets
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Paluch <[email protected]>
  • Loading branch information
bryanpaluch and Bryan Paluch authored Oct 14, 2020
1 parent 9a6a79c commit f49bc03
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
21 changes: 1 addition & 20 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package zmq4
import (
"context"
"net"
"sort"
"sync"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -113,25 +112,7 @@ func (pub *pubSocket) SetOption(name string, value interface{}) error {

// Topics returns the sorted list of topics a socket is subscribed to.
func (pub *pubSocket) Topics() []string {
var (
keys = make(map[string]struct{})
topics []string
)
pub.sck.mu.RLock()
for _, con := range pub.sck.conns {
con.mu.RLock()
for topic := range con.topics {
if _, dup := keys[topic]; dup {
continue
}
keys[topic] = struct{}{}
topics = append(topics, topic)
}
con.mu.RUnlock()
}
pub.sck.mu.RUnlock()
sort.Strings(topics)
return topics
return pub.sck.topics()
}

// pubQReader is a queued-message reader.
Expand Down
23 changes: 23 additions & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"net"
"os"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -91,6 +92,28 @@ func newSocket(ctx context.Context, sockType SocketType, opts ...Option) *socket
return sck
}

func (sck *socket) topics() []string {
var (
keys = make(map[string]struct{})
topics []string
)
sck.mu.RLock()
for _, con := range sck.conns {
con.mu.RLock()
for topic := range con.topics {
if _, dup := keys[topic]; dup {
continue
}
keys[topic] = struct{}{}
topics = append(topics, topic)
}
con.mu.RUnlock()
}
sck.mu.RUnlock()
sort.Strings(topics)
return topics
}

// Close closes the open Socket
func (sck *socket) Close() error {
sck.cancel()
Expand Down
4 changes: 4 additions & 0 deletions xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (xpub *xpubSocket) SetOption(name string, value interface{}) error {
return xpub.sck.SetOption(name, value)
}

func (xpub *xpubSocket) Topics() []string {
return xpub.sck.topics()
}

var (
_ Socket = (*xpubSocket)(nil)
)
5 changes: 5 additions & 0 deletions zmq4_xpubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func TestXPubSub(t *testing.T) {

time.Sleep(1 * time.Second)

gotTopics := tc.xpub.(zmq4.Topics).Topics()
if !reflect.DeepEqual(gotTopics, topics) {
t.Fatalf("Missing or wrong topics.\ngot= %q\nwant=%q", gotTopics, topics)
}

for _, msg := range msgs[0] {
err = tc.xpub.Send(msg)
if err != nil {
Expand Down

0 comments on commit f49bc03

Please sign in to comment.