Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
fix: omit reporting EOF errors (#19)
Browse files Browse the repository at this point in the history
* fix: omit reporting EOF errors
+ always try to trigger error listeners

---------

Co-authored-by: Mihail Stoykov <[email protected]>
  • Loading branch information
olegbespalov and mstoykov authored Jul 10, 2023
1 parent 0cf059a commit d2b2ce4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
4 changes: 3 additions & 1 deletion grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,14 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object {

p.SetSystemTags(mi.vu.State(), client.addr, methodName)

logger := mi.vu.State().Logger.WithField("streamMethod", methodName)

s := &stream{
vu: mi.vu,
client: client,
methodDescriptor: methodDescriptor,
method: methodName,
logger: mi.vu.State().Logger,
logger: logger,

tq: taskqueue.New(mi.vu.RegisterCallback),

Expand Down
51 changes: 33 additions & 18 deletions grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,7 @@ func (s *stream) writeData(wg *sync.WaitGroup) {

err := s.stream.Send(msg.msg)
if err != nil {
s.logger.WithError(err).Error("failed to send data to the stream")

s.tq.Queue(func() error {
return s.closeWithError(err)
})

s.processSendError(err)
return
}

Expand Down Expand Up @@ -284,6 +279,17 @@ func (s *stream) writeData(wg *sync.WaitGroup) {
}
}

func (s *stream) processSendError(err error) {
if errors.Is(err, io.EOF) {
s.logger.WithError(err).Debug("skip sending a message stream is cancelled/finished")
err = nil
}

s.tq.Queue(func() error {
return s.closeWithError(err)
})
}

// on registers a listener for a certain event type
func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) {
if err := s.eventListeners.add(event, listener); err != nil {
Expand Down Expand Up @@ -323,10 +329,19 @@ func (s *stream) end() {
}

func (s *stream) closeWithError(err error) error {
s.close(err)

return s.callErrorListeners(err)
}

// close changes the stream state to closed and triggers the end event listeners
func (s *stream) close(err error) {
if s.state == closed {
return nil
return
}

s.logger.WithError(err).Debug("stream is closing")

s.state = closed
close(s.done)
s.tq.Queue(func() error {
Expand All @@ -336,24 +351,24 @@ func (s *stream) closeWithError(err error) error {
if s.timeoutCancel != nil {
s.timeoutCancel()
}

s.logger.WithError(err).Debug("connection closed")

if err != nil {
if errList := s.callErrorListeners(err); errList != nil {
return errList
}
}

return nil
}

func (s *stream) callErrorListeners(e error) error {
if e == nil {
return nil
}

rt := s.vu.Runtime()

obj := extractError(e)

for _, errorListener := range s.eventListeners.all(eventError) {
list := s.eventListeners.all(eventError)

if len(list) == 0 {
s.logger.Warnf("no handlers for error registered, but an error happened: %s", e)
}

for _, errorListener := range list {
if _, err := errorListener(rt.ToValue(obj)); err != nil {
return err
}
Expand Down

0 comments on commit d2b2ce4

Please sign in to comment.