From 6cec7b48c56f7e471460040598b2554e8ae8317e Mon Sep 17 00:00:00 2001 From: Twice Date: Tue, 5 Nov 2024 00:13:14 +0800 Subject: [PATCH] feat(adapter/kv): support async iterating on scan results (#5208) --- core/Cargo.lock | 45 ++++++++++++++ core/Cargo.toml | 5 +- core/src/raw/adapters/kv/api.rs | 69 +++++++++++++++++++++- core/src/raw/adapters/kv/backend.rs | 61 +++++++++++++++---- core/src/raw/adapters/kv/mod.rs | 10 ++++ core/src/services/atomicserver/backend.rs | 2 + core/src/services/cacache/backend.rs | 2 + core/src/services/cloudflare_kv/backend.rs | 8 ++- core/src/services/d1/backend.rs | 2 + core/src/services/etcd/backend.rs | 9 ++- core/src/services/foundationdb/backend.rs | 2 + core/src/services/gridfs/backend.rs | 2 + core/src/services/libsql/backend.rs | 2 + core/src/services/memcached/backend.rs | 2 + core/src/services/mongodb/backend.rs | 2 + core/src/services/mysql/backend.rs | 2 + core/src/services/nebula_graph/backend.rs | 9 ++- core/src/services/persy/backend.rs | 2 + core/src/services/postgresql/backend.rs | 2 + core/src/services/redb/backend.rs | 2 + core/src/services/redis/backend.rs | 2 + core/src/services/rocksdb/backend.rs | 10 +++- core/src/services/sled/backend.rs | 10 +++- core/src/services/sqlite/backend.rs | 63 ++++++++++++++++---- core/src/services/surrealdb/backend.rs | 2 + core/src/services/tikv/backend.rs | 2 + 26 files changed, 289 insertions(+), 40 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index ebcd9138a570..96b630981cbf 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -80,6 +80,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "aligned-array" version = "1.0.1" @@ -5037,6 +5043,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "ouroboros", "percent-encoding", "persy", "pretty_assertions", @@ -5369,6 +5376,31 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "ouroboros" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" +dependencies = [ + "aliasable", + "ouroboros_macro", + "static_assertions", +] + +[[package]] +name = "ouroboros_macro" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" +dependencies = [ + "heck 0.4.1", + "itertools 0.12.1", + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.79", +] + [[package]] name = "outref" version = "0.5.1" @@ -5909,6 +5941,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "version_check", + "yansi", +] + [[package]] name = "procfs" version = "0.16.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index b904bd7e59a4..5ba079e964f0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -194,7 +194,7 @@ services-s3 = [ services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] services-sled = ["dep:sled", "internal-tokio-rt"] -services-sqlite = ["dep:sqlx", "sqlx?/sqlite"] +services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"] services-supabase = [] services-surrealdb = ["dep:surrealdb"] services-swift = [] @@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [ # For http based services. reqsign = { version = "0.16.1", default-features = false, optional = true } +# for self-referencing structs +ouroboros = { version = "0.18.4", optional = true } + # for services-atomic-server atomic_lib = { version = "0.39.0", optional = true } # for services-cacache diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs index acf449d58bdc..8f9b8e965837 100644 --- a/core/src/raw/adapters/kv/api.rs +++ b/core/src/raw/adapters/kv/api.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::future::ready; +use std::ops::DerefMut; use futures::Future; @@ -25,10 +26,76 @@ use crate::Capability; use crate::Scheme; use crate::*; +/// Scan is the async iterator returned by `Adapter::scan`. +pub trait Scan: Send + Sync + Unpin { + /// Fetch the next key in the current key prefix + /// + /// `Ok(None)` means no further key will be returned + fn next(&mut self) -> impl Future>> + MaybeSend; +} + +/// A noop implementation of Scan +impl Scan for () { + async fn next(&mut self) -> Result> { + Ok(None) + } +} + +/// A Scan implementation for all trivial non-async iterators +pub struct ScanStdIter(I); + +#[cfg(any( + feature = "services-cloudflare-kv", + feature = "services-etcd", + feature = "services-nebula-graph", + feature = "services-rocksdb", + feature = "services-sled" +))] +impl ScanStdIter +where + I: Iterator> + Unpin + Send + Sync, +{ + /// Create a new ScanStdIter from an Iterator + pub(crate) fn new(inner: I) -> Self { + Self(inner) + } +} + +impl Scan for ScanStdIter +where + I: Iterator> + Unpin + Send + Sync, +{ + async fn next(&mut self) -> Result> { + self.0.next().transpose() + } +} + +/// A type-erased wrapper of Scan +pub type Scanner = Box; + +pub trait ScanDyn: Unpin + Send + Sync { + fn next_dyn(&mut self) -> BoxedFuture>>; +} + +impl ScanDyn for T { + fn next_dyn(&mut self) -> BoxedFuture>> { + Box::pin(self.next()) + } +} + +impl Scan for Box { + async fn next(&mut self) -> Result> { + self.deref_mut().next_dyn().await + } +} + /// KvAdapter is the adapter to underlying kv services. /// /// By implement this trait, any kv service can work as an OpenDAL Service. pub trait Adapter: Send + Sync + Debug + Unpin + 'static { + /// TODO: use default associate type `= ()` after stablized + type Scanner: Scan; + /// Return the metadata of this key value accessor. fn metadata(&self) -> Metadata; @@ -81,7 +148,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static { } /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future>> + MaybeSend { + fn scan(&self, path: &str) -> impl Future> + MaybeSend { let _ = path; ready(Err(Error::new( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 3a7ea3525d94..6b625c78b6eb 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::vec::IntoIter; -use super::Adapter; +use super::{Adapter, Scan}; use crate::raw::oio::HierarchyLister; use crate::raw::oio::QueueBuf; use crate::raw::*; @@ -68,8 +68,8 @@ impl Access for Backend { type BlockingReader = Buffer; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Lister = HierarchyLister; - type BlockingLister = HierarchyLister; + type Lister = HierarchyLister>; + type BlockingLister = HierarchyLister; fn info(&self) -> Arc { let mut am: AccessorInfo = self.kv.metadata().into(); @@ -182,19 +182,60 @@ impl Access for Backend { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { let p = build_abs_path(&self.root, path); let res = self.kv.blocking_scan(&p)?; - let lister = KvLister::new(&self.root, res); + let lister = BlockingKvLister::new(&self.root, res); let lister = HierarchyLister::new(lister, path, args.recursive()); Ok((RpList::default(), lister)) } } -pub struct KvLister { +pub struct KvLister { + root: String, + inner: Iter, +} + +impl KvLister +where + Iter: Scan, +{ + fn new(root: &str, inner: Iter) -> Self { + Self { + root: root.to_string(), + inner, + } + } + + async fn inner_next(&mut self) -> Result> { + Ok(self.inner.next().await?.map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + let mut path = build_rel_path(&self.root, &v); + if path.is_empty() { + path = "/".to_string(); + } + oio::Entry::new(&path, Metadata::new(mode)) + })) + } +} + +impl oio::List for KvLister +where + Iter: Scan, +{ + async fn next(&mut self) -> Result> { + self.inner_next().await + } +} + +pub struct BlockingKvLister { root: String, inner: IntoIter, } -impl KvLister { +impl BlockingKvLister { fn new(root: &str, inner: Vec) -> Self { Self { root: root.to_string(), @@ -218,13 +259,7 @@ impl KvLister { } } -impl oio::List for KvLister { - async fn next(&mut self) -> Result> { - Ok(self.inner_next()) - } -} - -impl oio::BlockingList for KvLister { +impl oio::BlockingList for BlockingKvLister { fn next(&mut self) -> Result> { Ok(self.inner_next()) } diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index facb6efe1c59..c03c8d71b808 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -22,6 +22,16 @@ mod api; pub use api::Adapter; pub use api::Metadata; +pub use api::Scan; +#[cfg(any( + feature = "services-cloudflare-kv", + feature = "services-etcd", + feature = "services-nebula-graph", + feature = "services-rocksdb", + feature = "services-sled" +))] +pub(crate) use api::ScanStdIter; +pub use api::Scanner; mod backend; pub use backend::Backend; diff --git a/core/src/services/atomicserver/backend.rs b/core/src/services/atomicserver/backend.rs index ac5655bead01..2a8318daa7ad 100644 --- a/core/src/services/atomicserver/backend.rs +++ b/core/src/services/atomicserver/backend.rs @@ -351,6 +351,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Atomicserver, diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs index 85914d864fd1..2083f124cf53 100644 --- a/core/src/services/cacache/backend.rs +++ b/core/src/services/cacache/backend.rs @@ -85,6 +85,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Cacache, diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 6a15187c3fa1..ce68192f58e4 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -181,6 +181,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = kv::Scanner; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::CloudflareKv, @@ -240,7 +242,7 @@ impl kv::Adapter for Adapter { } } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut url = format!("{}/keys", self.url_prefix); if !path.is_empty() { url = format!("{}?prefix={}", url, path); @@ -261,7 +263,9 @@ impl kv::Adapter for Adapter { format!("failed to parse error response: {}", e), ) })?; - Ok(response.result.into_iter().map(|r| r.name).collect()) + Ok(Box::new(kv::ScanStdIter::new( + response.result.into_iter().map(|r| Ok(r.name)), + ))) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index f50fd6747425..b1c992b1a8c6 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -258,6 +258,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::D1, diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 103e7c7abb6b..d37fb35d2a80 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use bb8::PooledConnection; use bb8::RunError; @@ -271,6 +272,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = kv::ScanStdIter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Etcd, @@ -310,7 +313,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut client = self.conn().await?; let get_options = Some(GetOptions::new().with_prefix().with_keys_only()); let resp = client @@ -323,10 +326,10 @@ impl kv::Adapter for Adapter { Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") .set_source(err) })?; - res.push(v); + res.push(Ok(v)); } - Ok(res) + Ok(kv::ScanStdIter::new(res.into_iter())) } } diff --git a/core/src/services/foundationdb/backend.rs b/core/src/services/foundationdb/backend.rs index 4d4adfa52fd2..d28b70152bcf 100644 --- a/core/src/services/foundationdb/backend.rs +++ b/core/src/services/foundationdb/backend.rs @@ -110,6 +110,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Foundationdb, diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs index db2bf34456ed..6d7898d1dd99 100644 --- a/core/src/services/gridfs/backend.rs +++ b/core/src/services/gridfs/backend.rs @@ -212,6 +212,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Gridfs, diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs index ff7b9551e6a8..c0870e374678 100644 --- a/core/src/services/libsql/backend.rs +++ b/core/src/services/libsql/backend.rs @@ -305,6 +305,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Libsql, diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index c89b81ba704a..d0cc42c9211f 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -197,6 +197,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Memcached, diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index 24abe6fb03ef..786c34dbe80c 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -226,6 +226,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mongodb, diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs index 0b1481ef01ce..4569431b7e9f 100644 --- a/core/src/services/mysql/backend.rs +++ b/core/src/services/mysql/backend.rs @@ -188,6 +188,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mysql, diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs index d03dd5bf2b13..9c34018bfdb2 100644 --- a/core/src/services/nebula_graph/backend.rs +++ b/core/src/services/nebula_graph/backend.rs @@ -19,6 +19,7 @@ use std::fmt::Debug; #[cfg(feature = "tests")] use std::time::Duration; +use std::vec; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; @@ -269,6 +270,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = kv::ScanStdIter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::NebulaGraph, @@ -359,7 +362,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let path = path.replace("'", "\\'").replace('"', "\\\""); let query = format!( "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", @@ -381,9 +384,9 @@ impl kv::Adapter for Adapter { .map_err(parse_nebulagraph_dataset_error)?; let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?; - res_vec.push(sub_path); + res_vec.push(Ok(sub_path)); } - Ok(res_vec) + Ok(kv::ScanStdIter::new(res_vec.into_iter())) } } diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 10b48db08137..e5317b11221d 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -152,6 +152,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Persy, diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 8b4ea56eb223..b7cbacc997ad 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -187,6 +187,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Postgresql, diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index d6dc290e758e..281a5ac96a43 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -111,6 +111,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redb, diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index bf8cff201856..e04f5af29daf 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -327,6 +327,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redis, diff --git a/core/src/services/rocksdb/backend.rs b/core/src/services/rocksdb/backend.rs index 5ed0f38ec6bc..ecbf9bdfcf09 100644 --- a/core/src/services/rocksdb/backend.rs +++ b/core/src/services/rocksdb/backend.rs @@ -108,6 +108,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = kv::Scanner; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Rocksdb, @@ -164,13 +166,15 @@ impl kv::Adapter for Adapter { self.db.delete(path).map_err(parse_rocksdb_error) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok)))) } /// TODO: we only need key here. diff --git a/core/src/services/sled/backend.rs b/core/src/services/sled/backend.rs index d1d1e7f18453..c4cb4bdf854b 100644 --- a/core/src/services/sled/backend.rs +++ b/core/src/services/sled/backend.rs @@ -137,6 +137,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type Scanner = kv::Scanner; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sled, @@ -199,13 +201,15 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok)))) } fn blocking_scan(&self, path: &str) -> Result> { diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 0b5c556f8cb3..06158048341a 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -17,8 +17,15 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::pin::Pin; use std::str::FromStr; +use std::task::Context; +use std::task::Poll; +use futures::stream::BoxStream; +use futures::Stream; +use futures::StreamExt; +use ouroboros::self_referencing; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use tokio::sync::OnceCell; @@ -188,7 +195,35 @@ impl Adapter { } } +#[self_referencing] +pub struct SqliteScanner { + pool: SqlitePool, + query: String, + + #[borrows(pool, query)] + #[covariant] + stream: BoxStream<'this, Result>, +} + +impl Stream for SqliteScanner { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.with_stream_mut(|s| s.poll_next_unpin(cx)) + } +} + +unsafe impl Sync for SqliteScanner {} + +impl kv::Scan for SqliteScanner { + async fn next(&mut self) -> Result> { + ::next(self).await.transpose() + } +} + impl kv::Adapter for Adapter { + type Scanner = SqliteScanner; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sqlite, @@ -249,19 +284,25 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let pool = self.get_client().await?; + let stream = SqliteScannerBuilder { + pool: pool.clone(), + query: format!( + "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", + self.key_field, self.table, self.key_field + ), + stream_builder: |pool, query| { + sqlx::query_scalar(query) + .bind(format!("{path}%")) + .fetch(pool) + .map(|v| v.map_err(parse_sqlite_error)) + .boxed() + }, + } + .build(); - let value = sqlx::query_scalar(&format!( - "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", - self.key_field, self.table, self.key_field - )) - .bind(format!("{path}%")) - .fetch_all(pool) - .await - .map_err(parse_sqlite_error)?; - - Ok(value) + Ok(stream) } } diff --git a/core/src/services/surrealdb/backend.rs b/core/src/services/surrealdb/backend.rs index a7c098a0ad8e..47b91e36057f 100644 --- a/core/src/services/surrealdb/backend.rs +++ b/core/src/services/surrealdb/backend.rs @@ -283,6 +283,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Surrealdb, diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 6f23f156ee70..275dcf9bbdc4 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -185,6 +185,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type Scanner = (); + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Tikv,