Skip to content

Commit

Permalink
fix(sb_workers): don't propagate promise rejection on request body st…
Browse files Browse the repository at this point in the history
…reaming failure (#374)

* fix(sb_workers): don't propagate promise rejection on request body streaming failure

* stamp: oops

* stamp: add an integration test
  • Loading branch information
nyannyacha authored Jun 24, 2024
1 parent 853c9d8 commit 5657032
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 60 deletions.
59 changes: 49 additions & 10 deletions crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ async fn test_file_upload_real_multipart_bytes() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main"),
(9.98 * MB as f32) as usize, // < 10MB (in binary)
None,
|resp| async {
let res = resp.unwrap();

Expand All @@ -731,16 +732,21 @@ async fn test_file_upload_real_multipart_bytes() {
#[tokio::test]
#[serial]
async fn test_file_upload_size_exceed() {
test_oak_file_upload(Cow::Borrowed("./test_cases/main"), 10 * MB, |resp| async {
let res = resp.unwrap();
test_oak_file_upload(
Cow::Borrowed("./test_cases/main"),
10 * MB,
None,
|resp| async {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);
assert_eq!(res.status().as_u16(), 500);

let res = res.text().await;
let res = res.text().await;

assert!(res.is_ok());
assert_eq!(res.unwrap(), "Error!");
})
assert!(res.is_ok());
assert_eq!(res.unwrap(), "Error!");
},
)
.await;
}

Expand Down Expand Up @@ -1166,6 +1172,7 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main_small_cpu_time"),
48 * MB,
None,
|resp| async {
let res = resp.unwrap();

Expand All @@ -1183,8 +1190,40 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
.await;
}

async fn test_oak_file_upload<F, R>(main_service: Cow<'static, str>, bytes: usize, resp_callback: F)
where
#[tokio::test]
#[serial]
async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit_2() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main_small_cpu_time"),
1024 * 64,
Some("image/png"),
|resp| async {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);

let res = res.json::<ErrorResponsePayload>().await;

assert!(res.is_ok());

let msg = res.unwrap().msg;

assert!(!msg.starts_with("TypeError: request body receiver not connected"));
assert_eq!(
msg,
"WorkerRequestCancelled: request has been cancelled by supervisor"
);
},
)
.await;
}

async fn test_oak_file_upload<F, R>(
main_service: Cow<'static, str>,
bytes: usize,
mime: Option<&str>,
resp_callback: F,
) where
F: FnOnce(Result<Response, reqwest::Error>) -> R,
R: Future<Output = ()>,
{
Expand All @@ -1199,7 +1238,7 @@ where
"meow",
Part::bytes(vec![0u8; bytes])
.file_name("meow.bin")
.mime_str("application/octet-stream")
.mime_str(mime.unwrap_or("application/octet-stream"))
.unwrap(),
),
)
Expand Down
90 changes: 40 additions & 50 deletions crates/sb_workers/user_workers.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { primordials, core } from "ext:core/mod.js";
import { readableStreamForRid, writableStreamForRid } from 'ext:deno_web/06_streams.js';
import { getSupabaseTag } from 'ext:sb_core_main_js/js/http.js';
import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js";
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js";

const ops = core.ops;
const {
InterruptedPrototype,
} = core;
const {
TypeError,
ObjectPrototypeIsPrototypeOf,
StringPrototypeIncludes,
} = primordials;

const { TypeError } = primordials;

const {
op_user_worker_fetch_send,
op_user_worker_create,
Expand All @@ -33,11 +28,11 @@ class UserWorker {
this.key = key;
}

async fetch(req, opts = {}) {
const tag = getSupabaseTag(req);
async fetch(request, options = {}) {
const tag = getSupabaseTag(request);

const { method, url, headers, body, bodyUsed } = req;
const { signal } = opts;
const { method, url, headers, body, bodyUsed } = request;
const { signal } = options;

signal?.throwIfAborted();

Expand All @@ -60,62 +55,56 @@ class UserWorker {
);

// stream the request body
let reqBodyPromise = null;
let requestBodyPromise = null;

if (hasBody) {
let writableStream = writableStreamForRid(requestBodyRid);
reqBodyPromise = body.pipeTo(writableStream, { signal });
requestBodyPromise = body.pipeTo(writableStream, { signal });
}

const resPromise = op_user_worker_fetch_send(
const responsePromise = op_user_worker_fetch_send(
this.key,
requestRid,
requestBodyRid,
tag.streamRid,
tag.watcherRid
);

let [sent, res] = await Promise.allSettled([reqBodyPromise, resPromise]);

if (sent.status === "rejected") {
if (res.status === "fulfilled") {
res = res.value;
} else {
if (
ObjectPrototypeIsPrototypeOf(InterruptedPrototype, sent.reason) ||
StringPrototypeIncludes(sent.reason.message, "operation canceled")
) {
throw res.reason;
} else {
throw sent.reason;
}
}
} else if (res.status === "rejected") {
throw res.reason;
} else {
res = res.value;
const [requestBodyPromiseResult, responsePromiseResult] = await Promise.allSettled([
requestBodyPromise,
responsePromise
]);

if (requestBodyPromiseResult.status === "rejected") {
// console.warn(requestBodyPromiseResult.reason);
}

if (responsePromiseResult.status === "rejected") {
throw responsePromiseResult.reason;
}

const result = responsePromiseResult.value;
const response = {
headers: res.headers,
status: res.status,
statusText: res.statusText,
headers: result.headers,
status: result.status,
statusText: result.statusText,
body: null,
};

// TODO: add a test
if (nullBodyStatus(res.status) || redirectStatus(res.status)) {
core.close(res.bodyRid);
if (nullBodyStatus(result.status) || redirectStatus(result.status)) {
core.tryClose(result.bodyRid);
} else {
if (req.method === 'HEAD' || req.method === 'CONNECT') {
response.body = null;
core.close(res.bodyRid);
if (request.method === "HEAD" || request.method === "CONNECT") {
core.tryClose(result.bodyRid);
} else {
const bodyStream = readableStreamForRid(res.bodyRid);
const stream = readableStreamForRid(result.bodyRid);

signal?.addEventListener('abort', () => {
core.tryClose(res.bodyRid);
signal?.addEventListener("abort", () => {
core.tryClose(result.bodyRid);
});
response.body = bodyStream;

response.body = stream;
}
}

Expand Down Expand Up @@ -148,8 +137,8 @@ class UserWorker {

const { servicePath, maybeEszip } = readyOptions;

if (!maybeEszip && (!servicePath || servicePath === '')) {
throw new TypeError('service path must be defined');
if (!maybeEszip && (!servicePath || servicePath === "")) {
throw new TypeError("service path must be defined");
}

const key = await op_user_worker_create(readyOptions);
Expand All @@ -159,4 +148,5 @@ class UserWorker {
}

const SUPABASE_USER_WORKERS = UserWorker;

export { SUPABASE_USER_WORKERS };

0 comments on commit 5657032

Please sign in to comment.