Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Jan 3, 2025
1 parent c1f10c2 commit fcccf49
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 26 deletions.
6 changes: 3 additions & 3 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ pub trait Connection: Send + Sync {

// raw data reponse query, only for test

Check warning on line 125 in driver/src/conn.rs

View workflow job for this annotation

GitHub Actions / check

"reponse" should be "response".
async fn query_raw_iter(&self, _sql: &str) -> Result<RawRowIterator> {
Err(Error::BadArgument(format!(
"Unsupported implement query_raw_iter"
)))
Err(Error::BadArgument(
"Unsupported implement query_raw_iter".to_string(),
))
}

// raw data reponse query, only for test

Check warning on line 132 in driver/src/conn.rs

View workflow job for this annotation

GitHub Actions / check

"reponse" should be "response".
Expand Down
3 changes: 2 additions & 1 deletion driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ impl FromRowStats for RawRowWithStats {
}

fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
Ok(RawRowWithStats::Row(RawRow::new(schema, row)))
let rows = Row::try_from((schema, row.clone()))?;
Ok(RawRowWithStats::Row(RawRow::new(rows, row)))
}
}
40 changes: 24 additions & 16 deletions sql/src/raw_rows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ use std::task::Poll;

use tokio_stream::{Stream, StreamExt};

use crate::error::Error;
use crate::error::Result;
use crate::rows::Row;
use crate::rows::ServerStats;
use crate::schema::SchemaRef;
use crate::value::Value;

#[derive(Clone, Debug)]
pub enum RawRowWithStats {
Expand All @@ -30,39 +33,44 @@ pub enum RawRowWithStats {

#[derive(Clone, Debug, Default)]
pub struct RawRow {
pub schema: SchemaRef,
pub values: Vec<Option<String>>,
pub row: Row,
pub raw_row: Vec<Option<String>>,
}

impl RawRow {
pub fn new(schema: SchemaRef, values: Vec<Option<String>>) -> Self {
Self { schema, values }
pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
Self { row, raw_row }
}

pub fn len(&self) -> usize {
self.values.len()
self.raw_row.len()
}

pub fn is_empty(&self) -> bool {
self.values.is_empty()
self.raw_row.is_empty()
}

pub fn values(&self) -> &[Option<String>] {
&self.values
&self.raw_row
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}

pub fn from_vec(schema: SchemaRef, values: Vec<Option<String>>) -> Self {
Self { schema, values }
self.row.schema()
}
}

impl From<(SchemaRef, Vec<Option<String>>)> for RawRow {
fn from(value: (SchemaRef, Vec<Option<String>>)) -> Self {
Self::new(value.0, value.1)
impl TryFrom<(SchemaRef, Vec<Option<String>>)> for RawRow {
type Error = Error;

fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
let mut values: Vec<Value> = Vec::new();
for (i, field) in schema.fields().iter().enumerate() {
let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
values.push(Value::try_from((&field.data_type, val))?);
}

let row = Row::new(schema, values);
Ok(RawRow::new(row, data))
}
}

Expand All @@ -71,7 +79,7 @@ impl IntoIterator for RawRow {
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.values.into_iter()
self.raw_row.into_iter()
}
}

Expand Down
7 changes: 3 additions & 4 deletions ttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ authors = { workspace = true }
repository = { workspace = true }

[dependencies]
databend-driver = { path = "../driver"}
bytes = "1"
clap = { version = "4.4", features = ["derive", "env"] }
databend-driver = { path = "../driver" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.34", features = [
Expand All @@ -22,16 +24,13 @@ tokio = { version = "1.34", features = [
"parking_lot",
"full",
] }
clap = { version = "4.4", features = ["derive", "env"] }
bytes = "1"

[[bin]]
name = "ttc-server"
path = "src/server.rs"
doctest = false
test = false


[[bin]]
name = "ttc-client"
path = "src/client.rs"
Expand Down
5 changes: 3 additions & 2 deletions ttc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use bytes::BytesMut;
use databend_driver::{Client, Connection, Row, Value};
use databend_driver::{Client, Connection};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

Expand Down Expand Up @@ -58,6 +58,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let l = format!("127.0.0.1:{}", config.port);
let listener = TcpListener::bind(&l).await?;
println!("Rust TTC Server running on {l}");
println!("Ready to accept connections");

loop {
let (socket, _) = listener.accept().await?;
Expand Down Expand Up @@ -125,7 +126,7 @@ async fn execute_command(
};
match results {
Ok(results) => {
response.values = results.into_iter().map(|row| row.values).collect();
response.values = results.into_iter().map(|row| row.raw_row).collect();
}
Err(err) => response.error = Some(err.to_string()),
}
Expand Down

0 comments on commit fcccf49

Please sign in to comment.