diff --git a/pub.go b/pub.go index e164c7e..4d7f8c9 100644 --- a/pub.go +++ b/pub.go @@ -177,13 +177,11 @@ type pubMWriter struct { ctx context.Context mu sync.Mutex ws []*msgWriter - sem *semaphore } func newPubMWriter(ctx context.Context) *pubMWriter { return &pubMWriter{ ctx: ctx, - sem: newSemaphore(), } } @@ -203,7 +201,6 @@ func (w *pubMWriter) Close() error { func (mw *pubMWriter) addConn(w *msgWriter) { mw.mu.Lock() - mw.sem.enable() mw.ws = append(mw.ws, w) mw.mu.Unlock() } @@ -225,7 +222,6 @@ func (mw *pubMWriter) rmConn(w *msgWriter) { } func (w *pubMWriter) write(ctx context.Context, msg Msg) error { - w.sem.lock() grp, ctx := errgroup.WithContext(ctx) w.mu.Lock() topic := string(msg.Frames[0]) diff --git a/zmq4_pubsub_test.go b/zmq4_pubsub_test.go index c46b32f..e6df148 100644 --- a/zmq4_pubsub_test.go +++ b/zmq4_pubsub_test.go @@ -55,6 +55,31 @@ type testCasePubSub struct { sub2 zmq4.Socket } +func TestNotBlockingSendOnPub(t *testing.T) { + + pub := zmq4.NewPub(context.Background()) + defer pub.Close() + + err := pub.Listen(must(EndPoint("tcp"))) + if err != nil { + t.Fatalf("could not listen on end point: %v", err) + } + + errc := make(chan error) + go func() { + errc <- pub.Send(zmq4.NewMsg([]byte("blocked?"))) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("pub socket should not block!") + case err := <-errc: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } +} + func TestPubSub(t *testing.T) { var ( topics = []string{"", "MSG", "msg"}