From 5657032885bf9982097d23cf6225e342ce8a6cba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=83=A5=EB=83=90=EC=B1=A0?= Date: Mon, 24 Jun 2024 15:03:13 +0900 Subject: [PATCH] fix(sb_workers): don't propagate promise rejection on request body streaming failure (#374) * fix(sb_workers): don't propagate promise rejection on request body streaming failure * stamp: oops * stamp: add an integration test --- crates/base/tests/integration_tests.rs | 59 ++++++++++++++--- crates/sb_workers/user_workers.js | 90 ++++++++++++-------------- 2 files changed, 89 insertions(+), 60 deletions(-) diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 1307b5e1c..388aa74e5 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -714,6 +714,7 @@ async fn test_file_upload_real_multipart_bytes() { test_oak_file_upload( Cow::Borrowed("./test_cases/main"), (9.98 * MB as f32) as usize, // < 10MB (in binary) + None, |resp| async { let res = resp.unwrap(); @@ -731,16 +732,21 @@ async fn test_file_upload_real_multipart_bytes() { #[tokio::test] #[serial] async fn test_file_upload_size_exceed() { - test_oak_file_upload(Cow::Borrowed("./test_cases/main"), 10 * MB, |resp| async { - let res = resp.unwrap(); + test_oak_file_upload( + Cow::Borrowed("./test_cases/main"), + 10 * MB, + None, + |resp| async { + let res = resp.unwrap(); - assert_eq!(res.status().as_u16(), 500); + assert_eq!(res.status().as_u16(), 500); - let res = res.text().await; + let res = res.text().await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), "Error!"); - }) + assert!(res.is_ok()); + assert_eq!(res.unwrap(), "Error!"); + }, + ) .await; } @@ -1166,6 +1172,7 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() { test_oak_file_upload( Cow::Borrowed("./test_cases/main_small_cpu_time"), 48 * MB, + None, |resp| async { let res = resp.unwrap(); @@ -1183,8 +1190,40 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() { .await; } -async fn test_oak_file_upload(main_service: Cow<'static, str>, bytes: usize, resp_callback: F) -where +#[tokio::test] +#[serial] +async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit_2() { + test_oak_file_upload( + Cow::Borrowed("./test_cases/main_small_cpu_time"), + 1024 * 64, + Some("image/png"), + |resp| async { + let res = resp.unwrap(); + + assert_eq!(res.status().as_u16(), 500); + + let res = res.json::().await; + + assert!(res.is_ok()); + + let msg = res.unwrap().msg; + + assert!(!msg.starts_with("TypeError: request body receiver not connected")); + assert_eq!( + msg, + "WorkerRequestCancelled: request has been cancelled by supervisor" + ); + }, + ) + .await; +} + +async fn test_oak_file_upload( + main_service: Cow<'static, str>, + bytes: usize, + mime: Option<&str>, + resp_callback: F, +) where F: FnOnce(Result) -> R, R: Future, { @@ -1199,7 +1238,7 @@ where "meow", Part::bytes(vec![0u8; bytes]) .file_name("meow.bin") - .mime_str("application/octet-stream") + .mime_str(mime.unwrap_or("application/octet-stream")) .unwrap(), ), ) diff --git a/crates/sb_workers/user_workers.js b/crates/sb_workers/user_workers.js index 8e28ebfbf..1eff3c7f0 100644 --- a/crates/sb_workers/user_workers.js +++ b/crates/sb_workers/user_workers.js @@ -1,16 +1,11 @@ import { primordials, core } from "ext:core/mod.js"; -import { readableStreamForRid, writableStreamForRid } from 'ext:deno_web/06_streams.js'; -import { getSupabaseTag } from 'ext:sb_core_main_js/js/http.js'; +import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js"; +import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js"; const ops = core.ops; -const { - InterruptedPrototype, -} = core; -const { - TypeError, - ObjectPrototypeIsPrototypeOf, - StringPrototypeIncludes, -} = primordials; + +const { TypeError } = primordials; + const { op_user_worker_fetch_send, op_user_worker_create, @@ -33,11 +28,11 @@ class UserWorker { this.key = key; } - async fetch(req, opts = {}) { - const tag = getSupabaseTag(req); + async fetch(request, options = {}) { + const tag = getSupabaseTag(request); - const { method, url, headers, body, bodyUsed } = req; - const { signal } = opts; + const { method, url, headers, body, bodyUsed } = request; + const { signal } = options; signal?.throwIfAborted(); @@ -60,13 +55,14 @@ class UserWorker { ); // stream the request body - let reqBodyPromise = null; + let requestBodyPromise = null; + if (hasBody) { let writableStream = writableStreamForRid(requestBodyRid); - reqBodyPromise = body.pipeTo(writableStream, { signal }); + requestBodyPromise = body.pipeTo(writableStream, { signal }); } - const resPromise = op_user_worker_fetch_send( + const responsePromise = op_user_worker_fetch_send( this.key, requestRid, requestBodyRid, @@ -74,48 +70,41 @@ class UserWorker { tag.watcherRid ); - let [sent, res] = await Promise.allSettled([reqBodyPromise, resPromise]); - - if (sent.status === "rejected") { - if (res.status === "fulfilled") { - res = res.value; - } else { - if ( - ObjectPrototypeIsPrototypeOf(InterruptedPrototype, sent.reason) || - StringPrototypeIncludes(sent.reason.message, "operation canceled") - ) { - throw res.reason; - } else { - throw sent.reason; - } - } - } else if (res.status === "rejected") { - throw res.reason; - } else { - res = res.value; + const [requestBodyPromiseResult, responsePromiseResult] = await Promise.allSettled([ + requestBodyPromise, + responsePromise + ]); + + if (requestBodyPromiseResult.status === "rejected") { + // console.warn(requestBodyPromiseResult.reason); } + if (responsePromiseResult.status === "rejected") { + throw responsePromiseResult.reason; + } + + const result = responsePromiseResult.value; const response = { - headers: res.headers, - status: res.status, - statusText: res.statusText, + headers: result.headers, + status: result.status, + statusText: result.statusText, body: null, }; // TODO: add a test - if (nullBodyStatus(res.status) || redirectStatus(res.status)) { - core.close(res.bodyRid); + if (nullBodyStatus(result.status) || redirectStatus(result.status)) { + core.tryClose(result.bodyRid); } else { - if (req.method === 'HEAD' || req.method === 'CONNECT') { - response.body = null; - core.close(res.bodyRid); + if (request.method === "HEAD" || request.method === "CONNECT") { + core.tryClose(result.bodyRid); } else { - const bodyStream = readableStreamForRid(res.bodyRid); + const stream = readableStreamForRid(result.bodyRid); - signal?.addEventListener('abort', () => { - core.tryClose(res.bodyRid); + signal?.addEventListener("abort", () => { + core.tryClose(result.bodyRid); }); - response.body = bodyStream; + + response.body = stream; } } @@ -148,8 +137,8 @@ class UserWorker { const { servicePath, maybeEszip } = readyOptions; - if (!maybeEszip && (!servicePath || servicePath === '')) { - throw new TypeError('service path must be defined'); + if (!maybeEszip && (!servicePath || servicePath === "")) { + throw new TypeError("service path must be defined"); } const key = await op_user_worker_create(readyOptions); @@ -159,4 +148,5 @@ class UserWorker { } const SUPABASE_USER_WORKERS = UserWorker; + export { SUPABASE_USER_WORKERS };