Skip to content

Commit

Permalink
refine sliding window sender in #532
Browse files Browse the repository at this point in the history
  • Loading branch information
WeiminWangKolmostar committed Jan 21, 2023
1 parent 732ebc5 commit 45eea48
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions webiojs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,67 +188,76 @@ export function is_mobile() {
// put send task to a queue and run it one by one
export class ReliableSender {
private seq = 0;
private queue: { enable_batch: boolean, param: any }[] = [];
private send_running = false
private queue: { enable_batch: boolean, task: any }[] = [];
private _stop = false;
private ignore_interval_send = false;
private last_send_time = 0;
private interval_send_id = 0;

constructor(
private readonly sender: (params: any[], seq: number) => Promise<void>,
private readonly sender: (tasks: any[], seq: number) => Promise<void>,
private window_size: number = 8,
init_seq = 0, private timeout = 2000
init_seq = 0,
send_interval = 2000,
private min_send_interval = 1000,
) {
this.sender = sender;
this.window_size = window_size;
this.timeout = timeout;
this.seq = init_seq;
this.queue = [];
this.ignore_interval_send = false;
this.interval_send_id = setInterval(this.interval_send.bind(this), send_interval);
}

/*
* for continuous batch_send tasks in queue, they will be sent in one sender, the sending will retry when it finished or timeout.
* for non-batch task, each will be sent in a single sender, the sending will retry when it finished.
* for continuous batch_send tasks in queue, they will be sent in one sender,
* for non-batch task, each will be sent in a single sender,
* the sending will retry when there are unfinished task in queue.
* */
add_send_task(param: any, allow_batch_send = true) {
add_send_task(task: any, allow_batch_send = true) {
if (this._stop) return;
this.queue.push({
enable_batch: allow_batch_send,
param: param
task: task
});
if (!this.send_running)
this.start_send();
this.do_send();
}

private start_send() {
if (this._stop || this.queue.length === 0) {
this.send_running = false;
return;
}
this.send_running = true;
let params: any[] = [];
private get_tasks() {
let tasks: any[] = [];
for (let item of this.queue) {
if (!item.enable_batch)
break;
params.push(item.param);
tasks.push(item.task);
}
let batch_send = true;
if (params.length === 0 && !this.queue[0].enable_batch) {
if (tasks.length === 0 && this.queue.length > 0 && !this.queue[0].enable_batch) {
batch_send = false;
params.push(this.queue[0].param);
tasks.push(this.queue[0].task);
}
if (params.length === 0) {
this.send_running = false;
return {tasks, batch_send};
}

private do_send() {
const info = this.get_tasks();
const tasks = info.tasks, batch_send = info.batch_send;
if (tasks.length === 0) {
return;
}

let promises = [this.sender(params, this.seq)];
if (batch_send)
promises.push(new Promise((resolve) => setTimeout(resolve, this.timeout)));

Promise.race(promises).then(() => {
this.start_send();
this.last_send_time = Date.now();
if (!batch_send) // for non-batch task, only retry after current request finished
this.ignore_interval_send = true;
this.sender(tasks, this.seq).then(() => {
this.ignore_interval_send = false;
});
}

private interval_send() {
if (this._stop || this.ignore_interval_send) return;
if (Date.now() - this.last_send_time < this.min_send_interval) return;
this.do_send();
}

// seq for each ack call must be larger than the previous one, otherwise the ack will be ignored
ack(seq: number) {
if (seq < this.seq)
Expand All @@ -260,5 +269,6 @@ export class ReliableSender {

stop() {
this._stop = true;
clearInterval(this.interval_send_id);
}
}

0 comments on commit 45eea48

Please sign in to comment.