Skip to content

Commit

Permalink
zmq4: make sure PUB sockets do not block on SEND
Browse files Browse the repository at this point in the history
Fixes #33.
  • Loading branch information
sbinet committed Nov 8, 2018
1 parent 94bac15 commit 91805d6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
4 changes: 0 additions & 4 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,11 @@ type pubMWriter struct {
ctx context.Context
mu sync.Mutex
ws []*msgWriter
sem *semaphore
}

func newPubMWriter(ctx context.Context) *pubMWriter {
return &pubMWriter{
ctx: ctx,
sem: newSemaphore(),
}
}

Expand All @@ -203,7 +201,6 @@ func (w *pubMWriter) Close() error {

func (mw *pubMWriter) addConn(w *msgWriter) {
mw.mu.Lock()
mw.sem.enable()
mw.ws = append(mw.ws, w)
mw.mu.Unlock()
}
Expand All @@ -225,7 +222,6 @@ func (mw *pubMWriter) rmConn(w *msgWriter) {
}

func (w *pubMWriter) write(ctx context.Context, msg Msg) error {
w.sem.lock()
grp, ctx := errgroup.WithContext(ctx)
w.mu.Lock()
topic := string(msg.Frames[0])
Expand Down
25 changes: 25 additions & 0 deletions zmq4_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ type testCasePubSub struct {
sub2 zmq4.Socket
}

func TestNotBlockingSendOnPub(t *testing.T) {

pub := zmq4.NewPub(context.Background())
defer pub.Close()

err := pub.Listen(must(EndPoint("tcp")))
if err != nil {
t.Fatalf("could not listen on end point: %v", err)
}

errc := make(chan error)
go func() {
errc <- pub.Send(zmq4.NewMsg([]byte("blocked?")))
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("pub socket should not block!")
case err := <-errc:
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
}

func TestPubSub(t *testing.T) {
var (
topics = []string{"", "MSG", "msg"}
Expand Down

0 comments on commit 91805d6

Please sign in to comment.