diff --git a/grpc/grpc.go b/grpc/grpc.go index 496536e..b83a485 100644 --- a/grpc/grpc.go +++ b/grpc/grpc.go @@ -117,12 +117,14 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object { p.SetSystemTags(mi.vu.State(), client.addr, methodName) + logger := mi.vu.State().Logger.WithField("streamMethod", methodName) + s := &stream{ vu: mi.vu, client: client, methodDescriptor: methodDescriptor, method: methodName, - logger: mi.vu.State().Logger, + logger: logger, tq: taskqueue.New(mi.vu.RegisterCallback), diff --git a/grpc/stream.go b/grpc/stream.go index 54b6034..e89eaf4 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -234,12 +234,7 @@ func (s *stream) writeData(wg *sync.WaitGroup) { err := s.stream.Send(msg.msg) if err != nil { - s.logger.WithError(err).Error("failed to send data to the stream") - - s.tq.Queue(func() error { - return s.closeWithError(err) - }) - + s.processSendError(err) return } @@ -284,6 +279,17 @@ func (s *stream) writeData(wg *sync.WaitGroup) { } } +func (s *stream) processSendError(err error) { + if errors.Is(err, io.EOF) { + s.logger.WithError(err).Debug("skip sending a message stream is cancelled/finished") + err = nil + } + + s.tq.Queue(func() error { + return s.closeWithError(err) + }) +} + // on registers a listener for a certain event type func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) { if err := s.eventListeners.add(event, listener); err != nil { @@ -323,10 +329,19 @@ func (s *stream) end() { } func (s *stream) closeWithError(err error) error { + s.close(err) + + return s.callErrorListeners(err) +} + +// close changes the stream state to closed and triggers the end event listeners +func (s *stream) close(err error) { if s.state == closed { - return nil + return } + s.logger.WithError(err).Debug("stream is closing") + s.state = closed close(s.done) s.tq.Queue(func() error { @@ -336,24 +351,24 @@ func (s *stream) closeWithError(err error) error { if s.timeoutCancel != nil { s.timeoutCancel() } - - s.logger.WithError(err).Debug("connection closed") - - if err != nil { - if errList := s.callErrorListeners(err); errList != nil { - return errList - } - } - - return nil } func (s *stream) callErrorListeners(e error) error { + if e == nil { + return nil + } + rt := s.vu.Runtime() obj := extractError(e) - for _, errorListener := range s.eventListeners.all(eventError) { + list := s.eventListeners.all(eventError) + + if len(list) == 0 { + s.logger.Warnf("no handlers for error registered, but an error happened: %s", e) + } + + for _, errorListener := range list { if _, err := errorListener(rt.ToValue(obj)); err != nil { return err }