From e00abdacc09f019afb7eb6e12eb25e58bef03c45 Mon Sep 17 00:00:00 2001 From: wangweimin Date: Sat, 14 Jan 2023 17:49:31 +0800 Subject: [PATCH 1/4] use receiver window in http session --- pywebio/platform/adaptor/http.py | 61 +++++++++++++++++++++++--------- webiojs/src/session.ts | 19 +++++----- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/pywebio/platform/adaptor/http.py b/pywebio/platform/adaptor/http.py index db024844..ae429be8 100644 --- a/pywebio/platform/adaptor/http.py +++ b/pywebio/platform/adaptor/http.py @@ -15,11 +15,12 @@ import time from contextlib import contextmanager from typing import Dict, Optional +from collections import deque from ..page import make_applications, render_page from ..utils import deserialize_binary_event from ...session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target -from ...session.base import get_session_info_from_headers +from ...session.base import get_session_info_from_headers, Session from ...utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js @@ -102,6 +103,41 @@ def get_client_ip(self): _event_loop = None +class ReliableTransport: + def __init__(self, session: Session, message_window: int = 4): + self.session = session + self.messages = deque() + self.window_size = message_window + self.min_msg_id = 0 # the id of the first message in the window + self.next_event_id = 0 + + @staticmethod + def close_message(ack): + return dict( + commands=[[dict(command='close_session')]], + seq=ack+1 + ) + + def get_response(self, ack=0): + """ + ack num is the number of messages that the client has received. + response is a list of messages that the client should receive, along with their min id `seq`. + """ + while ack >= self.min_msg_id and self.messages: + self.messages.popleft() + self.min_msg_id += 1 + + if len(self.messages) < self.window_size: + msgs = self.session.get_task_commands() + if msgs: + self.messages.append(msgs) + + return dict( + commands=list(self.messages), + seq=self.min_msg_id + ) + + # todo: use lock to avoid thread race condition class HttpHandler: """基于HTTP的后端Handler实现 @@ -112,7 +148,7 @@ class HttpHandler: """ _webio_sessions = {} # WebIOSessionID -> WebIOSession() - _webio_last_commands = {} # WebIOSessionID -> (last commands, commands sequence id) + _webio_transports = {} # WebIOSessionID -> ReliableTransport(), type: Dict[str, ReliableTransport] _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time _webio_expire_lock = threading.Lock() @@ -149,17 +185,6 @@ def _remove_webio_session(cls, sid): cls._webio_sessions.pop(sid, None) cls._webio_expire.pop(sid, None) - @classmethod - def get_response(cls, sid, ack=0): - commands, seq = cls._webio_last_commands.get(sid, ([], 0)) - if ack == seq: - webio_session = cls._webio_sessions[sid] - commands = webio_session.get_task_commands() - seq += 1 - cls._webio_last_commands[sid] = (commands, seq) - - return {'commands': commands, 'seq': seq} - def _process_cors(self, context: HttpContext): """Handling cross-domain requests: check the source of the request and set headers""" origin = context.request_headers().get('Origin', '') @@ -240,8 +265,8 @@ def handle_request_context(self, context: HttpContext): context.set_content(html) return context.get_response() + ack = int(context.request_url_parameter('ack', 0)) webio_session_id = None - # 初始请求,创建新 Session if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW': if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击 @@ -264,9 +289,11 @@ def handle_request_context(self, context: HttpContext): session_cls = ThreadBasedSession webio_session = session_cls(application, session_info=session_info) cls._webio_sessions[webio_session_id] = webio_session + cls._webio_transports[webio_session_id] = ReliableTransport(webio_session) yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted - context.set_content([dict(command='close_session')], json_type=True) + close_msg = ReliableTransport.close_message(ack) + context.set_content(close_msg, json_type=True) return context.get_response() else: webio_session_id = request_headers['webio-session-id'] @@ -283,8 +310,8 @@ def handle_request_context(self, context: HttpContext): self.interval_cleaning() - ack = int(context.request_url_parameter('ack', 0)) - context.set_content(type(self).get_response(webio_session_id, ack=ack), json_type=True) + resp = cls._webio_transports[webio_session_id].get_response(ack) + context.set_content(resp, json_type=True) if webio_session.closed(): self._remove_webio_session(webio_session_id) diff --git a/webiojs/src/session.ts b/webiojs/src/session.ts index e1c2b2b7..85c7f4f9 100644 --- a/webiojs/src/session.ts +++ b/webiojs/src/session.ts @@ -181,7 +181,7 @@ export class HttpSession implements Session { webio_session_id: string = 'NEW'; debug = false; - private _executed_command_msg_id = 0; + private _executed_command_msg_id = -1; private _closed = false; private _session_create_callbacks: (() => void)[] = []; private _session_close_callbacks: (() => void)[] = []; @@ -223,7 +223,7 @@ export class HttpSession implements Session { contentType: "application/json; charset=utf-8", dataType: "json", headers: {"webio-session-id": this.webio_session_id}, - success: function (data: { commands: Command[], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { + success: function (data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { safe_poprun_callbacks(that._session_create_callbacks, 'session_create_callback'); that._on_request_success(data, textStatus, jqXHR); }, @@ -233,18 +233,21 @@ export class HttpSession implements Session { }) } - private _on_request_success(data: { commands: Command[], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { - if (data.seq == this._executed_command_msg_id) + private _on_request_success(data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { + let msg_start_idx = this._executed_command_msg_id - data.seq + 1; + if (data.commands.length <= msg_start_idx) return; - this._executed_command_msg_id = data.seq; + this._executed_command_msg_id = data.seq + data.commands.length - 1; let sid = jqXHR.getResponseHeader('webio-session-id'); if (sid) this.webio_session_id = sid; - for (let msg of data.commands) { - if (this.debug) console.info('>>>', msg); - this._on_server_message(msg); + for (let msgs of data.commands.slice(msg_start_idx)) { + for (let msg of msgs) { + if (this.debug) console.info('>>>', msg); + this._on_server_message(msg); + } } }; From 8d4fe46562e1d008f85c7423fa17bf7110190e6f Mon Sep 17 00:00:00 2001 From: wangweimin Date: Sun, 15 Jan 2023 00:16:20 +0800 Subject: [PATCH 2/4] generate session id in client side to avoid duplicated session created in poor network environment --- pywebio/platform/adaptor/http.py | 30 +++++++++++++++++------------- webiojs/src/session.ts | 11 ++++++++--- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pywebio/platform/adaptor/http.py b/pywebio/platform/adaptor/http.py index ae429be8..75aa047c 100644 --- a/pywebio/platform/adaptor/http.py +++ b/pywebio/platform/adaptor/http.py @@ -36,7 +36,7 @@ def request_obj(self): Return the current request object""" pass - def request_method(self): + def request_method(self) -> str: """返回当前请求的方法,大写 Return the HTTP method of the current request, uppercase""" pass @@ -46,12 +46,12 @@ def request_headers(self) -> Dict: Return the header dictionary of the current request""" pass - def request_url_parameter(self, name, default=None): + def request_url_parameter(self, name, default=None) -> str: """返回当前请求的URL参数 Returns the value of the given URL parameter of the current request""" pass - def request_body(self): + def request_body(self) -> bytes: """返回当前请求的body数据 Returns the data of the current request body @@ -93,7 +93,7 @@ def get_response(self): Get the current response object""" pass - def get_client_ip(self): + def get_client_ip(self) -> str: """获取用户的ip Get the user's ip""" pass @@ -115,7 +115,7 @@ def __init__(self, session: Session, message_window: int = 4): def close_message(ack): return dict( commands=[[dict(command='close_session')]], - seq=ack+1 + seq=ack + 1 ) def get_response(self, ack=0): @@ -266,15 +266,17 @@ def handle_request_context(self, context: HttpContext): return context.get_response() ack = int(context.request_url_parameter('ack', 0)) - webio_session_id = None - # 初始请求,创建新 Session - if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW': + webio_session_id = request_headers['webio-session-id'] + new_request = False + if webio_session_id.startswith('NEW-'): + new_request = True + webio_session_id = webio_session_id[4:] + + if new_request and webio_session_id not in cls._webio_sessions: # 初始请求,创建新 Session if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击 context.set_status(403) return context.get_response() - webio_session_id = random_str(24) - context.set_header('webio-session-id', webio_session_id) session_info = get_session_info_from_headers(context.request_headers()) session_info['user_ip'] = context.get_client_ip() session_info['request'] = context.request_obj() @@ -290,13 +292,15 @@ def handle_request_context(self, context: HttpContext): webio_session = session_cls(application, session_info=session_info) cls._webio_sessions[webio_session_id] = webio_session cls._webio_transports[webio_session_id] = ReliableTransport(webio_session) - yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- - elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted + yield cls.WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- + elif webio_session_id not in cls._webio_sessions: # WebIOSession deleted close_msg = ReliableTransport.close_message(ack) context.set_content(close_msg, json_type=True) return context.get_response() else: - webio_session_id = request_headers['webio-session-id'] + # in this case, the request_headers['webio-session-id'] may also startswith NEW, + # this is because the response for the previous new session request has not been received by the client, + # and the client has sent a new request with the same session id. webio_session = cls._webio_sessions[webio_session_id] if context.request_method() == 'POST': # client push event diff --git a/webiojs/src/session.ts b/webiojs/src/session.ts index 85c7f4f9..bc008fd4 100644 --- a/webiojs/src/session.ts +++ b/webiojs/src/session.ts @@ -1,4 +1,4 @@ -import {error_alert} from "./utils"; +import {error_alert, randomid} from "./utils"; import {state} from "./state"; import {t} from "./i18n"; @@ -178,7 +178,7 @@ export class WebSocketSession implements Session { export class HttpSession implements Session { interval_pull_id: number = null; - webio_session_id: string = 'NEW'; + webio_session_id: string = ''; debug = false; private _executed_command_msg_id = -1; @@ -209,6 +209,7 @@ export class HttpSession implements Session { start_session(debug: boolean = false): void { this.debug = debug; + this.webio_session_id = "NEW-" + randomid(24); this.pull(); this.interval_pull_id = setInterval(() => { this.pull() @@ -223,9 +224,13 @@ export class HttpSession implements Session { contentType: "application/json; charset=utf-8", dataType: "json", headers: {"webio-session-id": this.webio_session_id}, - success: function (data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { + success: function (data: { commands: Command[][], seq: number, event: number }, + textStatus: string, jqXHR: JQuery.jqXHR) { safe_poprun_callbacks(that._session_create_callbacks, 'session_create_callback'); that._on_request_success(data, textStatus, jqXHR); + if(that.webio_session_id.startsWith("NEW-")){ + that.webio_session_id = that.webio_session_id.substring(4); + } }, error: function () { console.error('Http pulling failed'); From bf3d67eff82635bbbd3ed9ac871223867d8ee24b Mon Sep 17 00:00:00 2001 From: wangweimin Date: Sun, 15 Jan 2023 17:13:50 +0800 Subject: [PATCH 3/4] reliably send msg from client to server when use HTTP --- pywebio/platform/adaptor/http.py | 46 +++++++++----- webiojs/src/session.ts | 106 +++++++++++++++++++------------ webiojs/src/utils.ts | 78 +++++++++++++++++++++++ 3 files changed, 175 insertions(+), 55 deletions(-) diff --git a/pywebio/platform/adaptor/http.py b/pywebio/platform/adaptor/http.py index 75aa047c..c1ee62ff 100644 --- a/pywebio/platform/adaptor/http.py +++ b/pywebio/platform/adaptor/http.py @@ -14,7 +14,7 @@ import threading import time from contextlib import contextmanager -from typing import Dict, Optional +from typing import Dict, Optional, List from collections import deque from ..page import make_applications, render_page @@ -59,16 +59,6 @@ def request_body(self) -> bytes: """ return b'' - def request_json(self) -> Optional[Dict]: - """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None - Return the data (json deserialization) of the currently requested, if the data is not in json format, return None""" - try: - if self.request_headers().get('content-type') == 'application/octet-stream': - return deserialize_binary_event(self.request_body()) - return json.loads(self.request_body()) - except Exception: - return None - def set_header(self, name, value): """为当前响应设置header Set a header for the current response""" @@ -109,7 +99,7 @@ def __init__(self, session: Session, message_window: int = 4): self.messages = deque() self.window_size = message_window self.min_msg_id = 0 # the id of the first message in the window - self.next_event_id = 0 + self.finished_event_id = -1 # the id of the last finished event @staticmethod def close_message(ack): @@ -118,6 +108,20 @@ def close_message(ack): seq=ack + 1 ) + def push_event(self, events: List[Dict], seq: int) -> int: + """Send client events to the session and return the success message count""" + if not events: + return 0 + + submit_cnt = 0 + for eid, event in enumerate(events, start=seq): + if eid > self.finished_event_id: + self.finished_event_id = eid # todo: use lock for check and set operation + self.session.send_client_event(event) + submit_cnt += 1 + + return submit_cnt + def get_response(self, ack=0): """ ack num is the number of messages that the client has received. @@ -134,7 +138,8 @@ def get_response(self, ack=0): return dict( commands=list(self.messages), - seq=self.min_msg_id + seq=self.min_msg_id, + ack=self.finished_event_id ) @@ -179,6 +184,7 @@ def _remove_expired_sessions(cls, session_expire_seconds): if session: session.close(nonblock=True) del cls._webio_sessions[sid] + del cls._webio_transports[sid] @classmethod def _remove_webio_session(cls, sid): @@ -234,6 +240,14 @@ def get_cdn(self, context): return False return self.cdn + def read_event_data(self, context: HttpContext) -> List[Dict]: + try: + if context.request_headers().get('content-type') == 'application/octet-stream': + return [deserialize_binary_event(context.request_body())] + return json.loads(context.request_body()) + except Exception: + return [] + @contextmanager def handle_request_context(self, context: HttpContext): """called when every http request""" @@ -304,8 +318,10 @@ def handle_request_context(self, context: HttpContext): webio_session = cls._webio_sessions[webio_session_id] if context.request_method() == 'POST': # client push event - if context.request_json() is not None: - webio_session.send_client_event(context.request_json()) + seq = int(context.request_url_parameter('seq', 0)) + event_data = self.read_event_data(context) + submit_cnt = cls._webio_transports[webio_session_id].push_event(event_data, seq) + if submit_cnt > 0: yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- elif context.request_method() == 'GET': # client pull messages pass diff --git a/webiojs/src/session.ts b/webiojs/src/session.ts index bc008fd4..c55d8a1b 100644 --- a/webiojs/src/session.ts +++ b/webiojs/src/session.ts @@ -1,4 +1,4 @@ -import {error_alert, randomid} from "./utils"; +import {error_alert, randomid, ReliableSender} from "./utils"; import {state} from "./state"; import {t} from "./i18n"; @@ -181,6 +181,7 @@ export class HttpSession implements Session { webio_session_id: string = ''; debug = false; + private sender: ReliableSender = null; private _executed_command_msg_id = -1; private _closed = false; private _session_create_callbacks: (() => void)[] = []; @@ -193,6 +194,7 @@ export class HttpSession implements Session { let url = new URL(api_url, window.location.href); url.search = "?app=" + app_name; this.api_url = url.href; + this.sender = new ReliableSender(this._send.bind(this)); } on_session_create(callback: () => void): void { @@ -224,21 +226,21 @@ export class HttpSession implements Session { contentType: "application/json; charset=utf-8", dataType: "json", headers: {"webio-session-id": this.webio_session_id}, - success: function (data: { commands: Command[][], seq: number, event: number }, + success: function (data: { commands: Command[][], seq: number, event: number, ack: number }, textStatus: string, jqXHR: JQuery.jqXHR) { safe_poprun_callbacks(that._session_create_callbacks, 'session_create_callback'); that._on_request_success(data, textStatus, jqXHR); - if(that.webio_session_id.startsWith("NEW-")){ + if (that.webio_session_id.startsWith("NEW-")) { that.webio_session_id = that.webio_session_id.substring(4); } - }, - error: function () { - console.error('Http pulling failed'); } }) } - private _on_request_success(data: { commands: Command[][], seq: number }, textStatus: string, jqXHR: JQuery.jqXHR) { + private _on_request_success(data: { commands: Command[][], seq: number, ack: number }, + textStatus: string, jqXHR: JQuery.jqXHR) { + this.sender.ack(data.ack); + let msg_start_idx = this._executed_command_msg_id - data.seq + 1; if (data.commands.length <= msg_start_idx) return; @@ -258,47 +260,70 @@ export class HttpSession implements Session { send_message(msg: ClientEvent, onprogress?: (loaded: number, total: number) => void): void { if (this.debug) console.info('<<<', msg); - this._send({ - data: JSON.stringify(msg), - contentType: "application/json; charset=utf-8", - }, onprogress); + this.sender.add_send_task({ + data: msg, + json: true, + onprogress: onprogress, + }) } send_buffer(data: Blob, onprogress?: (loaded: number, total: number) => void): void { if (this.debug) console.info('<<< Blob data...'); - this._send({ + this.sender.add_send_task({ data: data, - cache: false, - processData: false, - contentType: 'application/octet-stream', - }, onprogress); + json: false, + onprogress: onprogress, + }, false) } - _send(options: { [key: string]: any; }, onprogress?: (loaded: number, total: number) => void): void { - if (this.closed()) - return error_alert(t("disconnected_with_server")); - - $.ajax({ - ...options, - type: "POST", - url: `${this.api_url}&ack=${this._executed_command_msg_id}`, - dataType: "json", - headers: {"webio-session-id": this.webio_session_id}, - success: this._on_request_success.bind(this), - xhr: function () { - let xhr = new window.XMLHttpRequest(); - // Upload progress - xhr.upload.addEventListener("progress", function (evt) { - if (evt.lengthComputable && onprogress) { - onprogress(evt.loaded, evt.total); - } - }, false); - return xhr; - }, - error: function () { - console.error('Http push blob data failed'); - error_alert(t("connect_fail")); + _send(params: { [key: string]: any; }[], seq: number): Promise { + if (this.closed()) { + this.sender.stop(); + error_alert(t("disconnected_with_server")); + return Promise.reject(); + } + let data: any, ajax_options: any; + let json = params.some(p => p.json); + if (json) { + data = JSON.stringify(params.map(p => p.data)); + ajax_options = { + contentType: "application/json; charset=utf-8", } + } else { + data = params[0].data; + ajax_options = { + cache: false, + processData: false, + contentType: 'application/octet-stream', + } + } + return new Promise((resolve, reject) => { + $.ajax({ + data: data, + ...ajax_options, + type: "POST", + url: `${this.api_url}&ack=${this._executed_command_msg_id}&seq=${seq}`, + dataType: "json", + headers: {"webio-session-id": this.webio_session_id}, + success: this._on_request_success.bind(this), + xhr: function () { + let xhr = new window.XMLHttpRequest(); + // Upload progress + xhr.upload.addEventListener("progress", function (evt) { + if (evt.lengthComputable) { + params.forEach(p => { + if (p.onprogress) // only the first one + p.onprogress(evt.loaded, evt.total); + p.onprogress = null; + }); + } + }, false); + return xhr; + }, + error: function () { + console.error('Http push event failed, will retry'); + } + }).always(() => resolve()); }); } @@ -306,6 +331,7 @@ export class HttpSession implements Session { this._closed = true; safe_poprun_callbacks(this._session_close_callbacks, 'session_close_callback'); clearInterval(this.interval_pull_id); + this.sender.stop(); } closed(): boolean { diff --git a/webiojs/src/utils.ts b/webiojs/src/utils.ts index 3bfa5673..e6a540a6 100644 --- a/webiojs/src/utils.ts +++ b/webiojs/src/utils.ts @@ -183,4 +183,82 @@ export function is_mobile() { if (navigator.userAgentData) return navigator.userAgentData.mobile; const ipadOS = (navigator.platform === 'MacIntel' && navigator.maxTouchPoints > 1); /* iPad OS 13 */ return /android|webos|iphone|ipad|ipod|blackberry|iemobile|opera mini/i.test(navigator.userAgent.toLowerCase()) || ipadOS; +} + +// put send task to a queue and run it one by one +export class ReliableSender { + private seq = 0; + private queue: { enable_batch: boolean, param: any }[] = []; + private send_running = false + private _stop = false; + + constructor( + private readonly sender: (params: any[], seq: number) => Promise, + private window_size: number = 8, + init_seq = 0, private timeout = 2000 + ) { + this.sender = sender; + this.window_size = window_size; + this.timeout = timeout; + this.seq = init_seq; + this.queue = []; + } + + /* + * for continuous batch_send tasks in queue, they will be sent in one sender, the sending will retry when it finished or timeout. + * for non-batch task, each will be sent in a single sender, the sending will retry when it finished. + * */ + add_send_task(param: any, allow_batch_send = true) { + if (this._stop) return; + this.queue.push({ + enable_batch: allow_batch_send, + param: param + }); + if (!this.send_running) + this.start_send(); + } + + private start_send() { + if (this._stop || this.queue.length === 0) { + this.send_running = false; + return; + } + this.send_running = true; + let params: any[] = []; + for (let item of this.queue) { + if (!item.enable_batch) + break; + params.push(item.param); + } + let batch_send = true; + if (params.length === 0 && !this.queue[0].enable_batch) { + batch_send = false; + params.push(this.queue[0].param); + } + if (params.length === 0) { + this.send_running = false; + return; + } + + let promises = [this.sender(params, this.seq)]; + if (batch_send) + promises.push(new Promise((resolve) => setTimeout(resolve, this.timeout))); + + Promise.race(promises).then(() => { + this.start_send(); + }); + } + + // seq for each ack call must be larger than the previous one, otherwise the ack will be ignored + ack(seq: number) { + if (seq < this.seq) + return; + let pop_count = seq - this.seq + 1; + this.queue = this.queue.slice(pop_count); + this.seq = seq + 1; + } + + stop() { + this._stop = true; + } } \ No newline at end of file From 3168544edda1a0085d0e572dd99eea2f5b15150e Mon Sep 17 00:00:00 2001 From: wangweimin Date: Sun, 15 Jan 2023 17:16:23 +0800 Subject: [PATCH 4/4] update dev version --- pywebio/__version__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pywebio/__version__.py b/pywebio/__version__.py index a63faab8..030d01e2 100644 --- a/pywebio/__version__.py +++ b/pywebio/__version__.py @@ -1,8 +1,8 @@ __package__ = 'pywebio' __description__ = 'Write interactive web app in script way.' __url__ = 'https://pywebio.readthedocs.io' -__version__ = "1.7.0" -__version_info__ = (1, 7, 0, 0) +__version__ = "1.7.1" +__version_info__ = (1, 7, 1, 0) __author__ = 'WangWeimin' __author_email__ = 'wang0.618@qq.com' __license__ = 'MIT'