diff --git a/webiojs/src/utils.ts b/webiojs/src/utils.ts index e6a540a6..f72ec7e2 100644 --- a/webiojs/src/utils.ts +++ b/webiojs/src/utils.ts @@ -188,67 +188,76 @@ export function is_mobile() { // put send task to a queue and run it one by one export class ReliableSender { private seq = 0; - private queue: { enable_batch: boolean, param: any }[] = []; - private send_running = false + private queue: { enable_batch: boolean, task: any }[] = []; private _stop = false; + private ignore_interval_send = false; + private last_send_time = 0; + private interval_send_id = 0; constructor( - private readonly sender: (params: any[], seq: number) => Promise, + private readonly sender: (tasks: any[], seq: number) => Promise, private window_size: number = 8, - init_seq = 0, private timeout = 2000 + init_seq = 0, + send_interval = 2000, + private min_send_interval = 1000, ) { this.sender = sender; this.window_size = window_size; - this.timeout = timeout; this.seq = init_seq; this.queue = []; + this.ignore_interval_send = false; + this.interval_send_id = setInterval(this.interval_send.bind(this), send_interval); } /* - * for continuous batch_send tasks in queue, they will be sent in one sender, the sending will retry when it finished or timeout. - * for non-batch task, each will be sent in a single sender, the sending will retry when it finished. + * for continuous batch_send tasks in queue, they will be sent in one sender, + * for non-batch task, each will be sent in a single sender, + * the sending will retry when there are unfinished task in queue. * */ - add_send_task(param: any, allow_batch_send = true) { + add_send_task(task: any, allow_batch_send = true) { if (this._stop) return; this.queue.push({ enable_batch: allow_batch_send, - param: param + task: task }); - if (!this.send_running) - this.start_send(); + this.do_send(); } - private start_send() { - if (this._stop || this.queue.length === 0) { - this.send_running = false; - return; - } - this.send_running = true; - let params: any[] = []; + private get_tasks() { + let tasks: any[] = []; for (let item of this.queue) { if (!item.enable_batch) break; - params.push(item.param); + tasks.push(item.task); } let batch_send = true; - if (params.length === 0 && !this.queue[0].enable_batch) { + if (tasks.length === 0 && this.queue.length > 0 && !this.queue[0].enable_batch) { batch_send = false; - params.push(this.queue[0].param); + tasks.push(this.queue[0].task); } - if (params.length === 0) { - this.send_running = false; + return {tasks, batch_send}; + } + + private do_send() { + const info = this.get_tasks(); + const tasks = info.tasks, batch_send = info.batch_send; + if (tasks.length === 0) { return; } - - let promises = [this.sender(params, this.seq)]; - if (batch_send) - promises.push(new Promise((resolve) => setTimeout(resolve, this.timeout))); - - Promise.race(promises).then(() => { - this.start_send(); + this.last_send_time = Date.now(); + if (!batch_send) // for non-batch task, only retry after current request finished + this.ignore_interval_send = true; + this.sender(tasks, this.seq).then(() => { + this.ignore_interval_send = false; }); } + private interval_send() { + if (this._stop || this.ignore_interval_send) return; + if (Date.now() - this.last_send_time < this.min_send_interval) return; + this.do_send(); + } + // seq for each ack call must be larger than the previous one, otherwise the ack will be ignored ack(seq: number) { if (seq < this.seq) @@ -260,5 +269,6 @@ export class ReliableSender { stop() { this._stop = true; + clearInterval(this.interval_send_id); } } \ No newline at end of file