@@ -534,45 +534,54 @@ func (es *EventSource) listenStream(res *http.Response) error {
534534 return nil
535535 }
536536
537- e , err := readEvent (scanner )
538- if err != nil {
539- if err == io .EOF {
540- return err
541- }
542- es .triggerOnError (err )
537+ if err := es .processEvent (scanner ); err != nil {
543538 return err
544539 }
540+ }
541+ }
545542
546- ed , err := parseEvent (e )
547- if err != nil {
548- es .triggerOnError (err )
549- continue // parsing errors, just continue
543+ func (es * EventSource ) processEvent (scanner * bufio.Scanner ) error {
544+ e , err := readEvent (scanner )
545+ if err != nil {
546+ if err == io .EOF {
547+ return err
550548 }
549+ es .triggerOnError (err )
550+ return err
551+ }
552+
553+ ed , err := parseEvent (e )
554+ if err != nil {
555+ es .triggerOnError (err )
556+ return nil // parsing errors, will not return error.
557+ }
558+ defer putRawEvent (ed )
559+
560+ if len (ed .ID ) > 0 {
561+ es .lock .Lock ()
562+ es .lastEventID = string (ed .ID )
563+ es .lock .Unlock ()
564+ }
551565
552- if len (ed .ID ) > 0 {
566+ if len (ed .Retry ) > 0 {
567+ if retry , err := strconv .Atoi (string (ed .Retry )); err == nil {
553568 es .lock .Lock ()
554- es .lastEventID = string ( ed . ID )
569+ es .serverSentRetry = time . Millisecond * time . Duration ( retry )
555570 es .lock .Unlock ()
571+ } else {
572+ es .triggerOnError (err )
556573 }
574+ }
557575
558- if len (ed .Retry ) > 0 {
559- if retry , err := strconv .Atoi (string (ed .Retry )); err == nil {
560- es .lock .Lock ()
561- es .serverSentRetry = time .Millisecond * time .Duration (retry )
562- es .lock .Unlock ()
563- } else {
564- es .triggerOnError (err )
565- }
566- }
567-
568- if len (ed .Data ) > 0 {
569- es .handleCallback (& Event {
570- ID : string (ed .ID ),
571- Name : string (ed .Event ),
572- Data : string (ed .Data ),
573- })
574- }
576+ if len (ed .Data ) > 0 {
577+ es .handleCallback (& Event {
578+ ID : string (ed .ID ),
579+ Name : string (ed .Event ),
580+ Data : string (ed .Data ),
581+ })
575582 }
583+
584+ return nil
576585}
577586
578587func (es * EventSource ) handleCallback (e * Event ) {
@@ -633,7 +642,7 @@ func parseEventFunc(msg []byte) (*rawEvent, error) {
633642 return nil , errors .New ("resty:sse: event message was empty" )
634643 }
635644
636- e := new ( rawEvent )
645+ e := newRawEvent ( )
637646
638647 // Split the line by "\n"
639648 for _ , line := range bytes .FieldsFunc (msg , func (r rune ) bool { return r == '\n' }) {
@@ -670,3 +679,18 @@ func trimHeader(size int, data []byte) []byte {
670679 data = bytes .TrimSuffix (data , []byte ("\n " ))
671680 return data
672681}
682+
683+ var rawEventPool = & sync.Pool {New : func () any { return new (rawEvent ) }}
684+
685+ func newRawEvent () * rawEvent {
686+ e := rawEventPool .Get ().(* rawEvent )
687+ e .ID = e .ID [:0 ]
688+ e .Data = e .Data [:0 ]
689+ e .Event = e .Event [:0 ]
690+ e .Retry = e .Retry [:0 ]
691+ return e
692+ }
693+
694+ func putRawEvent (e * rawEvent ) {
695+ rawEventPool .Put (e )
696+ }
0 commit comments