Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Jan 2, 2025
1 parent 4fe1025 commit c1f10c2
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 28 deletions.
14 changes: 14 additions & 0 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
use once_cell::sync::Lazy;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -121,6 +122,19 @@ pub trait Connection: Send + Sync {
rows.collect().await
}

// raw data reponse query, only for test
async fn query_raw_iter(&self, _sql: &str) -> Result<RawRowIterator> {
Err(Error::BadArgument(format!(
"Unsupported implement query_raw_iter"
)))
}

// raw data reponse query, only for test
async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
let rows = self.query_raw_iter(sql).await?;
rows.collect().await
}

/// Get presigned url for a given operation and stage location.
/// The operation can be "UPLOAD" or "DOWNLOAD".
async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse>;
Expand Down
62 changes: 50 additions & 12 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::io::Cursor;
use std::marker::PhantomData;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_compression::tokio::write::ZstdEncoder;
use async_trait::async_trait;
use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
use log::info;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -83,10 +85,20 @@ impl Connection for RestAPIConnection {
info!("query iter ext: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp).await?;
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
let (schema, rows) = RestAPIRows::<RowWithStats>::from_response(self.client.clone(), resp)?;
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
}

// raw data reponse query, only for test
async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
info!("query raw iter: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_schema(resp).await?;
let (schema, rows) =
RestAPIRows::<RawRowWithStats>::from_response(self.client.clone(), resp)?;
Ok(RawRowIterator::new(Arc::new(schema), Box::pin(rows)))
}

async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse> {
info!("get presigned url: {} {}", operation, stage);
let sql = format!("PRESIGN {} {}", operation, stage);
Expand Down Expand Up @@ -254,7 +266,7 @@ impl<'o> RestAPIConnection {

type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;

pub struct RestAPIRows {
pub struct RestAPIRows<T> {
client: Arc<APIClient>,
schema: SchemaRef,
data: VecDeque<Vec<Option<String>>>,
Expand All @@ -263,9 +275,10 @@ pub struct RestAPIRows {
node_id: Option<String>,
next_uri: Option<String>,
next_page: Option<PageFut>,
_phantom: std::marker::PhantomData<T>,
}

impl RestAPIRows {
impl<T> RestAPIRows<T> {
fn from_response(client: Arc<APIClient>, resp: QueryResponse) -> Result<(Schema, Self)> {
let schema: Schema = resp.schema.try_into()?;
let rows = Self {
Expand All @@ -277,24 +290,25 @@ impl RestAPIRows {
data: resp.data.into(),
stats: Some(ServerStats::from(resp.stats)),
next_page: None,
_phantom: PhantomData,
};
Ok((schema, rows))
}
}

impl Stream for RestAPIRows {
type Item = Result<RowWithStats>;
impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(ss) = self.stats.take() {
return Poll::Ready(Some(Ok(RowWithStats::Stats(ss))));
return Poll::Ready(Some(Ok(T::from_stats(ss))));
}
// Skip to fetch next page if there is only one row left in buffer.
// Therefore we could guarantee the `/final` called before the last row.
if self.data.len() > 1 {
if let Some(row) = self.data.pop_front() {
let row = Row::try_from((self.schema.clone(), row))?;
return Poll::Ready(Some(Ok(RowWithStats::Row(row))));
let row = T::try_from_row(row, self.schema.clone())?;
return Poll::Ready(Some(Ok(row)));
}
}
match self.next_page {
Expand All @@ -307,8 +321,7 @@ impl Stream for RestAPIRows {
self.next_page = None;
let mut new_data = resp.data.into();
self.data.append(&mut new_data);
let stats = ServerStats::from(resp.stats);
Poll::Ready(Some(Ok(RowWithStats::Stats(stats))))
Poll::Ready(Some(Ok(T::from_stats(resp.stats.into()))))
}
Poll::Ready(Err(e)) => {
self.next_page = None;
Expand All @@ -332,12 +345,37 @@ impl Stream for RestAPIRows {
}
None => match self.data.pop_front() {
Some(row) => {
let row = Row::try_from((self.schema.clone(), row))?;
Poll::Ready(Some(Ok(RowWithStats::Row(row))))
let row = T::try_from_row(row, self.schema.clone())?;
Poll::Ready(Some(Ok(row)))
}
None => Poll::Ready(None),
},
},
}
}
}

trait FromRowStats: Send + Sync + Clone {
fn from_stats(stats: ServerStats) -> Self;
fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self>;
}

impl FromRowStats for RowWithStats {
fn from_stats(stats: ServerStats) -> Self {
RowWithStats::Stats(stats)
}

fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
Ok(RowWithStats::Row(Row::try_from((schema, row))?))
}
}

impl FromRowStats for RawRowWithStats {
fn from_stats(stats: ServerStats) -> Self {
RawRowWithStats::Stats(stats)
}

fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
Ok(RawRowWithStats::Row(RawRow::new(schema, row)))
}
}
1 change: 1 addition & 0 deletions sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod cursor_ext;
pub mod error;
pub mod raw_rows;
pub mod rows;
pub mod schema;
pub mod value;
Expand Down
142 changes: 142 additions & 0 deletions sql/src/raw_rows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use tokio_stream::{Stream, StreamExt};

use crate::error::Result;
use crate::rows::ServerStats;
use crate::schema::SchemaRef;

#[derive(Clone, Debug)]
pub enum RawRowWithStats {
Row(RawRow),
Stats(ServerStats),
}

#[derive(Clone, Debug, Default)]
pub struct RawRow {
pub schema: SchemaRef,
pub values: Vec<Option<String>>,
}

impl RawRow {
pub fn new(schema: SchemaRef, values: Vec<Option<String>>) -> Self {
Self { schema, values }
}

pub fn len(&self) -> usize {
self.values.len()
}

pub fn is_empty(&self) -> bool {
self.values.is_empty()
}

pub fn values(&self) -> &[Option<String>] {
&self.values
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}

pub fn from_vec(schema: SchemaRef, values: Vec<Option<String>>) -> Self {
Self { schema, values }
}
}

impl From<(SchemaRef, Vec<Option<String>>)> for RawRow {
fn from(value: (SchemaRef, Vec<Option<String>>)) -> Self {
Self::new(value.0, value.1)
}
}

impl IntoIterator for RawRow {
type Item = Option<String>;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.values.into_iter()
}
}

#[derive(Clone, Debug)]
pub struct RawRows {
rows: Vec<RawRow>,
}

impl RawRows {
pub fn new(rows: Vec<RawRow>) -> Self {
Self { rows }
}

pub fn rows(&self) -> &[RawRow] {
&self.rows
}

pub fn len(&self) -> usize {
self.rows.len()
}

pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}

impl IntoIterator for RawRows {
type Item = RawRow;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.rows.into_iter()
}
}

pub struct RawRowIterator {
schema: SchemaRef,
it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
}

impl RawRowIterator {
pub fn new(
schema: SchemaRef,
it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
) -> Self {
let it = it.filter_map(|r| match r {
Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
Ok(_) => None,
Err(err) => Some(Err(err)),
});
Self {
schema,
it: Box::pin(it),
}
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for RawRowIterator {
type Item = Result<RawRow>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.it).poll_next(cx)
}
}
5 changes: 3 additions & 2 deletions ttc/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Dockerfile
FROM rust as builder
FROM rust:1.83.0-bullseye as builder

# Set the current working directory inside the container
WORKDIR /usr/src
Expand All @@ -10,7 +10,8 @@ COPY . .
# Build the application
RUN cargo build --bin ttc-server --package databend-ttc --release

RUN cp /usr/src/target/release/ttc-server /usr/local/bin/ttc-server
FROM debian:bullseye-slim
COPY --from=builder /usr/src/target/release/ttc-server /usr/local/bin/ttc-server

# Set the startup command
# docker run --net host datafuselabs/ttc-rust -P 9092 --databend_dsn databend://default:@127.0.0.1:8000
Expand Down
16 changes: 2 additions & 14 deletions ttc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ async fn execute_command(
) -> Result<(), Box<dyn std::error::Error>> {
let command_str = String::from_utf8_lossy(command);

let results = conn.query_all(&command_str).await;
let results = conn.query_raw_all(&command_str).await;

let mut response = Response {
values: vec![],
error: None,
};
match results {
Ok(results) => {
response.values = results.into_iter().map(|row| row_to_vec(row)).collect();
response.values = results.into_iter().map(|row| row.values).collect();
}
Err(err) => response.error = Some(err.to_string()),
}
Expand All @@ -145,15 +145,3 @@ async fn execute_command(

Ok(())
}

fn row_to_vec(row: Row) -> Vec<Option<String>> {
row.into_iter()
.map(|v| {
if v == Value::Null {
None
} else {
Some(v.to_string())
}
})
.collect()
}

0 comments on commit c1f10c2

Please sign in to comment.