diff --git a/Cargo.lock b/Cargo.lock index 61d2e7d7b6..9111a1905c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,7 +218,7 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebcd09b382f40fcd159c2d695175b2ae620ffa5f3bd6f664131efff4e8b9e04a" dependencies = [ - "async-lock", + "async-lock 3.4.0", "blocking", "futures-lite", ] @@ -232,7 +232,7 @@ dependencies = [ "async-channel 2.5.0", "async-executor", "async-io", - "async-lock", + "async-lock 3.4.0", "blocking", "futures-lite", "once_cell", @@ -247,7 +247,7 @@ dependencies = [ "async-channel 2.5.0", "async-executor", "async-io", - "async-lock", + "async-lock 3.4.0", "blocking", "futures-lite", ] @@ -258,7 +258,7 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" dependencies = [ - "async-lock", + "async-lock 3.4.0", "cfg-if", "concurrent-queue", "futures-io", @@ -271,6 +271,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "async-lock" version = "3.4.0" @@ -301,7 +310,7 @@ checksum = "cde3f4e40e6021d7acffc90095cbd6dc54cb593903d1de5832f435eb274b85dc" dependencies = [ "async-channel 2.5.0", "async-io", - "async-lock", + "async-lock 3.4.0", "async-signal", "async-task", "blocking", @@ -319,7 +328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7605a4e50d4b06df3898d5a70bf5fde51ed9059b0434b73105193bc27acce0d" dependencies = [ "async-io", - "async-lock", + "async-lock 3.4.0", "atomic-waker", "cfg-if", "futures-core", @@ -340,7 +349,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor 2.4.1", "async-io", - "async-lock", + "async-lock 3.4.0", "crossbeam-utils", "futures-channel", "futures-core", @@ -754,10 +763,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.29" +version = "1.2.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1599538de2394445747c8cf7935946e3cc27e9625f889d979bfb2aaf569362" +checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -951,6 +961,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "connect-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "dotenvy", + "futures", + "sqlx", + "tokio", + "wasip3", +] + [[package]] name = "console" version = "0.15.11" @@ -1345,6 +1368,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "execute-query-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "dotenvy", + "futures", + "sqlx", + "tokio", + "wasip3", +] + [[package]] name = "eyre" version = "0.6.12" @@ -1373,6 +1409,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" + [[package]] name = "float-cmp" version = "0.9.0" @@ -1455,6 +1497,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -1547,6 +1590,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1617,12 +1661,13 @@ dependencies = [ [[package]] name = "half" -version = "2.6.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ "cfg-if", "crunchy", + "zerocopy", ] [[package]] @@ -1878,6 +1923,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" + [[package]] name = "ident_case" version = "1.0.1" @@ -1946,6 +1997,7 @@ checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.4", + "serde", ] [[package]] @@ -1982,13 +2034,13 @@ checksum = "cf370abdafd54d13e54a620e8c3e1145f28e46cc9d704bc6d94414559df41763" [[package]] name = "is-terminal" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -2098,6 +2150,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.174" @@ -2122,9 +2180,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.4" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" dependencies = [ "bitflags 2.9.1", "libc", @@ -2698,6 +2756,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "pool-crud-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "dotenvy", + "futures", + "sqlx", + "tokio", + "wasip3", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -2778,6 +2849,19 @@ dependencies = [ "termtree", ] +[[package]] +name = "prepared-query-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "dotenvy", + "futures", + "sqlx", + "tokio", + "wasip3", +] + [[package]] name = "prettyplease" version = "0.2.35" @@ -2933,9 +3017,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" dependencies = [ "either", "rayon-core", @@ -2943,9 +3027,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.1" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -2953,9 +3037,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.13" +version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ "bitflags 2.9.1", ] @@ -3445,7 +3529,7 @@ dependencies = [ "async-executor", "async-fs", "async-io", - "async-lock", + "async-lock 3.4.0", "async-net", "async-process", "blocking", @@ -3490,6 +3574,7 @@ dependencies = [ "criterion", "dotenvy", "env_logger", + "futures", "futures-util", "hex", "libsqlite3-sys", @@ -3509,6 +3594,7 @@ dependencies = [ "tokio", "trybuild", "url", + "wasip3", ] [[package]] @@ -3542,6 +3628,7 @@ dependencies = [ "async-fs", "async-global-executor 3.1.0", "async-io", + "async-lock 2.8.0", "async-std", "async-task", "base64 0.22.1", @@ -3555,6 +3642,7 @@ dependencies = [ "crossbeam-queue", "either", "event-listener 5.4.0", + "futures", "futures-core", "futures-intrusive", "futures-io", @@ -3568,6 +3656,7 @@ dependencies = [ "mac_address", "memchr", "native-tls", + "once_cell", "percent-encoding", "rust_decimal", "rustls", @@ -3582,11 +3671,14 @@ dependencies = [ "time", "tokio", "tokio-stream", + "tokio-util", "toml", "tracing", "url", "uuid", + "wasip3", "webpki-roots 0.26.11", + "wit-bindgen", ] [[package]] @@ -3595,8 +3687,11 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "dotenvy", + "futures", "sqlx", "tokio", + "wasip3", ] [[package]] @@ -4279,7 +4374,6 @@ dependencies = [ "io-uring", "libc", "mio 1.0.4", - "parking_lot", "pin-project-lite", "signal-hook-registry", "slab", @@ -4310,6 +4404,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.8.23" @@ -4549,6 +4656,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -4698,6 +4811,15 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip3" +version = "0.2.0+wasi-0.3.0-rc-2025-09-16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c5ffefc208bc11080d0e6a44e1807cbbd3fc67dafd20078fffb4598421e33" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasite" version = "0.1.0" @@ -4775,6 +4897,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.239.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be00faa2b4950c76fe618c409d2c3ea5a3c9422013e079482d78544bb2d184c" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.239.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20b3ec880a9ac69ccd92fbdbcf46ee833071cf09f82bb005b2327c7ae6025ae2" +dependencies = [ + "anyhow", + "indexmap 2.10.0", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.239.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9d90bb93e764f6beabf1d02028c70a2156a6583e63ac4218dd07ef733368b0" +dependencies = [ + "bitflags 2.9.1", + "hashbrown 0.15.4", + "indexmap 2.10.0", + "semver", +] + [[package]] name = "web-sys" version = "0.3.77" @@ -4817,11 +4973,10 @@ dependencies = [ [[package]] name = "whoami" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +version = "2.0.0-pre.3" +source = "git+https://github.com/Aditya1404Sal/whoami?branch=v2#ede57568756d4fa4297025529e9758624e96ea3b" dependencies = [ - "redox_syscall", + "libredox", "wasite", ] @@ -5145,6 +5300,29 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +dependencies = [ + "bitflags 2.9.1", + "futures", + "once_cell", + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cabd629f94da277abc739c71353397046401518efb2c707669f805205f0b9890" +dependencies = [ + "anyhow", + "heck 0.5.0", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -5154,6 +5332,74 @@ dependencies = [ "bitflags 2.9.1", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a4232e841089fa5f3c4fc732a92e1c74e1a3958db3b12f1de5934da2027f1f4" +dependencies = [ + "anyhow", + "heck 0.5.0", + "indexmap 2.10.0", + "prettyplease", + "syn 2.0.104", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0d4698c2913d8d9c2b220d116409c3f51a7aa8d7765151b886918367179ee9" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.104", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.239.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a866b19dba2c94d706ec58c92a4c62ab63e482b4c935d2a085ac94caecb136" +dependencies = [ + "anyhow", + "bitflags 2.9.1", + "indexmap 2.10.0", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.239.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55c92c939d667b7bf0c6bf2d1f67196529758f99a2a45a3355cc56964fd5315d" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.10.0", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index b24b59cfa0..c8c88a84ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,10 @@ members = [ "examples/postgres/transaction", "examples/sqlite/todos", "examples/sqlite/extension", + "tests/mysql/wasm-components/connect-test", + "tests/mysql/wasm-components/execute-query-test", + "tests/mysql/wasm-components/pool-crud-test", + "tests/mysql/wasm-components/prepared-query-test", ] [workspace.package] @@ -186,10 +190,12 @@ mac_address = "1.1.5" rust_decimal = { version = "1.26.1", default-features = false, features = ["std"] } time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] } uuid = "1.1.2" - +tokio-util = { version = "*" } # Common utility crates cfg-if = "1.0.0" dotenvy = { version = "0.15.0", default-features = false } +wasip3 ={ version = "0.2.0" ,default-features = false } +wit-bindgen = { version = "0.46", default-features = false } # Runtimes [workspace.dependencies.async-global-executor] @@ -206,7 +212,7 @@ default-features = false [workspace.dependencies.tokio] version = "1" -features = ["time", "net", "sync", "fs", "io-util", "rt"] +features = ["time", "sync", "io-util", "rt"] default-features = false [dependencies] @@ -222,8 +228,10 @@ anyhow = "1.0.52" time_ = { version = "0.3.2", package = "time" } futures-util = { version = "0.3.19", default-features = false, features = ["alloc"] } env_logger = "0.11" -async-std = { workspace = true, features = ["attributes"] } -tokio = { version = "1.15.0", features = ["full"] } +futures = "0.3" +wasip3 = "0.2.0" +#async-std = { workspace = true, features = ["attributes"] } +tokio = { version = "1.15.0", features = ["sync", "macros", "io-util", "rt", "time"] } dotenvy = "0.15.0" trybuild = "1.0.53" sqlx-test = { path = "./sqlx-test" } @@ -234,9 +242,18 @@ url = "2.2.2" rand = "0.8.4" rand_xoshiro = "0.6.0" hex = "0.4.3" +#tempfile = "3.10.1" +#criterion = { version = "0.5.1", features = ["async_tokio"] } +libsqlite3-sys = { version = "0.30.1" } + +# Dev dependencies that should only be enabled when NOT targeting wasm32. +[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +# Enable tempfile for non-wasm tests tempfile = "3.10.1" +# async-std is required for some async tests (only for non-wasm targets) +async-std = { workspace = true, features = ["attributes"] } +# criterion benchmarks (only for non-wasm targets) criterion = { version = "0.5.1", features = ["async_tokio"] } -libsqlite3-sys = { version = "0.30.1" } # If this is an unconditional dev-dependency then Cargo will *always* try to build `libsqlite3-sys`, # even when SQLite isn't the intended test target, and fail if the build environment is not set up for compiling C code. @@ -253,6 +270,9 @@ cast_sign_loss = 'deny' # See `clippy.toml` disallowed_methods = 'deny' +[patch.crates-io] +whoami = { git = "https://github.com/Aditya1404Sal/whoami", branch = "v2" } + [lints.rust.unexpected_cfgs] level = 'warn' @@ -360,6 +380,10 @@ name = "mysql" path = "tests/mysql/mysql.rs" required-features = ["mysql"] +[[test]] +name = "wasi_integration_test" +path = "tests/mysql/wasi_integration_test.rs" + [[test]] name = "mysql-types" path = "tests/mysql/types.rs" diff --git a/examples/mysql/todos/Cargo.toml b/examples/mysql/todos/Cargo.toml index db8c677980..d3f060bcf9 100644 --- a/examples/mysql/todos/Cargo.toml +++ b/examples/mysql/todos/Cargo.toml @@ -4,8 +4,17 @@ version = "0.1.0" edition = "2021" workspace = "../../../" +[lib] +crate-type = ["cdylib"] + [dependencies] anyhow = "1.0" -sqlx = { path = "../../../", features = [ "mysql", "runtime-tokio", "tls-native-tls" ] } +futures = "0.3" +sqlx = { path = "../../../", features = [ "mysql", "runtime-tokio" ] } clap = { version = "4", features = ["derive"] } -tokio = { version = "1.20.0", features = ["rt", "macros"]} +tokio = { version = "1.20.0", features = ["rt"]} +dotenvy = "0.15.0" +wasip3 = "0.2.0+wasi-0.3.0-rc-2025-09-16" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +sqlx = { path = "../../../", features = ["tls-native-tls"] } \ No newline at end of file diff --git a/examples/mysql/todos/src/main.rs b/examples/mysql/todos/src/lib.rs similarity index 54% rename from examples/mysql/todos/src/main.rs rename to examples/mysql/todos/src/lib.rs index 7d2a8a1530..6d6a342447 100644 --- a/examples/mysql/todos/src/main.rs +++ b/examples/mysql/todos/src/lib.rs @@ -14,27 +14,26 @@ enum Command { Done { id: u64 }, } -#[tokio::main(flavor = "current_thread")] -async fn main() -> anyhow::Result<()> { - let args = Args::parse(); +async fn run() -> anyhow::Result<()> { + let args = Args::parse_from(wasip3::cli::environment::get_arguments()); let pool = MySqlPool::connect(&env::var("DATABASE_URL")?).await?; match args.cmd { Some(Command::Add { description }) => { - println!("Adding new todo with description '{description}'"); + eprintln!("Adding new todo with description '{description}'"); let todo_id = add_todo(&pool, description).await?; - println!("Added new todo with id {todo_id}"); + eprintln!("Added new todo with id {todo_id}"); } Some(Command::Done { id }) => { - println!("Marking todo {id} as done"); + eprintln!("Marking todo {id} as done"); if complete_todo(&pool, id).await? { - println!("Todo {id} is marked as done"); + eprintln!("Todo {id} is marked as done"); } else { - println!("Invalid id {id}"); + eprintln!("Invalid id {id}"); } } None => { - println!("Printing list of all todos"); + eprintln!("Printing list of all todos"); list_todos(&pool).await?; } } @@ -43,7 +42,6 @@ async fn main() -> anyhow::Result<()> { } async fn add_todo(pool: &MySqlPool, description: String) -> anyhow::Result { - // Insert the task, then obtain the ID of this row let todo_id = sqlx::query!( r#" INSERT INTO todos ( description ) @@ -85,10 +83,8 @@ ORDER BY id .fetch_all(pool) .await?; - // NOTE: Booleans in MySQL are stored as `TINYINT(1)` / `i8` - // 0 = false, non-0 = true for rec in recs { - println!( + eprintln!( "- [{}] {}: {}", if rec.done != 0 { "x" } else { " " }, rec.id, @@ -98,3 +94,31 @@ ORDER BY id Ok(()) } + +wasip3::cli::command::export!(Component); + +struct Component; + +impl wasip3::exports::cli::run::Guest for Component { + async fn run() -> Result<(), ()> { + tokio::task::LocalSet::new() + .run_until(async { + if let Err(err) = run().await { + let (mut tx, rx) = wasip3::wit_stream::new(); + + futures::join!( + async { wasip3::cli::stderr::write_via_stream(rx).await.unwrap() }, + async { + let remaining = tx.write_all(format!("{err:#}\n").into_bytes()).await; + assert!(remaining.is_empty()); + drop(tx); + } + ); + Err(()) + } else { + Ok(()) + } + }) + .await + } +} diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 58c5b67e05..4b3ed48cea 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -50,7 +50,10 @@ _unstable-doc = ["sqlx-toml"] async-global-executor = { workspace = true, optional = true } async-std = { workspace = true, optional = true } smol = { workspace = true, optional = true } -tokio = { workspace = true, optional = true } +tokio = { workspace = true, optional = true, features = ["sync","rt"]} +tokio-util = { workspace = true } +async-lock = "2.5.0" +once_cell = "1.21.3" # TLS native-tls = { version = "0.2.10", optional = true } @@ -81,6 +84,7 @@ chrono = { version = "0.4.34", default-features = false, features = ["clock"], o crc = { version = "3", optional = true } crossbeam-queue = "0.3.2" either = "1.6.1" +futures = "0.3" futures-core = { version = "0.3.19", default-features = false } futures-io = "0.3.24" futures-intrusive = "0.5.0" @@ -94,7 +98,7 @@ toml = { version = "0.8.16", optional = true } sha2 = { version = "0.10.0", default-features = false, optional = true } #sqlformat = "0.2.0" thiserror = "2.0.0" -tokio-stream = { version = "0.1.8", features = ["fs"], optional = true } +tokio-stream = { version = "0.1.8", optional = true } tracing = { version = "0.1.37", features = ["log"] } smallvec = "1.7.0" url = { version = "2.2.2" } @@ -102,10 +106,20 @@ bstr = { version = "1.0", default-features = false, features = ["std"], optional hashlink = "0.10.0" indexmap = "2.0" event-listener = "5.2.0" -hashbrown = "0.15.0" +hashbrown = { version = "0.15.0", default-features = false } +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { workspace = true, features = ["sync", "rt"] } +tokio-util = { workspace = true } +wasip3 = { workspace = true } +wit-bindgen = { workspace = true, optional = true } + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { workspace = true, features = ["net", "fs"], optional = true } +tokio-stream = { version = "0.1.8", features = ["fs"], optional = true } + [dev-dependencies] -sqlx = { workspace = true, features = ["postgres", "sqlite", "mysql", "migrate", "macros", "time", "uuid"] } +sqlx = { workspace = true, features = ["mysql", "migrate", "macros", "time", "uuid"] } tokio = { version = "1", features = ["rt"] } [lints] diff --git a/sqlx-core/src/fs.rs b/sqlx-core/src/fs.rs index 0993cbeec6..e4b0c097bd 100644 --- a/sqlx-core/src/fs.rs +++ b/sqlx-core/src/fs.rs @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf}; use crate::rt; pub struct ReadDir { + #[cfg(not(target_arch = "wasm32"))] inner: Option, } @@ -23,34 +24,77 @@ pub struct DirEntry { pub async fn read>(path: P) -> io::Result> { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::read(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::read(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::read(path).await + } } pub async fn read_to_string>(path: P) -> io::Result { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::read_to_string(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::read_to_string(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::read_to_string(path).await + } } pub async fn create_dir_all>(path: P) -> io::Result<()> { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::create_dir_all(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::create_dir_all(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::create_dir_all(path).await + } } pub async fn remove_file>(path: P) -> io::Result<()> { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::remove_file(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::remove_file(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::remove_file(path).await + } } pub async fn remove_dir>(path: P) -> io::Result<()> { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::remove_dir(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::remove_dir(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::remove_dir(path).await + } } pub async fn remove_dir_all>(path: P) -> io::Result<()> { let path = PathBuf::from(path.as_ref()); - rt::spawn_blocking(move || std::fs::remove_dir_all(path)).await + #[cfg(not(target_arch = "wasm32"))] + { + rt::spawn_blocking(move || std::fs::remove_dir_all(path)).await + } + #[cfg(target_arch = "wasm32")] + { + crate::wasm::fs::remove_dir_all(path).await + } } +#[cfg(not(target_arch = "wasm32"))] pub async fn read_dir(path: PathBuf) -> io::Result { let read_dir = rt::spawn_blocking(move || std::fs::read_dir(path)).await?; @@ -59,38 +103,48 @@ pub async fn read_dir(path: PathBuf) -> io::Result { }) } +#[cfg(target_arch = "wasm32")] +pub async fn read_dir(path: PathBuf) -> io::Result { + crate::wasm::fs::read_dir(path).await +} + impl ReadDir { pub async fn next(&mut self) -> io::Result> { - if let Some(mut read_dir) = self.inner.take() { - let maybe = rt::spawn_blocking(move || { - let entry = read_dir.next().transpose()?; - - entry - .map(|entry| -> io::Result<_> { - Ok(( - read_dir, - DirEntry { - path: entry.path(), - file_name: entry.file_name(), - // We always want the metadata as well so might as well fetch - // it in the same blocking call. - metadata: entry.metadata()?, - }, - )) - }) - .transpose() - }) - .await?; - - match maybe { - Some((read_dir, entry)) => { - self.inner = Some(read_dir); - Ok(Some(entry)) + #[cfg(not(target_arch = "wasm32"))] + { + if let Some(mut read_dir) = self.inner.take() { + let maybe = rt::spawn_blocking(move || { + let entry = read_dir.next().transpose()?; + + entry + .map(|entry| -> io::Result<_> { + Ok(( + read_dir, + DirEntry { + path: entry.path(), + file_name: entry.file_name(), + // We always want the metadata as well so might as well fetch + // it in the same blocking call. + metadata: entry.metadata()?, + }, + )) + }) + .transpose() + }) + .await?; + + match maybe { + Some((read_dir, entry)) => { + self.inner = Some(read_dir); + Ok(Some(entry)) + } + None => Ok(None), } - None => Ok(None), + } else { + Ok(None) } - } else { - Ok(None) } + #[cfg(target_arch = "wasm32")] + crate::wasm::fs::next(self).await } } diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 494c41e9bf..1c562e14d9 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -22,6 +22,8 @@ // #![cfg_attr(docsrs, feature(doc_cfg))] +pub mod wasm; + #[macro_use] pub mod ext; diff --git a/sqlx-core/src/migrate/mod.rs b/sqlx-core/src/migrate/mod.rs index 39347cf421..50d405b521 100644 --- a/sqlx-core/src/migrate/mod.rs +++ b/sqlx-core/src/migrate/mod.rs @@ -13,5 +13,9 @@ pub use migration_type::MigrationType; pub use migrator::Migrator; pub use source::{MigrationSource, ResolveConfig, ResolveWith}; +#[cfg(target_arch = "wasm32")] +#[doc(hidden)] +pub use source::resolve; +#[cfg(not(target_arch = "wasm32"))] #[doc(hidden)] pub use source::{resolve_blocking, resolve_blocking_with_config}; diff --git a/sqlx-core/src/migrate/source.rs b/sqlx-core/src/migrate/source.rs index 4648e53f1e..05eb6c106f 100644 --- a/sqlx-core/src/migrate/source.rs +++ b/sqlx-core/src/migrate/source.rs @@ -6,7 +6,6 @@ use futures_core::future::BoxFuture; use std::borrow::Cow; use std::collections::BTreeSet; use std::fmt::Debug; -use std::fs; use std::io; use std::path::{Path, PathBuf}; @@ -38,20 +37,15 @@ impl<'s> MigrationSource<'s> for &'s Path { impl MigrationSource<'static> for PathBuf { fn resolve(self) -> BoxFuture<'static, Result, BoxDynError>> { - // Technically this could just be `Box::pin(spawn_blocking(...))` - // but that would actually be a breaking behavior change because it would call - // `spawn_blocking()` on the current thread - Box::pin(async move { - crate::rt::spawn_blocking(move || { - let migrations_with_paths = resolve_blocking(&self)?; - - Ok(migrations_with_paths.into_iter().map(|(m, _p)| m).collect()) - }) - .await - }) + Box::pin(async move { self.as_path().resolve().await }) } } +#[cfg(target_arch = "wasm32")] +pub async fn resolve(path: &PathBuf) -> Result, ResolveError> { + todo!(); +} + /// A [`MigrationSource`] implementation with configurable resolution. /// /// `S` may be `PathBuf`, `&Path` or any type that implements `Into`. @@ -65,11 +59,12 @@ impl<'s, S: Debug + Into + Send + 's> MigrationSource<'s> for ResolveWi Box::pin(async move { let path = self.0.into(); let config = self.1; - + #[cfg(not(target_arch = "wasm32"))] let migrations_with_paths = crate::rt::spawn_blocking(move || resolve_blocking_with_config(&path, &config)) .await?; - + #[cfg(target_arch = "wasm32")] + let migrations_with_paths = resolve(&path).await?; Ok(migrations_with_paths.into_iter().map(|(m, _p)| m).collect()) }) } @@ -148,6 +143,7 @@ impl ResolveConfig { // FIXME: paths should just be part of `Migration` but we can't add a field backwards compatibly // since it's `#[non_exhaustive]`. #[doc(hidden)] +#[cfg(not(target_arch = "wasm32"))] pub fn resolve_blocking(path: &Path) -> Result, ResolveError> { resolve_blocking_with_config(path, &ResolveConfig::new()) } @@ -161,7 +157,7 @@ pub fn resolve_blocking_with_config( message: format!("error canonicalizing path {}", path.display()), source: Some(e), })?; - + use std::fs; let s = fs::read_dir(&path).map_err(|e| ResolveError { message: format!("error reading migration directory {}", path.display()), source: Some(e), diff --git a/sqlx-core/src/net/socket/mod.rs b/sqlx-core/src/net/socket/mod.rs index 0f9aae61b4..1d90f4312a 100644 --- a/sqlx-core/src/net/socket/mod.rs +++ b/sqlx-core/src/net/socket/mod.rs @@ -186,7 +186,13 @@ pub async fn connect_tcp( port: u16, with_socket: Ws, ) -> crate::Result { - #[cfg(feature = "_rt-tokio")] + #[cfg(all(feature = "_rt-tokio", target_arch = "wasm32"))] + { + let res = crate::rt::rt_wasip3::connect_tcp(host, port, with_socket).await; + return res; + } + + #[cfg(all(feature = "_rt-tokio", not(target_arch = "wasm32")))] if crate::rt::rt_tokio::available() { return Ok(with_socket .with_socket(tokio::net::TcpStream::connect((host, port)).await?) @@ -258,6 +264,10 @@ pub async fn connect_uds, Ws: WithSocket>( ) -> crate::Result { #[cfg(unix)] { + #[cfg(target_arch = "wasm32")] + { + todo!("outer socket impl") + } #[cfg(feature = "_rt-tokio")] if crate::rt::rt_tokio::available() { use tokio::net::UnixStream; diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index b698dc9df0..bf6cbfa8c5 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -5,6 +5,7 @@ use crate::database::Database; use crate::error::Error; use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions}; use crossbeam_queue::ArrayQueue; +use log::debug; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; @@ -347,7 +348,19 @@ impl PoolInner { // result here is `Result, TimeoutError>` // if this block does not return, sleep for the backoff timeout and try again - match crate::rt::timeout(timeout, connect_options.connect()).await { + + let res = crate::rt::timeout(timeout, connect_options.connect()).await; + if let Ok(Ok(_)) = &res { + debug!("pool: connect attempt succeeded"); + } else if let Ok(Err(e)) = &res { + debug!("pool: connect attempt returned error: {:?}", e); + } else if res.is_err() { + debug!( + "pool: connect attempt timed out after {}ms", + timeout.as_millis() + ); + } + match res { // successfully established connection Ok(Ok(mut raw)) => { // See comment on `PoolOptions::after_connect` diff --git a/sqlx-core/src/rt/mod.rs b/sqlx-core/src/rt/mod.rs index 273a1bfcd9..963fcb6ee4 100644 --- a/sqlx-core/src/rt/mod.rs +++ b/sqlx-core/src/rt/mod.rs @@ -12,6 +12,12 @@ pub mod rt_async_io; #[cfg(feature = "_rt-tokio")] pub mod rt_tokio; +#[cfg(target_arch = "wasm32")] +pub mod rt_wasip3; + +#[cfg(target_arch = "wasm32")] +pub mod wasm_worker; + #[derive(Debug, thiserror::Error)] #[error("operation timed out")] pub struct TimeoutError; @@ -20,7 +26,7 @@ pub enum JoinHandle { #[cfg(feature = "_rt-async-std")] AsyncStd(async_std::task::JoinHandle), - #[cfg(feature = "_rt-tokio")] + #[cfg(any(feature = "_rt-tokio", target_arch = "wasm32"))] Tokio(tokio::task::JoinHandle), // Implementation shared by `smol` and `async-global-executor` @@ -35,6 +41,30 @@ pub async fn timeout(duration: Duration, f: F) -> Result Ok(res), + _ = timer => Err(TimeoutError), + }; + } + + // Native: if Tokio is enabled and a handle is available, delegate to it. #[cfg(feature = "_rt-tokio")] if rt_tokio::available() { return tokio::time::timeout(duration, f) @@ -52,6 +82,14 @@ pub async fn timeout(duration: Duration, f: F) -> Result(fut: F) -> JoinHandle where @@ -90,6 +129,17 @@ where } } +#[cfg(target_arch = "wasm32")] +#[track_caller] +pub fn spawn(fut: F) -> JoinHandle +where + F: Future + 'static, + F::Output: 'static, +{ + JoinHandle::Tokio(tokio::task::spawn_local(fut)) +} + +#[cfg(not(target_arch = "wasm32"))] #[track_caller] pub fn spawn_blocking(f: F) -> JoinHandle where @@ -101,17 +151,13 @@ where return JoinHandle::Tokio(handle.spawn_blocking(f)); } - cfg_if! { - if #[cfg(feature = "_rt-async-global-executor")] { - JoinHandle::AsyncTask(Some(async_global_executor::spawn_blocking(f))) - } else if #[cfg(feature = "_rt-smol")] { - JoinHandle::AsyncTask(Some(smol::unblock(f))) - } else if #[cfg(feature = "_rt-async-std")] { - JoinHandle::AsyncStd(async_std::task::spawn_blocking(f)) - } else { - missing_rt(f) - } + #[cfg(feature = "_rt-async-std")] + { + JoinHandle::AsyncStd(async_std::task::spawn_blocking(f)) } + + #[cfg(not(feature = "_rt-async-std"))] + missing_rt(f) } pub async fn yield_now() { @@ -145,18 +191,43 @@ pub async fn yield_now() { #[track_caller] pub fn test_block_on(f: F) -> F::Output { - cfg_if! { - if #[cfg(feature = "_rt-async-io")] { - async_io::block_on(f) - } else if #[cfg(feature = "_rt-tokio")] { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("failed to start Tokio runtime") - .block_on(f) - } else { - missing_rt(f) - } + #[cfg(feature = "_rt-async-io")] + { + return async_io::block_on(f); + } + + #[cfg(target_arch = "wasm32")] + { + // Use futures::executor::block_on for WASM + return futures::executor::block_on(f); + } + + #[cfg(feature = "_rt-tokio")] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to start Tokio runtime"); + return rt.block_on(f); + } + + #[cfg(all( + feature = "_rt-async-std", + not(feature = "_rt-async-io"), + not(any(feature = "_rt-tokio", target_arch = "wasm32")) + ))] + { + return async_std::task::block_on(f); + } + + #[cfg(not(any( + feature = "_rt-async-io", + feature = "_rt-async-std", + feature = "_rt-tokio", + target_arch = "wasm32", + )))] + { + missing_rt(f) } } @@ -184,7 +255,7 @@ impl Future for JoinHandle { .expect("BUG: task taken") .poll(cx), - #[cfg(feature = "_rt-tokio")] + #[cfg(any(feature = "_rt-tokio", target_arch = "wasm32"))] Self::Tokio(handle) => Pin::new(handle) .poll(cx) .map(|res| res.expect("spawned task panicked")), diff --git a/sqlx-core/src/rt/rt_tokio/mod.rs b/sqlx-core/src/rt/rt_tokio/mod.rs index ce699456db..b22a4453a7 100644 --- a/sqlx-core/src/rt/rt_tokio/mod.rs +++ b/sqlx-core/src/rt/rt_tokio/mod.rs @@ -1,3 +1,4 @@ +#[cfg(not(target_arch = "wasm32"))] mod socket; pub fn available() -> bool { diff --git a/sqlx-core/src/rt/rt_wasip3/mod.rs b/sqlx-core/src/rt/rt_wasip3/mod.rs new file mode 100644 index 0000000000..bb9c5d6ac3 --- /dev/null +++ b/sqlx-core/src/rt/rt_wasip3/mod.rs @@ -0,0 +1,666 @@ +use bytes::BytesMut; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::sync::Arc; +//use wasip3::sockets::types::IpSocketAddress; +use core::task::Waker; +use futures_util::future::{AbortHandle, Abortable}; +use futures_util::stream::StreamExt as _; +use tokio::sync::mpsc; +use wasip3::wit_bindgen::rt::async_support; +use wasip3::wit_bindgen::rt::async_support::futures::channel::oneshot; + +use crate::net::WithSocket; +use tracing::debug; + +mod socket; + +pub struct JoinHandle { + rx: oneshot::Receiver, +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Ready(Ok(v)) => Poll::Ready(v), + Poll::Ready(Err(oneshot::Canceled)) => panic!("wasip3 JoinHandle canceled"), + Poll::Pending => Poll::Pending, + } + } +} + +pub fn spawn(fut: impl Future + 'static) -> JoinHandle { + let (tx, rx) = oneshot::channel(); + async_support::spawn(async move { + let v = fut.await; + _ = tx.send(v); + }); + JoinHandle { rx } +} + +// A tiny poll-aware sender shim backed by `futures::channel::mpsc::Sender`. +// This provides the minimal API `socket.rs` expects: `try_send`, `get_ref` and +// `poll_reserve`. +pub struct WasiPollSender { + inner: Option>, +} + +impl WasiPollSender { + pub fn new(s: mpsc::Sender) -> Self { + Self { inner: Some(s) } + } + + pub fn get_ref(&self) -> Option<&mpsc::Sender> { + // Note: inner holds a `tokio::sync::mpsc::Sender` stored as a + // `Option>` (type alias imported above). Return a + // reference to it if present. + self.inner.as_ref() + } + + pub fn try_send(&self, item: T) -> Result<(), ()> { + if let Some(s) = &self.inner { + s.try_send(item).map_err(|_| ()) + } else { + Err(()) + } + } + + pub fn poll_reserve(&self, cx: &mut Context<'_>) -> Poll> { + // There's no exact `poll_reserve` equivalent in futures mpsc. We emulate + // it by checking if `poll_ready` would be `Ready` by attempting to + // reserve via a short-lived future that yields `Ready` when the sink + // can accept an item. For simplicity, we attempt a non-allocating + // check: futures mpsc provides `poll_ready` on the Sink trait but + // that's not directly available here. As a pragmatic approach, treat + // the sender as always ready and return Pending only if the channel + // is closed. + if self.inner.is_some() { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(())) + } + } +} + +pub struct TcpSocket { + pub tx: WasiPollSender>, + pub rx: mpsc::Receiver>, + pub buf: BytesMut, + // Abort handle for the background task spawned with `async_support::spawn`. + pub abort_handle: AbortHandle, +} + +impl Drop for TcpSocket { + fn drop(&mut self) { + // Abort the background task if it's still running. + self.abort_handle.abort(); + } +} + +pub async fn connect_tcp( + _host: &str, + port: u16, + with_socket: Ws, +) -> crate::Result { + // address resolution requires additional processing + // let addresses = wasip3::sockets::ip_name_lookup::resolve_addresses(host.to_string()) + // .await + // .map_err(|e| { + // crate::Error::Io(std::io::Error::new( + // std::io::ErrorKind::Other, + // format!("DNS failed: {:?}", e), + // )) + // })?; + + // let ip = addresses.into_iter().next().ok_or_else(|| { + // crate::Error::Io(std::io::Error::new( + // std::io::ErrorKind::Other, + // "No addresses found", + // )) + // })?; + + // let addr = match ip { + // wasip3::sockets::types::IpAddress::Ipv4(ipv4) => { + // IpSocketAddress::Ipv4(wasip3::sockets::types::Ipv4SocketAddress { + // address: ipv4, + // port, + // }) + // } + // wasip3::sockets::types::IpAddress::Ipv6(ipv6) => { + // IpSocketAddress::Ipv6(wasip3::sockets::types::Ipv6SocketAddress { + // address: ipv6, + // port, + // flow_info: 0, + // scope_id: 0, + // }) + // } + // }; + debug!("wasip3: creating tcp socket for port {}", port); + let sock = + wasip3::sockets::types::TcpSocket::create(wasip3::sockets::types::IpAddressFamily::Ipv4) + .expect("failed to create TCP socket"); + debug!("wasip3: created tcp socket for port {}", port); + sock.connect(wasip3::sockets::types::IpSocketAddress::Ipv4( + wasip3::sockets::types::Ipv4SocketAddress { + address: (127, 0, 0, 1), + port, + }, + )) + .await + .map_err(|e| { + debug!("wasip3: connect failed: {:?}", e); + e + }) + .expect(&format!("failed to connect to 127.0.0.1:{port}")); + + // explicit channel item types so the compiler can infer types used below + let (rx_tx, rx_rx) = mpsc::channel::>(1); + let (tx_tx, mut tx_rx) = mpsc::channel::>(1); + let (mut send_tx, send_rx) = wasip3::wit_stream::new(); + debug!("wasip3: created wit_stream for send/recv"); + let (mut recv_rx, recv_fut) = sock.receive(); + + // Spawn a background task using the wasip3 async runtime and make it abortable. + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + // Give the wasip3 scheduler a quick yield before spawning the background + // task. Use the host-aware `yield_async` so spawned tasks are eligible to + // be polled promptly by the local runtime. + async_support::yield_async().await; + let background = Abortable::new( + async move { + let sock = Arc::new(sock); + debug!("wasip3: background task starting; sock arc cloned"); + + let (ready_tx, ready_rx) = oneshot::channel(); + let spawn_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: spawning sock.send task at {}ms", spawn_ts); + + async_support::spawn({ + let sock = Arc::clone(&sock); + async move { + let start_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: sock.send task started at {}ms", start_ts); + let fut = sock.send(send_rx); + let sig_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + _ = ready_tx.send(()); + debug!("wasip3: sock.send signalled ready at {}ms", sig_ts); + match fut.await { + Ok(_) => { + let done_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: sock.send completed at {}ms", done_ts); + } + Err(e) => { + let err_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: sock.send error at {}ms: {:?}", err_ts, e); + } + } + drop(sock); + } + }); + // Yield after spawning the send task so the runtime can poll it. + async_support::yield_async().await; + async_support::spawn({ + let sock = Arc::clone(&sock); + async move { + let start_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: recv_fut task started at {}ms", start_ts); + match recv_fut.await { + Ok(_) => { + let done_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: recv_fut completed at {}ms", done_ts); + } + Err(e) => { + let err_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasip3: recv_fut error at {}ms: {:?}", err_ts, e); + } + } + drop(sock); + } + }); + // Yield to the wasip3 scheduler to give the spawned tasks a chance + // to be polled immediately. Without this yield the local runtime + // may not poll newly spawned tasks until the current task yields, + // which can cause head-of-line blocking observed during handshakes. + async_support::yield_async().await; + futures_util::join!( + async { + while let Some(result) = recv_rx.next().await { + // `recv_rx` yields single bytes from the wasip3 receive stream. + debug!("wasip3: recv_rx.next yielded byte: {:#x}", result); + _ = rx_tx.send(vec![result]).await; + } + drop(recv_rx); + drop(rx_tx); + }, + async { + _ = ready_rx.await; + debug!("wasip3: send task ready, draining tx_rx -> send_tx"); + while let Some(buf) = tx_rx.recv().await { + debug!("wasip3: writing {} bytes to send_tx", buf.len()); + let _ = send_tx.write(buf).await; + } + drop(tx_rx); + drop(send_tx); + }, + ); + }, + abort_registration, + ); + + async_support::spawn(async move { + let _ = background.await; + }); + Ok(with_socket + .with_socket(TcpSocket { + tx: WasiPollSender::new(tx_tx), + rx: rx_rx, + buf: bytes::BytesMut::new(), + abort_handle, + }) + .await) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + use std::sync::Arc; + use std::time::{Duration, Instant}; + + #[test] + fn test_spawn_completes_successfully() { + async { + let handle = spawn(async { 42 }); + let result = handle.await; + assert_eq!(result, 42); + }; + } + + #[test] + fn test_spawn_with_async_computation() { + async { + let handle = spawn(async { + let mut sum = 0; + for i in 1..=10 { + sum += i; + } + sum + }); + + let result = handle.await; + assert_eq!(result, 55); + }; + } + + #[test] + fn test_spawn_multiple_tasks() { + async { + let handle1 = spawn(async { 1 }); + let handle2 = spawn(async { 2 }); + let handle3 = spawn(async { 3 }); + + let result1 = handle1.await; + let result2 = handle2.await; + let result3 = handle3.await; + + assert_eq!(result1 + result2 + result3, 6); + }; + } + + #[test] + fn test_spawn_with_sleep() { + async { + let started = Instant::now(); + + let handle = spawn(async { + crate::rt::sleep(Duration::from_millis(100)).await; + "completed" + }); + + let result = handle.await; + let elapsed = started.elapsed(); + + assert_eq!(result, "completed"); + assert!(elapsed >= Duration::from_millis(100)); + }; + } + + #[test] + fn test_spawn_nested_tasks() { + async { + let outer = spawn(async { + let inner = spawn(async { 10 }); + let value = inner.await; + value * 2 + }); + + let result = outer.await; + assert_eq!(result, 20); + }; + } + + #[test] + fn test_spawn_with_shared_state() { + async { + let counter = Arc::new(AtomicU32::new(0)); + let counter_clone = counter.clone(); + + let handle = spawn(async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + counter_clone.fetch_add(1, Ordering::SeqCst); + }); + + handle.await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + }; + } + + #[test] + fn test_spawn_concurrent_tasks_with_shared_state() { + async { + let counter = Arc::new(AtomicU32::new(0)); + let mut handles = vec![]; + + for _ in 0..5 { + let counter_clone = counter.clone(); + let handle = spawn(async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + }); + handles.push(handle); + } + + for handle in handles { + handle.await; + } + + assert_eq!(counter.load(Ordering::SeqCst), 5); + }; + } + + #[test] + fn test_sleep_duration_accuracy() { + async { + let durations = [ + Duration::from_millis(50), + Duration::from_millis(100), + Duration::from_millis(200), + ]; + + for expected_duration in durations { + let start = Instant::now(); + crate::rt::sleep(expected_duration).await; + let elapsed = start.elapsed(); + + // Allow for some timing variance (±20ms) + assert!( + elapsed >= expected_duration, + "Sleep was too short: expected {:?}, got {:?}", + expected_duration, + elapsed + ); + assert!( + elapsed < expected_duration + Duration::from_millis(50), + "Sleep was too long: expected {:?}, got {:?}", + expected_duration, + elapsed + ); + } + }; + } + + #[test] + fn test_sleep_zero_duration() { + async { + let start = Instant::now(); + crate::rt::sleep(Duration::ZERO).await; + let elapsed = start.elapsed(); + + // Should complete very quickly + assert!(elapsed < Duration::from_millis(10)); + }; + } + + #[test] + fn test_timeout_completes_before_deadline() { + async { + let result = crate::rt::timeout(Duration::from_secs(1), async { + crate::rt::sleep(Duration::from_millis(50)).await; + 42 + }) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + }; + } + + #[test] + fn test_timeout_exceeds_deadline() { + async { + let result = crate::rt::timeout(Duration::from_millis(50), async { + crate::rt::sleep(Duration::from_millis(200)).await; + 42 + }) + .await; + + assert!(result.is_err()); + }; + } + + #[test] + fn test_timeout_immediate_completion() { + async { + let result = crate::rt::timeout(Duration::from_secs(1), async { "immediate" }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "immediate"); + }; + } + + #[test] + fn test_timeout_with_computation() { + async { + let result = crate::rt::timeout(Duration::from_secs(1), async { + let mut sum = 0; + for i in 1..=100 { + sum += i; + } + sum + }) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 5050); + }; + } + + #[test] + fn test_spawn_and_timeout_combined() { + async { + let handle = spawn(async { + crate::rt::timeout(Duration::from_millis(100), async { + crate::rt::sleep(Duration::from_millis(50)).await; + "success" + }) + .await + }); + + let result = handle.await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "success"); + }; + } + + #[test] + fn test_multiple_sleeps_sequential() { + async { + let start = Instant::now(); + + crate::rt::sleep(Duration::from_millis(50)).await; + crate::rt::sleep(Duration::from_millis(50)).await; + crate::rt::sleep(Duration::from_millis(50)).await; + + let elapsed = start.elapsed(); + + // Total should be at least 150ms + assert!(elapsed >= Duration::from_millis(150)); + }; + } + + #[test] + fn test_multiple_sleeps_concurrent() { + async { + let start = Instant::now(); + + let h1 = spawn(async { + crate::rt::sleep(Duration::from_millis(100)).await; + }); + let h2 = spawn(async { + crate::rt::sleep(Duration::from_millis(100)).await; + }); + let h3 = spawn(async { + crate::rt::sleep(Duration::from_millis(100)).await; + }); + + h1.await; + h2.await; + h3.await; + + let elapsed = start.elapsed(); + + // Should complete in ~100ms, not 300ms (concurrent execution) + assert!(elapsed < Duration::from_millis(200)); + }; + } + + #[test] + fn test_join_handle_future_trait() { + async { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + let handle = spawn(async { 99 }); + + // Pin the handle to test Future implementation + let mut pinned = Box::pin(handle); + + // Create a simple waker for testing + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + // Poll until ready + loop { + match pinned.as_mut().poll(&mut cx) { + Poll::Ready(value) => { + assert_eq!(value, 99); + break; + } + Poll::Pending => continue, + } + } + }; + } + + #[test] + fn test_spawn_with_boolean_result() { + async { + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = flag.clone(); + + let handle = spawn(async move { + flag_clone.store(true, Ordering::SeqCst); + flag_clone.load(Ordering::SeqCst) + }); + + let result = handle.await; + assert!(result); + assert!(flag.load(Ordering::SeqCst)); + }; + } + + #[test] + fn test_complex_async_workflow() { + async { + // Simulate a complex workflow with spawning, sleeping, and timeouts + let step1 = spawn(async { + crate::rt::sleep(Duration::from_millis(50)).await; + 10 + }); + + let step2 = spawn(async { + crate::rt::sleep(Duration::from_millis(30)).await; + 20 + }); + + let result1 = step1.await; + let result2 = step2.await; + + let step3 = spawn(async move { + crate::rt::timeout(Duration::from_millis(100), async { result1 + result2 }).await + }); + + let final_result = step3.await; + assert!(final_result.is_ok()); + assert_eq!(final_result.unwrap(), 30); + }; + } + + #[test] + fn test_spawn_return_string() { + async { + let handle = spawn(async { String::from("Hello from WASI!") }); + + let result = handle.await; + assert_eq!(result, "Hello from WASI!"); + }; + } + + #[test] + fn test_spawn_with_option_result() { + async { + let handle = spawn(async { Some(42) }); + + let result = handle.await; + assert_eq!(result, Some(42)); + }; + } + + #[test] + fn test_spawn_with_result_type() { + async { + let handle = spawn(async { Ok::(100) }); + + let result = handle.await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 100); + }; + } +} diff --git a/sqlx-core/src/rt/rt_wasip3/socket.rs b/sqlx-core/src/rt/rt_wasip3/socket.rs new file mode 100644 index 0000000000..13c386d00e --- /dev/null +++ b/sqlx-core/src/rt/rt_wasip3/socket.rs @@ -0,0 +1,93 @@ +use core::task::{Context, Poll}; + +use bytes::BufMut as _; +use std::io; +use tokio::sync::mpsc::error::TryRecvError; + +use crate::io::ReadBuf; +use crate::net::Socket; + +impl Socket for super::TcpSocket { + fn try_read(&mut self, buf: &mut dyn ReadBuf) -> io::Result { + let n = buf.remaining_mut(); + + // First, drain any buffered data + if !self.buf.is_empty() { + let to_copy = n.min(self.buf.len()); + buf.put_slice(&self.buf.split_to(to_copy)); + return Ok(to_copy); + } + + // Try to receive new data + match self.rx.try_recv() { + Ok(rx_vec) => { + if rx_vec.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + if rx_vec.len() <= n { + // All data fits in the buffer + buf.put_slice(&rx_vec); + Ok(rx_vec.len()) + } else { + // Data is larger than buffer, store remainder + buf.put_slice(&rx_vec[..n]); + self.buf.extend_from_slice(&rx_vec[n..]); + Ok(n) + } + } + Err(TryRecvError::Empty) => Err(io::ErrorKind::WouldBlock.into()), + Err(TryRecvError::Disconnected) => Ok(0), + } + } + + fn try_write(&mut self, buf: &[u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let n = buf.len(); + match self.tx.try_send(buf.to_vec()) { + Ok(()) => Ok(n), + Err(_) => Err(io::ErrorKind::WouldBlock.into()), + } + } + + fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If we have buffered data, we're ready to read + if !self.buf.is_empty() { + return Poll::Ready(Ok(())); + } + + match self.rx.poll_recv(cx) { + Poll::Ready(Some(v)) => { + if !v.is_empty() { + self.buf.extend(v); + Poll::Ready(Ok(())) + } else { + // Empty vec received, wait for more + Poll::Pending + } + } + Poll::Ready(None) => { + // Channel closed + Poll::Ready(Ok(())) + } + Poll::Pending => Poll::Pending, + } + } + + fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.tx.poll_reserve(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(())) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())), + Poll::Pending => Poll::Pending, + } + } + + fn poll_shutdown(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Drop the sender to signal shutdown + // The abort_handle will be dropped when TcpSocket is dropped + Poll::Ready(Ok(())) + } +} diff --git a/sqlx-core/src/rt/wasm_worker.rs b/sqlx-core/src/rt/wasm_worker.rs new file mode 100644 index 0000000000..9da6e5f9c2 --- /dev/null +++ b/sqlx-core/src/rt/wasm_worker.rs @@ -0,0 +1,447 @@ +//! WASM-only single-threaded worker helpers for operations that touch wit-bindgen / wasip3. +//! These functions execute on the current-thread LocalSet so that `!Send` futures from +//! wit-bindgen never cross threads. + +use futures::join; +use log::debug; +use wasip3::wit_bindgen::rt::async_support; +use wasip3::wit_bindgen::rt::async_support::futures::channel::oneshot; + +/// Dispatch a job to run on the wasip3/local (single-threaded) runtime and +/// return the result across a Send-capable oneshot receiver. The provided +/// closure `job` is executed inside the spawned wasip3 task and may contain +/// `!Send` futures (e.g. from wit-bindgen). The returned future (awaiting the +/// oneshot) is Send so callers that require Send can await it. +pub async fn dispatch(job: F) -> R +where + F: FnOnce() -> Fut + 'static, + Fut: core::future::Future + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel::(); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + debug!("wasm_worker: dispatch job at {}ms", now); + + async_support::spawn(async move { + // Yield to the wasip3 scheduler so any tasks spawned by `job()` get + // an opportunity to be polled quickly. + async_support::yield_async().await; + + let res = job().await; + let _ = tx.send(res); + }); + let out = rx.await.expect("wasip3 task canceled"); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + use std::sync::Arc; + use std::time::Duration; + + #[test] + fn test_dispatch_simple_value() { + async { + let result = dispatch(|| async { 42 }).await; + assert_eq!(result, 42); + }; + } + + #[test] + fn test_dispatch_string_value() { + async { + let result = dispatch(|| async { String::from("Hello from WASM worker!") }).await; + assert_eq!(result, "Hello from WASM worker!"); + }; + } + + #[test] + fn test_dispatch_computation() { + async { + let result = dispatch(|| async { + let mut sum = 0; + for i in 1..=100 { + sum += i; + } + sum + }) + .await; + assert_eq!(result, 5050); + }; + } + + #[test] + fn test_dispatch_with_sleep() { + async { + let start = std::time::Instant::now(); + + let result = dispatch(|| async { + crate::rt::sleep(Duration::from_millis(100)).await; + "completed" + }) + .await; + + let elapsed = start.elapsed(); + assert_eq!(result, "completed"); + assert!(elapsed >= Duration::from_millis(100)); + }; + } + + #[test] + fn test_dispatch_multiple_sequential() { + async { + let result1 = dispatch(|| async { 10 }).await; + let result2 = dispatch(|| async { 20 }).await; + let result3 = dispatch(|| async { 30 }).await; + + assert_eq!(result1 + result2 + result3, 60); + }; + } + + #[test] + fn test_dispatch_multiple_concurrent() { + async { + let fut1 = dispatch(|| async { 1 }); + let fut2 = dispatch(|| async { 2 }); + let fut3 = dispatch(|| async { 3 }); + + let (r1, r2, r3) = join!(fut1, fut2, fut3); + assert_eq!(r1 + r2 + r3, 6); + }; + } + + #[test] + fn test_dispatch_with_closure_capture() { + async { + let multiplier = 5; + + let result = dispatch(move || async move { multiplier * 10 }).await; + + assert_eq!(result, 50); + }; + } + + #[test] + fn test_dispatch_with_option() { + async { + let result = dispatch(|| async { Some(42) }).await; + + assert_eq!(result, Some(42)); + }; + } + + #[test] + fn test_dispatch_with_result_ok() { + async { + let result = dispatch(|| async { Ok::(100) }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 100); + }; + } + + #[test] + fn test_dispatch_with_result_err() { + async { + let result = + dispatch(|| async { Err::("error occurred".to_string()) }).await; + + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "error occurred"); + }; + } + + #[test] + fn test_dispatch_with_shared_state() { + async { + let counter = Arc::new(AtomicU32::new(0)); + let counter_clone = counter.clone(); + + let result = dispatch(move || async move { + counter_clone.fetch_add(10, Ordering::SeqCst); + counter_clone.fetch_add(20, Ordering::SeqCst); + counter_clone.load(Ordering::SeqCst) + }) + .await; + + assert_eq!(result, 30); + assert_eq!(counter.load(Ordering::SeqCst), 30); + }; + } + + #[test] + fn test_dispatch_multiple_with_shared_state() { + async { + let counter = Arc::new(AtomicU32::new(0)); + + let mut handles = vec![]; + for i in 1..=5 { + let counter_clone = counter.clone(); + let handle = tokio::spawn(async move { + dispatch(move || async move { counter_clone.fetch_add(i, Ordering::SeqCst) }) + .await + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + // Sum of 1+2+3+4+5 = 15 + assert_eq!(counter.load(Ordering::SeqCst), 15); + }; + } + + #[test] + fn test_dispatch_with_boolean_flag() { + async { + let flag = Arc::new(AtomicBool::new(false)); + let flag_clone = flag.clone(); + + dispatch(move || async move { + flag_clone.store(true, Ordering::SeqCst); + }) + .await; + + assert!(flag.load(Ordering::SeqCst)); + }; + } + + #[test] + fn test_dispatch_nested_async_operations() { + async { + let result = dispatch(|| async { + let inner_result = dispatch(|| async { 10 }).await; + inner_result * 2 + }) + .await; + + assert_eq!(result, 20); + }; + } + + #[test] + fn test_dispatch_with_vec_result() { + async { + let result = dispatch(|| async { vec![1, 2, 3, 4, 5] }).await; + + assert_eq!(result.len(), 5); + assert_eq!(result, vec![1, 2, 3, 4, 5]); + }; + } + + #[test] + fn test_dispatch_with_tuple_result() { + async { + let result = dispatch(|| async { (42, "hello", true) }).await; + + assert_eq!(result, (42, "hello", true)); + }; + } + + #[test] + fn test_dispatch_with_struct_result() { + #[derive(Debug, PartialEq)] + struct TestData { + id: u32, + name: String, + } + + async { + let result = dispatch(|| async { + TestData { + id: 1, + name: String::from("test"), + } + }) + .await; + + assert_eq!(result.id, 1); + assert_eq!(result.name, "test"); + }; + } + + #[test] + fn test_dispatch_long_running_job() { + async { + let start = std::time::Instant::now(); + + let result = dispatch(|| async { + // Simulate some work + for _ in 0..5 { + crate::rt::sleep(Duration::from_millis(20)).await; + } + "long job completed" + }) + .await; + + let elapsed = start.elapsed(); + assert_eq!(result, "long job completed"); + assert!(elapsed >= Duration::from_millis(100)); + }; + } + + #[test] + fn test_dispatch_concurrent_long_jobs() { + async { + let start = std::time::Instant::now(); + + let job1 = dispatch(|| async { + crate::rt::sleep(Duration::from_millis(100)).await; + 1 + }); + + let job2 = dispatch(|| async { + crate::rt::sleep(Duration::from_millis(100)).await; + 2 + }); + + let job3 = dispatch(|| async { + crate::rt::sleep(Duration::from_millis(100)).await; + 3 + }); + + let (r1, r2, r3) = join!(job1, job2, job3); + let elapsed = start.elapsed(); + + assert_eq!(r1 + r2 + r3, 6); + // Should complete concurrently in ~100ms, not 300ms + assert!(elapsed < Duration::from_millis(200)); + }; + } + + #[test] + fn test_dispatch_with_conditional_logic() { + async { + let input = 5; + + let result = dispatch(move || async move { + if input > 3 { + "greater" + } else { + "lesser" + } + }) + .await; + + assert_eq!(result, "greater"); + }; + } + + #[test] + fn test_dispatch_with_match_expression() { + async { + let value = 2; + + let result = dispatch(move || async move { + match value { + 1 => "one", + 2 => "two", + 3 => "three", + _ => "other", + } + }) + .await; + + assert_eq!(result, "two"); + }; + } + + #[test] + fn test_dispatch_returns_unit() { + async { + let counter = Arc::new(AtomicU32::new(0)); + let counter_clone = counter.clone(); + + dispatch(move || async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + // Implicitly returns () + }) + .await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); + }; + } + + #[test] + fn test_dispatch_with_timeout() { + async { + let result = crate::rt::timeout( + Duration::from_millis(500), + dispatch(|| async { + crate::rt::sleep(Duration::from_millis(100)).await; + 42 + }), + ) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + }; + } + + #[test] + fn test_dispatch_with_spawn() { + async { + let result = dispatch(|| async { + let handle = crate::rt::spawn(async { 10 }); + let value = handle.await; + value * 2 + }) + .await; + + assert_eq!(result, 20); + }; + } + + #[test] + fn test_dispatch_error_propagation() { + async { + let result: Result = dispatch(|| async { + // Simulate an operation that might fail + if true { + Err("operation failed") + } else { + Ok(42) + } + }) + .await; + + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "operation failed"); + }; + } + + #[test] + fn test_dispatch_chain_operations() { + async { + let step1 = dispatch(|| async { 5 }).await; + let step2 = dispatch(move || async move { step1 * 2 }).await; + let step3 = dispatch(move || async move { step2 + 10 }).await; + + assert_eq!(step3, 20); + }; + } + + #[test] + fn test_dispatch_with_large_data() { + async { + let result = dispatch(|| async { + // Create a relatively large vector + (0..1000).collect::>() + }) + .await; + + assert_eq!(result.len(), 1000); + assert_eq!(result[0], 0); + assert_eq!(result[999], 999); + }; + } +} diff --git a/sqlx-core/src/testing/mod.rs b/sqlx-core/src/testing/mod.rs index 17022b4652..9a4db13525 100644 --- a/sqlx-core/src/testing/mod.rs +++ b/sqlx-core/src/testing/mod.rs @@ -89,7 +89,7 @@ where DB: TestSupport + Database, DB::Connection: Migrate, for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, - Fut: Future, + Fut: Future + 'static, Fut::Output: TestTermination, { type Output = Fut::Output; @@ -104,13 +104,17 @@ where DB: TestSupport + Database, DB::Connection: Migrate, for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, - Fut: Future, + Fut: Future + 'static, Fut::Output: TestTermination, + ::Output: 'static, { type Output = Fut::Output; - fn run_test(self, args: TestArgs) -> Self::Output { - run_test_with_pool(args, |pool| async move { + fn run_test(self, args: TestArgs) -> Self::Output + where + ::Output: 'static, + { + run_test_with_pool(args, move |pool| async move { let conn = pool .acquire() .await @@ -127,19 +131,22 @@ where DB: Database + TestSupport, DB::Connection: Migrate, for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, - Fut: Future, + Fut: Future + 'static, Fut::Output: TestTermination, { type Output = Fut::Output; - fn run_test(self, args: TestArgs) -> Self::Output { + fn run_test(self, args: TestArgs) -> Self::Output + where + ::Output: 'static, + { run_test(args, self) } } impl TestFn for fn() -> Fut where - Fut: Future, + Fut: Future + 'static, { type Output = Fut::Output; @@ -187,12 +194,13 @@ where DB: TestSupport, DB::Connection: Migrate, for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, - F: FnOnce(Pool) -> Fut, + F: FnOnce(Pool) -> Fut + 'static, Fut: Future, Fut::Output: TestTermination, + ::Output: 'static, { let test_path = args.test_path; - run_test::(args, |pool_opts, connect_opts| async move { + run_test::(args, move |pool_opts, connect_opts| async move { let pool = pool_opts .connect_with(connect_opts) .await @@ -217,9 +225,10 @@ where DB: TestSupport, DB::Connection: Migrate, for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>, - F: FnOnce(PoolOptions, ::Options) -> Fut, + F: FnOnce(PoolOptions, ::Options) -> Fut + 'static, Fut: Future, Fut::Output: TestTermination, + ::Output: 'static, { crate::rt::test_block_on(async move { let test_context = DB::test_context(&args) diff --git a/sqlx-core/src/wasm/fs.rs b/sqlx-core/src/wasm/fs.rs new file mode 100644 index 0000000000..e06b19a325 --- /dev/null +++ b/sqlx-core/src/wasm/fs.rs @@ -0,0 +1,47 @@ +use std::ffi::OsString; +use std::fs::Metadata; +use std::io; +use std::path::{Path, PathBuf}; +// Stubs +pub struct ReadDir { + inner: Option, +} +// Stubs +pub struct DirEntry { + pub path: PathBuf, + pub file_name: OsString, + pub metadata: Metadata, +} + +// WASM32 stub implementations for async fs functions +pub async fn read>(_path: P) -> io::Result> { + todo!("fs::read is not implemented for wasm32") +} + +pub async fn read_to_string>(_path: P) -> io::Result { + todo!("fs::read_to_string is not implemented for wasm32") +} + +pub async fn create_dir_all>(_path: P) -> io::Result<()> { + todo!("fs::create_dir_all is not implemented for wasm32") +} + +pub async fn remove_file>(_path: P) -> io::Result<()> { + todo!("fs::remove_file is not implemented for wasm32") +} + +pub async fn remove_dir>(_path: P) -> io::Result<()> { + todo!("fs::remove_dir is not implemented for wasm32") +} + +pub async fn remove_dir_all>(_path: P) -> io::Result<()> { + todo!("fs::remove_dir_all is not implemented for wasm32") +} + +pub async fn read_dir(_path: PathBuf) -> io::Result { + todo!("fs::read_dir is not implemented for wasm32") +} + +pub async fn next(_read_dir: &mut ReadDir) -> io::Result> { + todo!("fs::ReadDir::next is not implemented for wasm32") +} diff --git a/sqlx-core/src/wasm/mod.rs b/sqlx-core/src/wasm/mod.rs new file mode 100644 index 0000000000..d521fbd77e --- /dev/null +++ b/sqlx-core/src/wasm/mod.rs @@ -0,0 +1 @@ +pub mod fs; diff --git a/sqlx-macros-core/src/lib.rs b/sqlx-macros-core/src/lib.rs index 9d4204f814..9ce07fdd4a 100644 --- a/sqlx-macros-core/src/lib.rs +++ b/sqlx-macros-core/src/lib.rs @@ -64,7 +64,7 @@ where async_std::task::block_on(f) } else if #[cfg(feature = "_rt-smol")] { sqlx_core::rt::test_block_on(f) - } else if #[cfg(feature = "_rt-tokio")] { + } else if #[cfg(any(feature = "_rt-tokio",target_arch="wasm32"))] { use std::sync::LazyLock; use tokio::runtime::{self, Runtime}; @@ -80,6 +80,7 @@ where TOKIO_RT.block_on(f) } else { + #[cfg(not(any(feature = "_rt-async-std", feature = "tokio", target_arch = "wasm32")))] sqlx_core::rt::missing_rt(f) } } diff --git a/sqlx-macros-core/src/migrate.rs b/sqlx-macros-core/src/migrate.rs index b855703c22..bcdc9abd3f 100644 --- a/sqlx-macros-core/src/migrate.rs +++ b/sqlx-macros-core/src/migrate.rs @@ -113,7 +113,18 @@ pub fn expand_with_path(config: &Config, path: &Path) -> crate::Result Result { @@ -54,6 +54,7 @@ impl<'a> DoHandshake<'a> { async fn do_handshake(self, socket: S) -> Result { let DoHandshake { options } = self; + debug!("mysql: do_handshake: starting handshake"); let mut stream = MySqlStream::with_socket(options, socket); @@ -61,6 +62,11 @@ impl<'a> DoHandshake<'a> { // https://mariadb.com/kb/en/connection/ let handshake: Handshake = stream.recv_packet().await?.decode()?; + debug!( + "mysql: handshake received: server_version='{}', capabilities={:?}, auth_plugin={:?}", + handshake.server_version, handshake.server_capabilities, handshake.auth_plugin + ); + debug!("mysql: handshake packet received, beginning auth"); let mut plugin = handshake.auth_plugin; let nonce = handshake.auth_plugin_data; @@ -98,12 +104,21 @@ impl<'a> DoHandshake<'a> { stream.capabilities |= Capabilities::PROTOCOL_41; let mut stream = tls::maybe_upgrade(stream, self.options).await?; + debug!("mysql: TLS maybe_upgrade complete"); let auth_response = if let (Some(plugin), Some(password)) = (plugin, &options.password) { Some(plugin.scramble(&mut stream, password, &nonce).await?) } else { None }; + let payload = HandshakeResponse { + charset: super::INITIAL_CHARSET, + max_packet_size: MAX_PACKET_SIZE, + username: &options.username, + database: options.database.as_deref(), + auth_plugin: plugin, + auth_response: auth_response.as_deref(), + }; stream.write_packet(HandshakeResponse { charset: super::INITIAL_CHARSET, @@ -116,12 +131,20 @@ impl<'a> DoHandshake<'a> { stream.flush().await?; + debug!( + "mysql: do_handshake: wrote handshake response {:?} and flushed", + payload + ); + debug!("mysql: waiting for final OK/auth packets"); + // This is the blockade loop { let packet = stream.recv_packet().await?; match packet[0] { 0x00 => { let _ok = packet.ok()?; + debug!("mysql: do_handshake: received final OK during auth loop"); + debug!("mysql: handshake/auth complete, breaking loop"); break; } @@ -149,10 +172,12 @@ impl<'a> DoHandshake<'a> { if let (Some(plugin), Some(password)) = (plugin, &options.password) { if plugin.handle(&mut stream, packet, password, &nonce).await? { // plugin signaled authentication is ok + debug!("mysql: plugin signaled authentication OK"); break; } // plugin signaled to continue authentication + debug!("mysql: plugin signaled to continue authentication"); } else { return Err(err_protocol!( "unexpected packet 0x{:02x} during authentication", @@ -163,6 +188,8 @@ impl<'a> DoHandshake<'a> { } } + debug!("mysql: do_handshake: handshake complete, returning stream"); + debug!("mysql: connection should be ready for use"); Ok(stream) } } diff --git a/sqlx-mysql/src/connection/stream.rs b/sqlx-mysql/src/connection/stream.rs index ff931b2f46..c43e445f47 100644 --- a/sqlx-mysql/src/connection/stream.rs +++ b/sqlx-mysql/src/connection/stream.rs @@ -1,8 +1,6 @@ use std::collections::VecDeque; use std::ops::{Deref, DerefMut}; -use bytes::{Buf, Bytes, BytesMut}; - use crate::error::Error; use crate::io::MySqlBufExt; use crate::io::{ProtocolDecode, ProtocolEncode}; @@ -10,6 +8,8 @@ use crate::net::{BufferedSocket, Socket}; use crate::protocol::response::{EofPacket, ErrPacket, OkPacket, Status}; use crate::protocol::{Capabilities, Packet}; use crate::{MySqlConnectOptions, MySqlDatabaseError}; +use bytes::{Buf, Bytes, BytesMut}; +use log::debug; pub struct MySqlStream> { // Wrapping the socket in `Box` allows us to unsize in-place. @@ -103,7 +103,15 @@ impl MySqlStream { T: ProtocolEncode<'en, Capabilities>, { self.sequence_id = 0; + debug!( + "mysql: send_packet - writing packet (sequence_id={})", + self.sequence_id + ); self.write_packet(payload)?; + debug!( + "mysql: send_packet - flushing write buffer (is_empty={})", + self.socket.write_buffer().is_empty() + ); self.flush().await?; Ok(()) } @@ -112,15 +120,40 @@ impl MySqlStream { where T: ProtocolEncode<'en, Capabilities>, { - self.socket - .write_with(Packet(payload), (self.capabilities, &mut self.sequence_id)) + debug!( + "mysql: write_packet - encoding packet (sequence_id={})", + self.sequence_id + ); + let res = self + .socket + .write_with(Packet(payload), (self.capabilities, &mut self.sequence_id)); + debug!( + "mysql: write_packet - encoded packet, result={:?}", + res.is_ok() + ); + res } async fn recv_packet_part(&mut self) -> Result { // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_basic_packets.html // https://mariadb.com/kb/en/library/0-packet/#standard-packet - let mut header: Bytes = self.socket.read(4).await?; + // Read the 4-byte packet header (3 bytes length + 1 byte sequence id). + // Add logging to help diagnose wasm/wasip3 socket read errors. + let mut header: Bytes = match self.socket.read::(4).await { + Ok(h) => { + debug!( + "mysql: recv_packet_part: read header ({} bytes): {:?}", + h.len(), + &h + ); + h + } + Err(e) => { + debug!("mysql: recv_packet_part: error reading header: {:#?}", e); + return Err(e); + } + }; // cannot overflow #[allow(clippy::cast_possible_truncation)] @@ -129,7 +162,20 @@ impl MySqlStream { self.sequence_id = sequence_id.wrapping_add(1); - let payload: Bytes = self.socket.read(packet_size).await?; + // Read the payload according to the size from the header. Log errors. + let payload: Bytes = match self.socket.read::(packet_size).await { + Ok(p) => { + debug!("mysql: recv_packet_part: read payload ({} bytes)", p.len()); + p + } + Err(e) => { + debug!( + "mysql: recv_packet_part: error reading payload (expected {} bytes): {:#?}", + packet_size, e + ); + return Err(e); + } + }; // TODO: packet compression diff --git a/sqlx-mysql/src/migrate.rs b/sqlx-mysql/src/migrate.rs index 0176f93c26..9721bbb8fb 100644 --- a/sqlx-mysql/src/migrate.rs +++ b/sqlx-mysql/src/migrate.rs @@ -34,7 +34,7 @@ fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Err impl MigrateDatabase for MySql { async fn create_database(url: &str) -> Result<(), Error> { let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + let mut conn: MySqlConnection = options.connect().await?; let _ = conn .execute(AssertSqlSafe(format!("CREATE DATABASE `{database}`"))) @@ -59,7 +59,7 @@ impl MigrateDatabase for MySql { async fn drop_database(url: &str) -> Result<(), Error> { let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + let mut conn: MySqlConnection = options.connect().await?; let _ = conn .execute(AssertSqlSafe(format!( diff --git a/sqlx-mysql/src/options/connect.rs b/sqlx-mysql/src/options/connect.rs index f3b0492781..e089ced8fc 100644 --- a/sqlx-mysql/src/options/connect.rs +++ b/sqlx-mysql/src/options/connect.rs @@ -2,10 +2,11 @@ use crate::connection::ConnectOptions; use crate::error::Error; use crate::executor::Executor; use crate::{MySqlConnectOptions, MySqlConnection}; -use log::LevelFilter; +use log::{debug, LevelFilter}; use sqlx_core::sql_str::AssertSqlSafe; use sqlx_core::Url; use std::time::Duration; +// wasm-specific runtime helpers are available via `sqlx_core::rt::wasm_worker`. impl ConnectOptions for MySqlConnectOptions { type Connection = MySqlConnection; @@ -20,75 +21,123 @@ impl ConnectOptions for MySqlConnectOptions { async fn connect(&self) -> Result where - Self::Connection: Sized, + Self::Connection: Sized + Send + 'static, { - let mut conn = MySqlConnection::establish(self).await?; - - // After the connection is established, we initialize by configuring a few - // connection parameters - - // https://mariadb.com/kb/en/sql-mode/ - - // PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator. - // This means that "A" || "B" can be used in place of CONCAT("A", "B"). - - // NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is - // not available, a warning is given and the default storage - // engine is used instead. - - // NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust. - - // NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust. - - // -- - - // Setting the time zone allows us to assume that the output - // from a TIMESTAMP field is UTC - - // -- - - // https://mathiasbynens.be/notes/mysql-utf8mb4 - - let mut sql_mode = Vec::new(); - if self.pipes_as_concat { - sql_mode.push(r#"PIPES_AS_CONCAT"#); - } - if self.no_engine_substitution { - sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#); + // On wasm, the MySQL connection future may contain non-Send internals from + // wasip3/wit-bindgen. Run the connection/initialization on the wasip3 async + // runtime using `async_support::spawn` and communicate the result back over + // a tokio oneshot channel. The returned future (awaiting the oneshot) is + // Send, so callers that require Send are satisfied. + let options = self.clone(); + + // On wasm we must dispatch to the single-threaded wasip3 runtime so + // that any `!Send` futures from wit-bindgen do not escape the local + // runtime. On non-wasm targets we can just run the logic directly. + #[cfg(target_arch = "wasm32")] + { + debug!("mysql: connect.rs: starting connection dispatch (unlocked experiment)"); + let conn_res: Result = + sqlx_core::rt::wasm_worker::dispatch(move || async move { + debug!("mysql: connect.rs: inside wasm_worker dispatch closure"); + let mut conn = MySqlConnection::establish(&options).await?; + debug!("mysql: connect.rs: connection established"); + + let mut sql_mode = Vec::new(); + if options.pipes_as_concat { + sql_mode.push(r#"PIPES_AS_CONCAT"#); + } + if options.no_engine_substitution { + sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#); + } + + let mut opts = Vec::new(); + if !sql_mode.is_empty() { + opts.push(format!( + r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#, + sql_mode.join(",") + )); + } + + if let Some(timezone) = &options.timezone { + opts.push(format!(r#"time_zone='{}'"#, timezone)); + } + + if options.set_names { + let set_names = if let Some(collation) = &options.collation { + format!(r#"NAMES {} COLLATE {collation}"#, options.charset,) + } else { + format!("NAMES {}", options.charset) + }; + opts.push(set_names); + } + + if !opts.is_empty() { + debug!( + "mysql: connect.rs: running SET statements: {}", + opts.join(", ") + ); + conn.execute(AssertSqlSafe(format!(r#"SET {};"#, opts.join(",")))) + .await?; + debug!("mysql: connect.rs: SET statements complete"); + } + + debug!("mysql: connect.rs: returning connection from dispatch closure"); + Ok(conn) + }) + .await; + debug!("mysql: connect.rs: connection dispatch complete"); + + conn_res } - let mut options = Vec::new(); - if !sql_mode.is_empty() { - options.push(format!( - r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#, - sql_mode.join(",") - )); + #[cfg(not(target_arch = "wasm32"))] + { + debug!("mysql: connect.rs: starting native connection"); + let mut conn = MySqlConnection::establish(&options).await?; + debug!("mysql: connect.rs: connection established"); + + let mut sql_mode = Vec::new(); + if options.pipes_as_concat { + sql_mode.push(r#"PIPES_AS_CONCAT"#); + } + if options.no_engine_substitution { + sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#); + } + + let mut opts = Vec::new(); + if !sql_mode.is_empty() { + opts.push(format!( + r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#, + sql_mode.join(",") + )); + } + + if let Some(timezone) = &options.timezone { + opts.push(format!(r#"time_zone='{}'"#, timezone)); + } + + if options.set_names { + let set_names = if let Some(collation) = &options.collation { + format!(r#"NAMES {} COLLATE {collation}"#, options.charset,) + } else { + format!("NAMES {}", options.charset) + }; + opts.push(set_names); + } + + if !opts.is_empty() { + debug!( + "mysql: connect.rs: running SET statements: {}", + opts.join(", ") + ); + conn.execute(AssertSqlSafe(format!(r#"SET {};"#, opts.join(",")))) + .await?; + debug!("mysql: connect.rs: SET statements complete"); + } + + debug!("mysql: connect.rs: returning connection from native connect"); + Ok(conn) } - - if let Some(timezone) = &self.timezone { - options.push(format!(r#"time_zone='{}'"#, timezone)); - } - - if self.set_names { - // As it turns out, we don't _have_ to set a collation if we don't want to. - // We can let the server choose the default collation for the charset. - let set_names = if let Some(collation) = &self.collation { - format!(r#"NAMES {} COLLATE {collation}"#, self.charset,) - } else { - // Leaves the default collation up to the server, - // but ensures statements and results are encoded using the proper charset. - format!("NAMES {}", self.charset) - }; - - options.push(set_names); - } - - if !options.is_empty() { - conn.execute(AssertSqlSafe(format!(r#"SET {};"#, options.join(",")))) - .await?; - } - - Ok(conn) } fn log_statements(mut self, level: LevelFilter) -> Self { diff --git a/sqlx-postgres/Cargo.toml b/sqlx-postgres/Cargo.toml index a70fb37d72..4c6d3e6a1b 100644 --- a/sqlx-postgres/Cargo.toml +++ b/sqlx-postgres/Cargo.toml @@ -67,7 +67,7 @@ smallvec = { version = "1.7.0", features = ["serde"] } stringprep = "0.1.2" thiserror = "2.0.0" tracing = { version = "0.1.37", features = ["log"] } -whoami = { version = "1.2.1", default-features = false } +whoami = { version = "2.0.0-pre", default-features = false } serde = { version = "1.0.144", features = ["derive"] } serde_json = { version = "1.0.85", features = ["raw_value"] } diff --git a/sqlx-postgres/src/options/mod.rs b/sqlx-postgres/src/options/mod.rs index efbc43989b..25809f5432 100644 --- a/sqlx-postgres/src/options/mod.rs +++ b/sqlx-postgres/src/options/mod.rs @@ -64,7 +64,9 @@ impl PgConnectOptions { .or_else(|| var("PGHOST").ok()) .unwrap_or_else(|| default_host(port)); - let username = var("PGUSER").ok().unwrap_or_else(whoami::username); + let username = var("PGUSER") + .ok() + .unwrap_or_else(|| whoami::username().unwrap_or_else(|_| "postgres".to_string())); let database = var("PGDATABASE").ok(); diff --git a/tests/mysql/wasi_integration_test.rs b/tests/mysql/wasi_integration_test.rs new file mode 100644 index 0000000000..3aef6ed7b4 --- /dev/null +++ b/tests/mysql/wasi_integration_test.rs @@ -0,0 +1,84 @@ +use std::env; +use std::path::PathBuf; +use std::process::Command; + +fn build_wasm_component(component_name: &str) -> PathBuf { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let component_dir = manifest_dir + .join("tests/mysql/wasm-components") + .join(component_name); + + println!("Building component: {}", component_name); + + let output = Command::new("cargo") + .current_dir(&component_dir) + .args(&["build", "--target", "wasm32-wasip2", "--release"]) + .output() + .expect("Failed to build WASM component"); + + if !output.status.success() { + panic!( + "Failed to build {}: {}", + component_name, + String::from_utf8_lossy(&output.stderr) + ); + } + + // WASM binaries are stored in the workspace root target directory + manifest_dir + .join("target/wasm32-wasip2/release") + .join(format!("{}.wasm", component_name.replace("-", "_"))) +} + +fn run_wasm_test(wasm_path: PathBuf, test_name: &str) -> Result<(), Box> { + println!("Running test: {}", test_name); + + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let status = Command::new("wasmtime") + .args(&[ + "run", + "-Scli=y", + "-Stcp=y", + "-Sinherit-env=y", + "-Sudp=y", + "-Sp3", + "-Sallow-ip-name-lookup=y", + "-Wcomponent-model-async=y", + "-Sinherit-network=y", + ]) + .env("DATABASE_URL", database_url) + .arg(wasm_path.as_os_str()) + .status()?; + + if !status.success() { + return Err(format!("{} failed", test_name).into()); + } + + println!("✓ {} passed!", test_name); + Ok(()) +} + +#[test] +fn test_wasi_mysql_connect() { + let wasm = build_wasm_component("connect-test"); + run_wasm_test(wasm, "Connect Test").expect("Connect test failed"); +} + +#[test] +fn test_wasi_mysql_execute_query() { + let wasm = build_wasm_component("execute-query-test"); + run_wasm_test(wasm, "Execute Query Test").expect("Execute query test failed"); +} + +#[test] +fn test_wasi_mysql_prepared_query() { + let wasm = build_wasm_component("prepared-query-test"); + run_wasm_test(wasm, "Prepared Query Test").expect("Prepared query test failed"); +} + +#[test] +fn test_wasi_mysql_pool_crud() { + let wasm = build_wasm_component("pool-crud-test"); + run_wasm_test(wasm, "Pool CRUD Test").expect("Pool CRUD test failed"); +} diff --git a/tests/mysql/wasm-components/connect-test/Cargo.toml b/tests/mysql/wasm-components/connect-test/Cargo.toml new file mode 100644 index 0000000000..ae54a8bfd1 --- /dev/null +++ b/tests/mysql/wasm-components/connect-test/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "connect-test" +version = "0.1.0" +edition = "2021" +workspace = "../../../../" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0" +futures = "0.3" +sqlx = { path = "../../../../", features = [ "mysql", "runtime-tokio" ] } +clap = { version = "4", features = ["derive"] } +tokio = { version = "1.20.0", features = ["rt"]} +dotenvy = "0.15.0" +wasip3 = "0.2.0+wasi-0.3.0-rc-2025-09-16" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +sqlx = { path = "../../../../", features = ["tls-native-tls"] } \ No newline at end of file diff --git a/tests/mysql/wasm-components/connect-test/README.md b/tests/mysql/wasm-components/connect-test/README.md new file mode 100644 index 0000000000..6f2f8d6c66 --- /dev/null +++ b/tests/mysql/wasm-components/connect-test/README.md @@ -0,0 +1,41 @@ +# TODOs Example + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="mysql://root:password@localhost/todos" + ``` + +2. Create the database. + + ``` + $ sqlx db create + ``` + +3. Run sql migrations + + ``` + $ sqlx migrate run + ``` + +## Usage + +Add a todo + +``` +cargo run -- add "todo description" +``` + +Complete a todo. + +``` +cargo run -- done +``` + +List all todos + +``` +cargo run +``` diff --git a/tests/mysql/wasm-components/connect-test/migrations/20200718111257_todos.sql b/tests/mysql/wasm-components/connect-test/migrations/20200718111257_todos.sql new file mode 100644 index 0000000000..700d99900f --- /dev/null +++ b/tests/mysql/wasm-components/connect-test/migrations/20200718111257_todos.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS todos +( + id BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT, + description TEXT NOT NULL, + done BOOLEAN NOT NULL DEFAULT FALSE +); diff --git a/tests/mysql/wasm-components/connect-test/src/lib.rs b/tests/mysql/wasm-components/connect-test/src/lib.rs new file mode 100644 index 0000000000..0c09dd63a5 --- /dev/null +++ b/tests/mysql/wasm-components/connect-test/src/lib.rs @@ -0,0 +1,31 @@ +use sqlx::mysql::MySqlConnection; +use sqlx::Connection; +use std::env; + +async fn run() -> anyhow::Result<()> { + let database_url = env::var("DATABASE_URL")?; + let mut conn = MySqlConnection::connect(&database_url).await?; + conn.ping().await?; + conn.close().await?; + eprintln!("Connect test passed!"); + Ok(()) +} + +wasip3::cli::command::export!(Component); + +struct Component; + +impl wasip3::exports::cli::run::Guest for Component { + async fn run() -> Result<(), ()> { + tokio::task::LocalSet::new() + .run_until(async { + if let Err(err) = run().await { + eprintln!("Connect test failed: {err:#}"); + Err(()) + } else { + Ok(()) + } + }) + .await + } +} diff --git a/tests/mysql/wasm-components/execute-query-test/Cargo.toml b/tests/mysql/wasm-components/execute-query-test/Cargo.toml new file mode 100644 index 0000000000..76555fce1d --- /dev/null +++ b/tests/mysql/wasm-components/execute-query-test/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "execute-query-test" +version = "0.1.0" +edition = "2021" +workspace = "../../../.." + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0" +futures = "0.3" +sqlx = { path = "../../../../", features = [ "mysql", "runtime-tokio" ] } +clap = { version = "4", features = ["derive"] } +tokio = { version = "1.20.0", features = ["rt"]} +dotenvy = "0.15.0" +wasip3 = "0.2.0+wasi-0.3.0-rc-2025-09-16" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +sqlx = { path = "../../../../", features = ["tls-native-tls"] } \ No newline at end of file diff --git a/tests/mysql/wasm-components/execute-query-test/README.md b/tests/mysql/wasm-components/execute-query-test/README.md new file mode 100644 index 0000000000..6f2f8d6c66 --- /dev/null +++ b/tests/mysql/wasm-components/execute-query-test/README.md @@ -0,0 +1,41 @@ +# TODOs Example + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="mysql://root:password@localhost/todos" + ``` + +2. Create the database. + + ``` + $ sqlx db create + ``` + +3. Run sql migrations + + ``` + $ sqlx migrate run + ``` + +## Usage + +Add a todo + +``` +cargo run -- add "todo description" +``` + +Complete a todo. + +``` +cargo run -- done +``` + +List all todos + +``` +cargo run +``` diff --git a/tests/mysql/wasm-components/execute-query-test/migrations/20200718111257_todos.sql b/tests/mysql/wasm-components/execute-query-test/migrations/20200718111257_todos.sql new file mode 100644 index 0000000000..700d99900f --- /dev/null +++ b/tests/mysql/wasm-components/execute-query-test/migrations/20200718111257_todos.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS todos +( + id BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT, + description TEXT NOT NULL, + done BOOLEAN NOT NULL DEFAULT FALSE +); diff --git a/tests/mysql/wasm-components/execute-query-test/src/lib.rs b/tests/mysql/wasm-components/execute-query-test/src/lib.rs new file mode 100644 index 0000000000..a0a94730af --- /dev/null +++ b/tests/mysql/wasm-components/execute-query-test/src/lib.rs @@ -0,0 +1,35 @@ +use sqlx::mysql::MySqlConnection; +use sqlx::{Connection, Executor}; +use std::env; + +async fn run() -> anyhow::Result<()> { + let database_url = env::var("DATABASE_URL")?; + let mut conn = MySqlConnection::connect(&database_url).await?; + + let result = conn.execute("DO 1").await?; + // DO statement affects 0 rows but executes successfully + assert_eq!(result.rows_affected(), 0); + + conn.close().await?; + eprintln!("Execute query test passed!"); + Ok(()) +} + +wasip3::cli::command::export!(Component); + +struct Component; + +impl wasip3::exports::cli::run::Guest for Component { + async fn run() -> Result<(), ()> { + tokio::task::LocalSet::new() + .run_until(async { + if let Err(err) = run().await { + eprintln!("Execute query test failed: {err:#}"); + Err(()) + } else { + Ok(()) + } + }) + .await + } +} diff --git a/tests/mysql/wasm-components/pool-crud-test/Cargo.toml b/tests/mysql/wasm-components/pool-crud-test/Cargo.toml new file mode 100644 index 0000000000..94d1cac85f --- /dev/null +++ b/tests/mysql/wasm-components/pool-crud-test/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "pool-crud-test" +version = "0.1.0" +edition = "2021" +workspace = "../../../../" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0" +futures = "0.3" +sqlx = { path = "../../../../", features = [ "mysql", "runtime-tokio" ] } +clap = { version = "4", features = ["derive"] } +tokio = { version = "1.20.0", features = ["rt"]} +dotenvy = "0.15.0" +wasip3 = "0.2.0+wasi-0.3.0-rc-2025-09-16" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +sqlx = { path = "../../../../", features = ["tls-native-tls"] } \ No newline at end of file diff --git a/tests/mysql/wasm-components/pool-crud-test/README.md b/tests/mysql/wasm-components/pool-crud-test/README.md new file mode 100644 index 0000000000..6f2f8d6c66 --- /dev/null +++ b/tests/mysql/wasm-components/pool-crud-test/README.md @@ -0,0 +1,41 @@ +# TODOs Example + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="mysql://root:password@localhost/todos" + ``` + +2. Create the database. + + ``` + $ sqlx db create + ``` + +3. Run sql migrations + + ``` + $ sqlx migrate run + ``` + +## Usage + +Add a todo + +``` +cargo run -- add "todo description" +``` + +Complete a todo. + +``` +cargo run -- done +``` + +List all todos + +``` +cargo run +``` diff --git a/tests/mysql/wasm-components/pool-crud-test/migrations/20200718111257_todos.sql b/tests/mysql/wasm-components/pool-crud-test/migrations/20200718111257_todos.sql new file mode 100644 index 0000000000..700d99900f --- /dev/null +++ b/tests/mysql/wasm-components/pool-crud-test/migrations/20200718111257_todos.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS todos +( + id BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT, + description TEXT NOT NULL, + done BOOLEAN NOT NULL DEFAULT FALSE +); diff --git a/tests/mysql/wasm-components/pool-crud-test/src/lib.rs b/tests/mysql/wasm-components/pool-crud-test/src/lib.rs new file mode 100644 index 0000000000..9b9022a12d --- /dev/null +++ b/tests/mysql/wasm-components/pool-crud-test/src/lib.rs @@ -0,0 +1,74 @@ +use sqlx::mysql::MySqlPoolOptions; +use sqlx::{Executor, Row}; +use std::env; + +async fn run() -> anyhow::Result<()> { + let database_url = env::var("DATABASE_URL")?; + let pool = MySqlPoolOptions::new() + .max_connections(2) + .connect(&database_url) + .await?; + + // Create table + pool.execute( + r#" + CREATE TABLE IF NOT EXISTS wasi_todos ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + description TEXT NOT NULL, + done BOOL NOT NULL DEFAULT FALSE + ) + "#, + ) + .await?; + + // Insert + let insert_result = sqlx::query("INSERT INTO wasi_todos (description) VALUES (?)") + .bind("Test todo") + .execute(&pool) + .await?; + assert!(insert_result.last_insert_id() > 0); + + // Select + let row = sqlx::query("SELECT id, description, done FROM wasi_todos WHERE id = ?") + .bind(insert_result.last_insert_id()) + .fetch_one(&pool) + .await?; + let description: &str = row.try_get("description")?; + assert_eq!(description, "Test todo"); + + // Update + let update_result = sqlx::query("UPDATE wasi_todos SET done = TRUE WHERE id = ?") + .bind(insert_result.last_insert_id()) + .execute(&pool) + .await?; + assert_eq!(update_result.rows_affected(), 1); + + // Delete + let delete_result = sqlx::query("DELETE FROM wasi_todos WHERE id = ?") + .bind(insert_result.last_insert_id()) + .execute(&pool) + .await?; + assert_eq!(delete_result.rows_affected(), 1); + + eprintln!("Pool CRUD test passed!"); + Ok(()) +} + +wasip3::cli::command::export!(Component); + +struct Component; + +impl wasip3::exports::cli::run::Guest for Component { + async fn run() -> Result<(), ()> { + tokio::task::LocalSet::new() + .run_until(async { + if let Err(err) = run().await { + eprintln!("Pool CRUD test failed: {err:#}"); + Err(()) + } else { + Ok(()) + } + }) + .await + } +} diff --git a/tests/mysql/wasm-components/prepared-query-test/Cargo.toml b/tests/mysql/wasm-components/prepared-query-test/Cargo.toml new file mode 100644 index 0000000000..2953688436 --- /dev/null +++ b/tests/mysql/wasm-components/prepared-query-test/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "prepared-query-test" +version = "0.1.0" +edition = "2021" +workspace = "../../../../" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0" +futures = "0.3" +sqlx = { path = "../../../../", features = [ "mysql", "runtime-tokio" ] } +clap = { version = "4", features = ["derive"] } +tokio = { version = "1.20.0", features = ["rt"]} +dotenvy = "0.15.0" +wasip3 = "0.2.0+wasi-0.3.0-rc-2025-09-16" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +sqlx = { path = "../../../../", features = ["tls-native-tls"] } \ No newline at end of file diff --git a/tests/mysql/wasm-components/prepared-query-test/README.md b/tests/mysql/wasm-components/prepared-query-test/README.md new file mode 100644 index 0000000000..6f2f8d6c66 --- /dev/null +++ b/tests/mysql/wasm-components/prepared-query-test/README.md @@ -0,0 +1,41 @@ +# TODOs Example + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="mysql://root:password@localhost/todos" + ``` + +2. Create the database. + + ``` + $ sqlx db create + ``` + +3. Run sql migrations + + ``` + $ sqlx migrate run + ``` + +## Usage + +Add a todo + +``` +cargo run -- add "todo description" +``` + +Complete a todo. + +``` +cargo run -- done +``` + +List all todos + +``` +cargo run +``` diff --git a/tests/mysql/wasm-components/prepared-query-test/migrations/20200718111257_todos.sql b/tests/mysql/wasm-components/prepared-query-test/migrations/20200718111257_todos.sql new file mode 100644 index 0000000000..700d99900f --- /dev/null +++ b/tests/mysql/wasm-components/prepared-query-test/migrations/20200718111257_todos.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS todos +( + id BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT, + description TEXT NOT NULL, + done BOOLEAN NOT NULL DEFAULT FALSE +); diff --git a/tests/mysql/wasm-components/prepared-query-test/src/lib.rs b/tests/mysql/wasm-components/prepared-query-test/src/lib.rs new file mode 100644 index 0000000000..f9ec6b710a --- /dev/null +++ b/tests/mysql/wasm-components/prepared-query-test/src/lib.rs @@ -0,0 +1,39 @@ +use sqlx::mysql::MySqlConnection; +use sqlx::Connection; +use std::env; + +async fn run() -> anyhow::Result<()> { + let database_url = env::var("DATABASE_URL")?; + let mut conn = MySqlConnection::connect(&database_url).await?; + + // MySQL returns DOUBLE for arithmetic, so use f64 or cast to INT + let value: i64 = sqlx::query_scalar("SELECT CAST(? + ? AS SIGNED)") + .bind(2_i32) + .bind(3_i32) + .fetch_one(&mut conn) + .await?; + assert_eq!(value, 5); + + conn.close().await?; + eprintln!("Prepared query test passed!"); + Ok(()) +} + +wasip3::cli::command::export!(Component); + +struct Component; + +impl wasip3::exports::cli::run::Guest for Component { + async fn run() -> Result<(), ()> { + tokio::task::LocalSet::new() + .run_until(async { + if let Err(err) = run().await { + eprintln!("Prepared query test failed: {err:#}"); + Err(()) + } else { + Ok(()) + } + }) + .await + } +}