From 7d49de3007212d7f8c904c0ea9babfe811711553 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 6 Jan 2025 10:31:41 +0800 Subject: [PATCH] feat: rest_api support fn query_row_batch (#555) * feat: rest_api support fn query_row_batch * rm integration for py3.7 --- .github/workflows/bindings.python.yml | 2 +- .gitignore | 3 +- core/src/response.rs | 4 +- driver/src/rest_api.rs | 69 ++++++++++++++++++++++++--- 4 files changed, 68 insertions(+), 10 deletions(-) diff --git a/.github/workflows/bindings.python.yml b/.github/workflows/bindings.python.yml index 3370844f..f06ee809 100644 --- a/.github/workflows/bindings.python.yml +++ b/.github/workflows/bindings.python.yml @@ -108,7 +108,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - pyver: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] + pyver: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - name: Setup Python diff --git a/.gitignore b/.gitignore index 3b2b9fab..3d4251d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .vscode .idea -target/ +**/target Cargo.lock venv/ @@ -9,3 +9,4 @@ venv/ /dist +**/.DS_Store diff --git a/core/src/response.rs b/core/src/response.rs index 048964f5..e7268ea8 100644 --- a/core/src/response.rs +++ b/core/src/response.rs @@ -14,7 +14,7 @@ use crate::error_code::ErrorCode; use crate::session::SessionState; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Deserialize, Debug)] pub struct QueryStats { @@ -53,7 +53,7 @@ pub struct ProgressValues { pub bytes: usize, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct SchemaField { pub name: String, #[serde(rename = "type")] diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 140393de..9d6ce129 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -29,9 +29,9 @@ use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_stream::Stream; -use databend_client::APIClient; use databend_client::PresignedResponse; use databend_client::QueryResponse; +use databend_client::{APIClient, SchemaField}; use databend_driver_core::error::{Error, Result}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats}; use databend_driver_core::schema::{Schema, SchemaRef}; @@ -84,7 +84,7 @@ impl Connection for RestAPIConnection { async fn query_iter_ext(&self, sql: &str) -> Result { info!("query iter ext: {}", sql); let resp = self.client.start_query(sql).await?; - let resp = self.wait_for_schema(resp).await?; + let resp = self.wait_for_schema(resp, true).await?; let (schema, rows) = RestAPIRows::::from_response(self.client.clone(), resp)?; Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) } @@ -93,7 +93,7 @@ impl Connection for RestAPIConnection { async fn query_raw_iter(&self, sql: &str) -> Result { info!("query raw iter: {}", sql); let resp = self.client.start_query(sql).await?; - let resp = self.wait_for_schema(resp).await?; + let resp = self.wait_for_schema(resp, true).await?; let (schema, rows) = RestAPIRows::::from_response(self.client.clone(), resp)?; Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows))) @@ -221,8 +221,14 @@ impl<'o> RestAPIConnection { }) } - async fn wait_for_schema(&self, resp: QueryResponse) -> Result { - if !resp.data.is_empty() || !resp.schema.is_empty() || resp.stats.progresses.has_progress() + async fn wait_for_schema( + &self, + resp: QueryResponse, + return_on_progress: bool, + ) -> Result { + if !resp.data.is_empty() + || !resp.schema.is_empty() + || (return_on_progress && resp.stats.progresses.has_progress()) { return Ok(resp); } @@ -240,7 +246,7 @@ impl<'o> RestAPIConnection { if !result.data.is_empty() || !result.schema.is_empty() - || result.stats.progresses.has_progress() + || (return_on_progress && result.stats.progresses.has_progress()) { break; } @@ -262,6 +268,12 @@ impl<'o> RestAPIConnection { fn default_copy_options() -> BTreeMap<&'o str, &'o str> { vec![("purge", "true")].into_iter().collect() } + + pub async fn query_row_batch(&self, sql: &str) -> Result { + let resp = self.client.start_query(sql).await?; + let resp = self.wait_for_schema(resp, false).await?; + RowBatch::from_response(self.client.clone(), resp) + } } type PageFut = Pin> + Send>>; @@ -380,3 +392,48 @@ impl FromRowStats for RawRowWithStats { Ok(RawRowWithStats::Row(RawRow::new(rows, row))) } } + +pub struct RowBatch { + schema: Vec, + client: Arc, + query_id: String, + node_id: Option, + + next_uri: Option, + data: Vec>>, +} + +impl RowBatch { + pub fn schema(&self) -> Vec { + self.schema.clone() + } + + fn from_response(client: Arc, mut resp: QueryResponse) -> Result { + Ok(Self { + schema: std::mem::take(&mut resp.schema), + client, + query_id: resp.id, + node_id: resp.node_id, + next_uri: resp.next_uri, + data: resp.data, + }) + } + + pub async fn fetch_next_page(&mut self) -> Result>>> { + if !self.data.is_empty() { + return Ok(std::mem::take(&mut self.data)); + } + while let Some(next_uri) = &self.next_uri { + let resp = self + .client + .query_page(&self.query_id, next_uri, &self.node_id) + .await?; + + self.next_uri = resp.next_uri; + if !resp.data.is_empty() { + return Ok(resp.data); + } + } + Ok(vec![]) + } +}