11import { BotConnectorResponse } from '../model/responses' ;
2- import {
3- EventSourceMessage ,
4- EventStreamContentType ,
5- fetchEventSource ,
6- } from '@microsoft/fetch-event-source' ;
7-
8- class RetriableError extends Error { }
9- class FatalError extends Error { }
102
113const INITIAL_RETRY_DELAY = 0 ;
124const RETRY_DELAY_INCREMENT = 1000 ;
135const MAX_RETRY_DELAY = 15000 ;
146
7+ enum SseStatus {
8+ /**
9+ * The server is not answering, or answering with a 1XX, 3XX, 429, or 5XX HTTP status code
10+ */
11+ SERVER_UNAVAILABLE = - 1 ,
12+ /**
13+ * The server is answering with a 4XX HTTP status code, except 429 (rate limit)
14+ */
15+ UNSUPPORTED = 0 ,
16+ /**
17+ * The server is answering with a 2XX HTTP status code
18+ */
19+ SUPPORTED = 1 ,
20+ }
21+
22+ async function getSseStatus ( url : string ) {
23+ try {
24+ const response = await fetch ( url ) ;
25+ if ( response . ok ) {
26+ return SseStatus . SUPPORTED ;
27+ } else if (
28+ response . status >= 400 &&
29+ response . status < 500 &&
30+ response . status !== 429
31+ ) {
32+ return SseStatus . UNSUPPORTED ;
33+ } else {
34+ return SseStatus . SERVER_UNAVAILABLE ;
35+ }
36+ } catch ( _ ) {
37+ return SseStatus . SERVER_UNAVAILABLE ;
38+ }
39+ }
40+
1541export class TockEventSource {
1642 private initialized : boolean ;
17- private abortController : AbortController ;
43+ private eventSource : EventSource | null ;
1844 private retryDelay : number ;
1945 onResponse : ( botResponse : BotConnectorResponse ) => void ;
2046 onStateChange : ( state : number ) => void ;
@@ -38,60 +64,55 @@ export class TockEventSource {
3864 */
3965 open ( endpoint : string , userId : string ) : Promise < void > {
4066 this . onStateChange ( EventSource . CONNECTING ) ;
41- this . abortController = new AbortController ( ) ;
67+ const url = ` ${ endpoint } /sse?userid= ${ userId } ` ;
4268 return new Promise < void > ( ( resolve , reject ) : void => {
43- fetchEventSource ( `${ endpoint } /sse?userid=${ userId } ` , {
44- signal : this . abortController . signal ,
45- onopen : async ( response ) => {
46- if (
47- response . ok &&
48- response . headers
49- . get ( 'content-type' )
50- ?. includes ( EventStreamContentType )
51- ) {
52- this . onStateChange ( EventSource . OPEN ) ;
53- this . initialized = true ;
54- resolve ( ) ;
55- return ;
56- } else if (
57- response . status >= 400 &&
58- response . status < 500 &&
59- response . status !== 429
60- ) {
61- throw new FatalError ( ) ;
62- } else {
63- throw new RetriableError ( ) ;
64- }
65- } ,
66- onmessage : ( e : EventSourceMessage ) => {
67- if ( e . event === 'message' ) {
68- this . onResponse ( JSON . parse ( e . data ) ) ;
69- }
70- } ,
71- onerror : ( err ) => {
72- if ( err instanceof FatalError ) {
73- throw err ; // rethrow to stop the operation
74- } else {
75- const retryDelay = this . retryDelay ;
76- this . retryDelay = Math . min (
77- MAX_RETRY_DELAY ,
78- retryDelay + RETRY_DELAY_INCREMENT ,
79- ) ;
80- return retryDelay ;
81- }
82- } ,
83- } )
84- . catch ( ( e ) => console . error ( e ) )
85- . finally ( ( ) => {
86- reject ( ) ;
87- this . onStateChange ( EventSource . CLOSED ) ;
88- this . initialized = false ;
89- } ) ;
69+ this . tryOpen ( url , resolve , reject ) ;
70+ } ) ;
71+ }
72+
73+ private tryOpen ( url : string , resolve : ( ) => void , reject : ( ) => void ) {
74+ this . eventSource = new EventSource ( url ) ;
75+ this . eventSource . addEventListener ( 'open' , ( ) => {
76+ this . onStateChange ( EventSource . OPEN ) ;
77+ this . initialized = true ;
78+ this . retryDelay = INITIAL_RETRY_DELAY ;
79+ resolve ( ) ;
9080 } ) ;
81+ this . eventSource . addEventListener ( 'error' , ( ) => {
82+ this . eventSource ?. close ( ) ;
83+ this . retry ( url , reject , resolve ) ;
84+ } ) ;
85+ this . eventSource . addEventListener ( 'message' , ( e ) => {
86+ this . onResponse ( JSON . parse ( e . data ) ) ;
87+ } ) ;
88+ }
89+
90+ private retry ( url : string , reject : ( ) => void , resolve : ( ) => void ) {
91+ const retryDelay = this . retryDelay ;
92+ this . retryDelay = Math . min (
93+ MAX_RETRY_DELAY ,
94+ retryDelay + RETRY_DELAY_INCREMENT ,
95+ ) ;
96+ setTimeout ( async ( ) => {
97+ switch ( await getSseStatus ( url ) ) {
98+ case SseStatus . UNSUPPORTED :
99+ reject ( ) ;
100+ this . close ( ) ;
101+ break ;
102+ case SseStatus . SUPPORTED :
103+ this . tryOpen ( url , resolve , reject ) ;
104+ break ;
105+ case SseStatus . SERVER_UNAVAILABLE :
106+ this . retry ( url , reject , resolve ) ;
107+ break ;
108+ }
109+ } , retryDelay ) ;
91110 }
92111
93112 close ( ) {
94- this . abortController ?. abort ( ) ;
113+ this . eventSource ?. close ( ) ;
114+ this . eventSource = null ;
95115 this . initialized = false ;
116+ this . onStateChange ( EventSource . CLOSED ) ;
96117 }
97118}
0 commit comments