From ea920c1c22ff1c357ee2ce5cc0e9c446df343a44 Mon Sep 17 00:00:00 2001 From: test Date: Sat, 27 Apr 2024 21:41:10 +0800 Subject: [PATCH 01/11] remove `Database` in `cli export` --- Cargo.lock | 3 ++ src/client/src/region.rs | 91 ++++++++++++++++++++++++++++++- src/cmd/Cargo.toml | 3 ++ src/cmd/src/cli/export.rs | 109 ++++++++++++++++++++++---------------- 4 files changed, 159 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 453689ebc290..2acc7f75a7f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1578,6 +1578,8 @@ dependencies = [ name = "cmd" version = "0.7.2" dependencies = [ + "api", + "arrow-flight", "async-trait", "auth", "catalog", @@ -1588,6 +1590,7 @@ dependencies = [ "common-catalog", "common-config", "common-error", + "common-grpc", "common-macro", "common-meta", "common-procedure", diff --git a/src/client/src/region.rs b/src/client/src/region.rs index a401fa434803..a72dd0c503cf 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -26,12 +26,13 @@ use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_meta::datanode_manager::Datanode; use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; -use snafu::{location, Location, OptionExt, ResultExt}; +use snafu::{ensure, location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; use crate::error::{ @@ -166,6 +167,94 @@ impl RegionRequester { Ok(Box::pin(record_batch_stream)) } + pub async fn do_get_output(&self, ticket: Ticket) -> Result { + let mut flight_client = self.client.make_flight_client()?; + let response = flight_client + .mut_inner() + .do_get(ticket) + .await + .map_err(|e| { + let tonic_code = e.code(); + let e: error::Error = e.into(); + let code = e.status_code(); + let msg = e.to_string(); + let error = Error::FlightGet { + tonic_code, + addr: flight_client.addr().to_string(), + source: BoxedError::new(ServerSnafu { code, msg }.build()), + }; + error!( + e; "Failed to do Flight get, addr: {}, code: {}", + flight_client.addr(), + tonic_code + ); + error + })?; + + let flight_data_stream = response.into_inner(); + let mut decoder = FlightDecoder::default(); + + let mut flight_message_stream = flight_data_stream.map(move |flight_data| { + flight_data + .map_err(Error::from) + .and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu)) + }); + + let Some(first_flight_message) = flight_message_stream.next().await else { + return IllegalFlightMessagesSnafu { + reason: "Expect the response not to be empty", + } + .fail(); + }; + + let first_flight_message = first_flight_message?; + + match first_flight_message { + FlightMessage::AffectedRows(rows) => { + ensure!( + flight_message_stream.next().await.is_none(), + IllegalFlightMessagesSnafu { + reason: "Expect 'AffectedRows' Flight messages to be the one and the only!" 
+ } + ); + Ok(Output::new_with_affected_rows(rows)) + } + FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => { + IllegalFlightMessagesSnafu { + reason: "The first flight message cannot be a RecordBatch or Metrics message", + } + .fail() + } + FlightMessage::Schema(schema) => { + let stream = Box::pin(stream!({ + while let Some(flight_message) = flight_message_stream.next().await { + let flight_message = flight_message + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + match flight_message { + FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch), + FlightMessage::Metrics(_) => {} + FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { + yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} + .fail() + .map_err(BoxedError::new) + .context(ExternalSnafu); + break; + } + } + } + })); + let record_batch_stream = RecordBatchStreamWrapper { + schema, + stream, + output_ordering: None, + metrics: Default::default(), + }; + Ok(Output::new_with_stream(Box::pin(record_batch_stream))) + } + } + } + async fn handle_inner(&self, request: RegionRequest) -> Result { let request_type = request .body diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 24bbe69df18c..83150ae9dde3 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,6 +16,8 @@ tokio-console = ["common-telemetry/tokio-console"] workspace = true [dependencies] +api.workspace = true +arrow-flight.workspace = true async-trait.workspace = true auth.workspace = true catalog.workspace = true @@ -26,6 +28,7 @@ common-base.workspace = true common-catalog.workspace = true common-config.workspace = true common-error.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true common-procedure.workspace = true diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 4122fbd6d7a1..80a2a60943dd 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -15,15 +15,22 @@ use std::path::Path; use std::sync::Arc; +use api::v1::greptime_request::Request; +use api::v1::query_request::Query; +use arrow_flight::Ticket; use async_trait::async_trait; use clap::{Parser, ValueEnum}; use client::api::v1::auth_header::AuthScheme; -use client::api::v1::Basic; -use client::{Client, Database, OutputData, DEFAULT_SCHEMA_NAME}; +use client::api::v1::{AuthHeader, Basic, GreptimeRequest, QueryRequest, RequestHeader}; +use client::region::RegionRequester; +use client::{Client, OutputData, DEFAULT_SCHEMA_NAME}; +use common_query::Output; use common_recordbatch::util::collect; +use common_telemetry::tracing_context::W3cTrace; use common_telemetry::{debug, error, info, warn}; use datatypes::scalars::ScalarVector; use datatypes::vectors::{StringVector, Vector}; +use prost::Message; use snafu::{OptionExt, ResultExt}; use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; @@ -88,43 +95,70 @@ impl ExportCommand { addr: self.addr.clone(), })?; let (catalog, schema) = split_database(&self.database)?; - let mut database_client = Database::new( - catalog.clone(), - schema.clone().unwrap_or(DEFAULT_SCHEMA_NAME.to_string()), - client, - ); - if let Some(auth_basic) = &self.auth_basic { + let auth_header = if let Some(auth_basic) = &self.auth_basic { let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu { msg: "auth_basic cannot be split by ':'".to_string(), })?; - 
database_client.set_auth(AuthScheme::Basic(Basic { - username: username.to_string(), - password: password.to_string(), - })); - } + Some(AuthHeader { + auth_scheme: Some(AuthScheme::Basic(Basic { + username: username.to_string(), + password: password.to_string(), + })), + }) + } else { + None + }; Ok(Instance::new(Box::new(Export { - client: database_client, catalog, schema, output_dir: self.output_dir.clone(), parallelism: self.export_jobs, target: self.target.clone(), + auth_header, + region_requester: RegionRequester::new(client), }))) } } pub struct Export { - client: Database, catalog: String, schema: Option, output_dir: String, parallelism: usize, target: ExportTarget, + auth_header: Option, + region_requester: RegionRequester, } impl Export { + /// Execute a sql query. + async fn sql(&self, sql: &str, catalog: String, schema: String) -> Result { + let request = Request::Query(QueryRequest { + query: Some(Query::Sql(sql.to_string())), + }); + let rpc_request = GreptimeRequest { + header: Some(RequestHeader { + catalog: catalog, + schema: schema, + authorization: self.auth_header.clone(), + dbname: String::default(), + timezone: String::default(), + tracing_context: W3cTrace::new(), + }), + request: Some(request), + }; + let ticket = Ticket { + ticket: rpc_request.encode_to_vec().into(), + }; + + self.region_requester + .do_get_output(ticket) + .await + .with_context(|_| RequestDatabaseSnafu { sql }) + } + /// Iterate over all db names. /// /// Newbie: `db_name` is catalog + schema. @@ -132,15 +166,13 @@ impl Export { if let Some(schema) = &self.schema { Ok(vec![(self.catalog.clone(), schema.clone())]) } else { - let mut client = self.client.clone(); - client.set_catalog(self.catalog.clone()); - let result = - client - .sql("show databases") - .await - .with_context(|_| RequestDatabaseSnafu { - sql: "show databases".to_string(), - })?; + let result = self + .sql( + "show databases", + self.catalog.clone(), + DEFAULT_SCHEMA_NAME.to_string(), + ) + .await?; let OutputData::Stream(stream) = result.data else { NotDataFromOutputSnafu.fail()? }; @@ -175,13 +207,9 @@ impl Export { information_schema.tables where table_type = \'BASE TABLE\'\ and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'", ); - let mut client = self.client.clone(); - client.set_catalog(catalog); - client.set_schema(schema); - let result = client - .sql(&sql) - .await - .with_context(|_| RequestDatabaseSnafu { sql })?; + let result = self + .sql(&sql, catalog.to_string(), schema.to_string()) + .await?; let OutputData::Stream(stream) = result.data else { NotDataFromOutputSnafu.fail()? }; @@ -230,13 +258,9 @@ impl Export { r#"show create table "{}"."{}"."{}""#, catalog, schema, table ); - let mut client = self.client.clone(); - client.set_catalog(catalog); - client.set_schema(schema); - let result = client - .sql(&sql) - .await - .with_context(|_| RequestDatabaseSnafu { sql })?; + let result = self + .sql(&sql, catalog.to_string(), schema.to_string()) + .await?; let OutputData::Stream(stream) = result.data else { NotDataFromOutputSnafu.fail()? 
}; @@ -321,20 +345,13 @@ impl Export { .context(FileIoSnafu)?; let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/")); - let mut client = self.client.clone(); - client.set_catalog(catalog.clone()); - client.set_schema(schema.clone()); - // copy database to let sql = format!( "copy database {} to '{}' with (format='parquet');", schema, output_dir.to_str().unwrap() ); - client - .sql(sql.clone()) - .await - .context(RequestDatabaseSnafu { sql })?; + self.sql(&sql, catalog.clone(), schema.clone()).await?; info!("finished exporting {catalog}.{schema} data"); // export copy from sql From 88a5915aef1f11d9dc6a2db267815233e9538d7f Mon Sep 17 00:00:00 2001 From: test Date: Sun, 28 Apr 2024 00:14:49 +0800 Subject: [PATCH 02/11] move `Database` to tests-integration --- Cargo.lock | 8 + Cargo.toml | 1 + benchmarks/Cargo.toml | 1 + benchmarks/src/bin/nyc-taxi.rs | 3 +- src/client/examples/logical.rs | 115 ----------- src/client/examples/stream_ingest.rs | 181 ------------------ src/client/src/client.rs | 14 +- src/client/src/lib.rs | 4 - src/cmd/src/cli.rs | 4 +- src/cmd/src/cli/export.rs | 79 -------- src/cmd/src/cli/repl.rs | 31 +-- src/common/test-util/src/recordbatch.rs | 17 -- src/servers/Cargo.toml | 1 + src/servers/tests/grpc/mod.rs | 3 +- tests-integration/Cargo.toml | 5 + .../src/database.rs | 95 +++++++-- tests-integration/src/lib.rs | 2 + .../src/stream_insert.rs | 5 +- tests-integration/tests/grpc.rs | 3 +- tests/runner/Cargo.toml | 1 + tests/runner/src/env.rs | 5 +- 21 files changed, 133 insertions(+), 445 deletions(-) delete mode 100644 src/client/examples/logical.rs delete mode 100644 src/client/examples/stream_ingest.rs rename {src/client => tests-integration}/src/database.rs (82%) rename {src/client => tests-integration}/src/stream_insert.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index 2acc7f75a7f0..7ec42bd4f566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -897,6 +897,7 @@ dependencies = [ "rskafka", "serde", "store-api", + "tests-integration", "tokio", "toml 0.8.12", "uuid", @@ -9201,6 +9202,7 @@ dependencies = [ "strum 0.25.0", "table", "tempfile", + "tests-integration", "tikv-jemalloc-ctl", "tokio", "tokio-postgres", @@ -9544,6 +9546,7 @@ dependencies = [ "serde_json", "sqlness", "tempfile", + "tests-integration", "tinytemplate", "tokio", ] @@ -10247,11 +10250,13 @@ version = "0.7.2" dependencies = [ "api", "arrow-flight", + "async-stream", "async-trait", "auth", "axum", "catalog", "chrono", + "clap 4.5.4", "client", "cmd", "common-base", @@ -10274,6 +10279,7 @@ dependencies = [ "dotenv", "frontend", "futures", + "futures-util", "itertools 0.10.5", "meta-client", "meta-srv", @@ -10293,6 +10299,7 @@ dependencies = [ "serde_json", "servers", "session", + "snafu", "sql", "sqlx", "store-api", @@ -10302,6 +10309,7 @@ dependencies = [ "time", "tokio", "tokio-postgres", + "tokio-stream", "tonic 0.11.0", "tower", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 1e938f58a89d..c57c4d86c21a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,6 +223,7 @@ sql = { path = "src/sql" } store-api = { path = "src/store-api" } substrait = { path = "src/common/substrait" } table = { path = "src/table" } +tests-integration = { path = "tests-integration" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 18b44e944858..12113f1f2c17 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,6 +33,7 @@ rand.workspace = true rskafka.workspace = true 
serde.workspace = true store-api.workspace = true +tests-integration.workspace = true tokio.workspace = true toml.workspace = true uuid.workspace = true diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index bfc26f3daeae..43d28414fa54 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -29,10 +29,11 @@ use client::api::v1::column::Values; use client::api::v1::{ Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType, }; -use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures_util::TryStreamExt; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use tests_integration::database::Database; use tokio::task::JoinSet; const CATALOG_NAME: &str = "greptime"; diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs deleted file mode 100644 index 13f116555519..000000000000 --- a/src/client/examples/logical.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType, TableId}; -use client::{Client, Database}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; -use prost::Message; -use substrait_proto::proto::plan_rel::RelType as PlanRelType; -use substrait_proto::proto::read_rel::{NamedTable, ReadType}; -use substrait_proto::proto::rel::RelType; -use substrait_proto::proto::{PlanRel, ReadRel, Rel}; -use tracing::{event, Level}; - -fn main() { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) - .unwrap(); - - run(); -} - -#[tokio::main] -async fn run() { - let client = Client::with_urls(vec!["127.0.0.1:3001"]); - - let create_table_expr = CreateTableExpr { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "test_logical_dist_exec".to_string(), - desc: String::default(), - column_defs: vec![ - ColumnDef { - name: "timestamp".to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, - comment: String::new(), - ..Default::default() - }, - ColumnDef { - name: "key".to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Tag as i32, - comment: String::new(), - ..Default::default() - }, - ColumnDef { - name: "value".to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: String::new(), - ..Default::default() - }, - ], - time_index: "timestamp".to_string(), - primary_keys: vec!["key".to_string()], - create_if_not_exists: false, - 
table_options: Default::default(), - table_id: Some(TableId { id: 1024 }), - engine: MITO_ENGINE.to_string(), - }; - - let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - let result = db.create(create_table_expr).await.unwrap(); - event!(Level::INFO, "create table result: {:#?}", result); - - let logical = mock_logical_plan(); - event!(Level::INFO, "plan size: {:#?}", logical.len()); - let result = db.logical_plan(logical).await.unwrap(); - - event!(Level::INFO, "result: {:#?}", result); -} - -fn mock_logical_plan() -> Vec { - let catalog_name = "greptime".to_string(); - let schema_name = "public".to_string(); - let table_name = "test_logical_dist_exec".to_string(); - - let named_table = NamedTable { - names: vec![catalog_name, schema_name, table_name], - advanced_extension: None, - }; - let read_type = ReadType::NamedTable(named_table); - - let read_rel = ReadRel { - read_type: Some(read_type), - ..Default::default() - }; - - let mut buf = vec![]; - let rel = Rel { - rel_type: Some(RelType::Read(Box::new(read_rel))), - }; - let plan_rel = PlanRel { - rel_type: Some(PlanRelType::Rel(rel)), - }; - plan_rel.encode(&mut buf).unwrap(); - - buf -} diff --git a/src/client/examples/stream_ingest.rs b/src/client/examples/stream_ingest.rs deleted file mode 100644 index 94f9773096b9..000000000000 --- a/src/client/examples/stream_ingest.rs +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use api::v1::*; -use client::{Client, Database, DEFAULT_SCHEMA_NAME}; -use derive_new::new; -use tracing::{error, info}; - -fn main() { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) - .unwrap(); - - run(); -} - -#[tokio::main] -async fn run() { - let greptimedb_endpoint = - std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned()); - - let greptimedb_dbname = - std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned()); - - let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]); - - let client = Database::new_with_dbname(greptimedb_dbname, grpc_client); - - let stream_inserter = client.streaming_inserter().unwrap(); - - if let Err(e) = stream_inserter - .insert(vec![to_insert_request(weather_records_1())]) - .await - { - error!("Error: {e:?}"); - } - - if let Err(e) = stream_inserter - .insert(vec![to_insert_request(weather_records_2())]) - .await - { - error!("Error: {e:?}"); - } - - let result = stream_inserter.finish().await; - - match result { - Ok(rows) => { - info!("Rows written: {rows}"); - } - Err(e) => { - error!("Error: {e:?}"); - } - }; -} - -#[derive(new)] -struct WeatherRecord { - timestamp_millis: i64, - collector: String, - temperature: f32, - humidity: i32, -} - -fn weather_records_1() -> Vec { - vec![ - WeatherRecord::new(1686109527000, "c1".to_owned(), 26.4, 15), - WeatherRecord::new(1686023127000, "c1".to_owned(), 29.3, 20), - WeatherRecord::new(1685936727000, "c1".to_owned(), 31.8, 13), - WeatherRecord::new(1686109527000, "c2".to_owned(), 20.4, 67), - WeatherRecord::new(1686023127000, "c2".to_owned(), 18.0, 74), - WeatherRecord::new(1685936727000, "c2".to_owned(), 19.2, 81), - ] -} - -fn weather_records_2() -> Vec { - vec![ - WeatherRecord::new(1686109527001, "c3".to_owned(), 26.4, 15), - WeatherRecord::new(1686023127002, "c3".to_owned(), 29.3, 20), - WeatherRecord::new(1685936727003, "c3".to_owned(), 31.8, 13), - WeatherRecord::new(1686109527004, "c4".to_owned(), 20.4, 67), - WeatherRecord::new(1686023127005, "c4".to_owned(), 18.0, 74), - WeatherRecord::new(1685936727006, "c4".to_owned(), 19.2, 81), - ] -} - -/// This function generates some random data and bundle them into a -/// `InsertRequest`. 
-/// -/// Data structure: -/// -/// - `ts`: a timestamp column -/// - `collector`: a tag column -/// - `temperature`: a value field of f32 -/// - `humidity`: a value field of i32 -/// -fn to_insert_request(records: Vec) -> InsertRequest { - // convert records into columns - let rows = records.len(); - - // transpose records into columns - let (timestamp_millis, collectors, temp, humidity) = records.into_iter().fold( - ( - Vec::with_capacity(rows), - Vec::with_capacity(rows), - Vec::with_capacity(rows), - Vec::with_capacity(rows), - ), - |mut acc, rec| { - acc.0.push(rec.timestamp_millis); - acc.1.push(rec.collector); - acc.2.push(rec.temperature); - acc.3.push(rec.humidity); - - acc - }, - ); - - let columns = vec![ - // timestamp column: `ts` - Column { - column_name: "ts".to_owned(), - values: Some(column::Values { - timestamp_millisecond_values: timestamp_millis, - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - // tag column: collectors - Column { - column_name: "collector".to_owned(), - values: Some(column::Values { - string_values: collectors.into_iter().collect(), - ..Default::default() - }), - semantic_type: SemanticType::Tag as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }, - // field column: temperature - Column { - column_name: "temperature".to_owned(), - values: Some(column::Values { - f32_values: temp, - ..Default::default() - }), - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float32 as i32, - ..Default::default() - }, - // field column: humidity - Column { - column_name: "humidity".to_owned(), - values: Some(column::Values { - i32_values: humidity, - ..Default::default() - }), - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Int32 as i32, - ..Default::default() - }, - ]; - - InsertRequest { - table_name: "weather_demo".to_owned(), - columns, - row_count: rows as u32, - } -} diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 47a8df49f156..869fd086e8e3 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -28,21 +28,21 @@ use tonic::transport::Channel; use crate::load_balance::{LoadBalance, Loadbalancer}; use crate::{error, Result}; -pub(crate) struct DatabaseClient { - pub(crate) inner: GreptimeDatabaseClient, +pub struct DatabaseClient { + pub inner: GreptimeDatabaseClient, } -pub(crate) struct FlightClient { +pub struct FlightClient { addr: String, client: FlightServiceClient, } impl FlightClient { - pub(crate) fn addr(&self) -> &str { + pub fn addr(&self) -> &str { &self.addr } - pub(crate) fn mut_inner(&mut self) -> &mut FlightServiceClient { + pub fn mut_inner(&mut self) -> &mut FlightServiceClient { &mut self.client } } @@ -154,7 +154,7 @@ impl Client { .as_bytes() as usize } - pub(crate) fn make_flight_client(&self) -> Result { + pub fn make_flight_client(&self) -> Result { let (addr, channel) = self.find_channel()?; Ok(FlightClient { addr, @@ -164,7 +164,7 @@ impl Client { }) } - pub(crate) fn make_database_client(&self) -> Result { + pub fn make_database_client(&self) -> Result { let (_, channel) = self.find_channel()?; Ok(DatabaseClient { inner: GreptimeDatabaseClient::new(channel) diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 1a854c5daa27..be8346faf7b0 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -14,12 +14,10 @@ mod client; pub mod client_manager; -mod database; pub mod error; pub mod 
load_balance; mod metrics; pub mod region; -mod stream_insert; pub use api; use api::v1::greptime_response::Response; @@ -31,9 +29,7 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use snafu::OptionExt; pub use self::client::Client; -pub use self::database::Database; pub use self::error::{Error, Result}; -pub use self::stream_insert::StreamInserter; use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu}; pub fn from_grpc_response(response: GreptimeResponse) -> Result { diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 35dc1e4ba7dc..c44b99ad6bf9 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -22,7 +22,7 @@ mod helper; // Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 #[allow(unused)] -mod repl; +// mod repl; // TODO(weny): Removes it #[allow(deprecated)] mod upgrade; @@ -31,7 +31,7 @@ use async_trait::async_trait; use bench::BenchTableMetadataCommand; use clap::Parser; use common_telemetry::logging::LoggingOptions; -pub use repl::Repl; +// pub use repl::Repl; use upgrade::UpgradeCommand; use self::export::ExportCommand; diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 80a2a60943dd..afe65ed671cb 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -437,82 +437,3 @@ fn split_database(database: &str) -> Result<(String, Option)> { Ok((catalog.to_string(), Some(schema.to_string()))) } } - -#[cfg(test)] -mod tests { - use clap::Parser; - use client::{Client, Database}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - - use crate::error::Result; - use crate::options::{CliOptions, Options}; - use crate::{cli, standalone, App}; - - #[tokio::test(flavor = "multi_thread")] - async fn test_export_create_table_with_quoted_names() -> Result<()> { - let output_dir = tempfile::tempdir().unwrap(); - - let standalone = standalone::Command::parse_from([ - "standalone", - "start", - "--data-home", - &*output_dir.path().to_string_lossy(), - ]); - let Options::Standalone(standalone_opts) = - standalone.load_options(&CliOptions::default())? 
- else { - unreachable!() - }; - let mut instance = standalone.build(*standalone_opts).await?; - instance.start().await?; - - let client = Client::with_urls(["127.0.0.1:4001"]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - database - .sql(r#"CREATE DATABASE "cli.export.create_table";"#) - .await - .unwrap(); - database - .sql( - r#"CREATE TABLE "cli.export.create_table"."a.b.c"( - ts TIMESTAMP, - TIME INDEX (ts) - ) engine=mito; - "#, - ) - .await - .unwrap(); - - let output_dir = tempfile::tempdir().unwrap(); - let cli = cli::Command::parse_from([ - "cli", - "export", - "--addr", - "127.0.0.1:4001", - "--output-dir", - &*output_dir.path().to_string_lossy(), - "--target", - "create-table", - ]); - let mut cli_app = cli.build().await?; - cli_app.start().await?; - - instance.stop().await?; - - let output_file = output_dir - .path() - .join("greptime-cli.export.create_table.sql"); - let res = std::fs::read_to_string(output_file).unwrap(); - let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( - "ts" TIMESTAMP(3) NOT NULL, - TIME INDEX ("ts") -) - -ENGINE=mito -; -"#; - assert_eq!(res.trim(), expect.trim()); - - Ok(()) - } -} diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index c622aa462140..0c2af0f1b7e8 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -19,7 +19,7 @@ use std::time::Instant; use catalog::kvbackend::{ CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, }; -use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; use common_meta::cache_invalidator::MultiCacheInvalidator; @@ -85,20 +85,21 @@ impl Repl { } let client = Client::with_urls([&cmd.grpc_addr]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - - let query_engine = if let Some(meta_addr) = &cmd.meta_addr { - create_query_engine(meta_addr).await.map(Some)? - } else { - None - }; - - Ok(Self { - rl, - prompt: "> ".to_string(), - database, - query_engine, - }) + todo!() + // let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + // + // let query_engine = if let Some(meta_addr) = &cmd.meta_addr { + // create_query_engine(meta_addr).await.map(Some)? + // } else { + // None + // }; + // + // Ok(Self { + // rl, + // prompt: "> ".to_string(), + // database, + // query_engine, + // }) } /// Parse the next command diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs index 64a6262a08f3..47c949d40715 100644 --- a/src/common/test-util/src/recordbatch.rs +++ b/src/common/test-util/src/recordbatch.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use client::Database; use common_query::OutputData; use common_recordbatch::util; @@ -21,22 +20,6 @@ pub enum ExpectedOutput<'a> { QueryResult(&'a str), } -pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { - let output = db.sql(sql).await.unwrap(); - let output = output.data; - - match (&output, expected) { - (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { - assert_eq!(*x, y, "actual: \n{}", x) - } - (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) - | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { - check_output_stream(output, x).await - } - _ => panic!(), - } -} - pub async fn check_output_stream(output: OutputData, expected: &str) { let recordbatches = match output { OutputData::Stream(stream) => util::collect_batches(stream).await.unwrap(), diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index ef8a2f751d7c..7df644b7720a 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -125,6 +125,7 @@ serde_json.workspace = true session = { workspace = true, features = ["testing"] } table.workspace = true tempfile = "3.0.0" +tests-integration.workspace = true tokio-postgres = "0.7" tokio-postgres-rustls = "0.11" tokio-test = "0.4" diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 701dce7419f5..183fabb5d44e 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -21,7 +21,7 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; use auth::tests::MockUserProvider; use auth::UserProviderRef; -use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; use servers::grpc::flight::FlightCraftWrapper; @@ -31,6 +31,7 @@ use servers::server::Server; use snafu::ResultExt; use table::test_util::MemTable; use table::TableRef; +use tests_integration::database::Database; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index f843e6615c1b..e6807620c049 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -13,11 +13,13 @@ workspace = true [dependencies] api.workspace = true arrow-flight.workspace = true +async-stream.workspace = true async-trait = "0.1" auth.workspace = true axum.workspace = true catalog.workspace = true chrono.workspace = true +clap.workspace = true client = { workspace = true, features = ["testing"] } cmd.workspace = true common-base.workspace = true @@ -38,6 +40,7 @@ datatypes.workspace = true dotenv.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true +futures-util.workspace = true meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } mysql_async = { version = "0.33", default-features = false, features = [ @@ -53,6 +56,7 @@ secrecy = "0.8" serde_json.workspace = true servers = { workspace = true, features = ["testing"] } session.workspace = true +snafu.workspace = true sql.workspace = true sqlx = { version = "0.6", features = [ "runtime-tokio-rustls", @@ -65,6 +69,7 @@ table.workspace = true tempfile.workspace = true time = "0.3" tokio.workspace = true +tokio-stream = { workspace = true, features = ["net"] } tonic.workspace = true tower = "0.4" uuid.workspace = true diff 
--git a/src/client/src/database.rs b/tests-integration/src/database.rs similarity index 82% rename from src/client/src/database.rs rename to tests-integration/src/database.rs index 70dc7397f5ed..b76ee816b2a2 100644 --- a/src/client/src/database.rs +++ b/tests-integration/src/database.rs @@ -23,6 +23,8 @@ use api::v1::{ }; use arrow_flight::Ticket; use async_stream::stream; +use client::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; +use client::{from_grpc_response, Client, Result}; use common_error::ext::{BoxedError, ErrorExt}; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; @@ -34,8 +36,7 @@ use futures_util::StreamExt; use prost::Message; use snafu::{ensure, ResultExt}; -use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; -use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter}; +use crate::stream_insert::StreamInserter; pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; @@ -127,12 +128,10 @@ impl Database { } pub async fn insert(&self, requests: InsertRequests) -> Result { - let _timer = metrics::METRIC_GRPC_INSERT.start_timer(); self.handle(Request::Inserts(requests)).await } pub async fn row_insert(&self, requests: RowInsertRequests) -> Result { - let _timer = metrics::METRIC_GRPC_INSERT.start_timer(); self.handle(Request::RowInserts(requests)).await } @@ -157,7 +156,6 @@ impl Database { } pub async fn delete(&self, request: DeleteRequests) -> Result { - let _timer = metrics::METRIC_GRPC_DELETE.start_timer(); self.handle(Request::Deletes(request)).await } @@ -188,7 +186,6 @@ impl Database { where S: AsRef, { - let _timer = metrics::METRIC_GRPC_SQL.start_timer(); self.do_get(Request::Query(QueryRequest { query: Some(Query::Sql(sql.as_ref().to_string())), })) @@ -196,7 +193,6 @@ impl Database { } pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - let _timer = metrics::METRIC_GRPC_LOGICAL_PLAN.start_timer(); self.do_get(Request::Query(QueryRequest { query: Some(Query::LogicalPlan(logical_plan)), })) @@ -210,7 +206,6 @@ impl Database { end: &str, step: &str, ) -> Result { - let _timer = metrics::METRIC_GRPC_PROMQL_RANGE_QUERY.start_timer(); self.do_get(Request::Query(QueryRequest { query: Some(Query::PromRangeQuery(PromRangeQuery { query: promql.to_string(), @@ -224,7 +219,6 @@ impl Database { } pub async fn create(&self, expr: CreateTableExpr) -> Result { - let _timer = metrics::METRIC_GRPC_CREATE_TABLE.start_timer(); self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), })) @@ -232,7 +226,6 @@ impl Database { } pub async fn alter(&self, expr: AlterExpr) -> Result { - let _timer = metrics::METRIC_GRPC_ALTER.start_timer(); self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::Alter(expr)), })) @@ -240,7 +233,6 @@ impl Database { } pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - let _timer = metrics::METRIC_GRPC_DROP_TABLE.start_timer(); self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::DropTable(expr)), })) @@ -248,7 +240,6 @@ impl Database { } pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result { - let _timer = metrics::METRIC_GRPC_TRUNCATE_TABLE.start_timer(); self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::TruncateTable(expr)), })) @@ -256,8 +247,6 @@ impl Database { } async fn do_get(&self, request: Request) -> Result { - // FIXME(paomian): should be added some labels for metrics - let _timer = metrics::METRIC_GRPC_DO_GET.start_timer(); let request = 
self.to_rpc_request(request); let request = Ticket { ticket: request.encode_to_vec().into(), @@ -267,7 +256,7 @@ impl Database { let response = client.mut_inner().do_get(request).await.map_err(|e| { let tonic_code = e.code(); - let e: error::Error = e.into(); + let e: Error = e.into(); let code = e.status_code(); let msg = e.to_string(); let error = Error::FlightGet { @@ -358,8 +347,14 @@ pub struct FlightContext { mod tests { use api::v1::auth_header::AuthScheme; use api::v1::{AuthHeader, Basic}; + use clap::Parser; + use client::Client; + use cmd::error::Result as CmdResult; + use cmd::options::{CliOptions, Options}; + use cmd::{cli, standalone, App}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use crate::database::FlightContext; + use super::{Database, FlightContext}; #[test] fn test_flight_ctx() { @@ -382,4 +377,72 @@ mod tests { }) )) } + + #[tokio::test(flavor = "multi_thread")] + async fn test_export_create_table_with_quoted_names() -> CmdResult<()> { + let output_dir = tempfile::tempdir().unwrap(); + + let standalone = standalone::Command::parse_from([ + "standalone", + "start", + "--data-home", + &*output_dir.path().to_string_lossy(), + ]); + let Options::Standalone(standalone_opts) = + standalone.load_options(&CliOptions::default())? + else { + unreachable!() + }; + let mut instance = standalone.build(*standalone_opts).await?; + instance.start().await?; + + let client = Client::with_urls(["127.0.0.1:4001"]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + database + .sql(r#"CREATE DATABASE "cli.export.create_table";"#) + .await + .unwrap(); + database + .sql( + r#"CREATE TABLE "cli.export.create_table"."a.b.c"( + ts TIMESTAMP, + TIME INDEX (ts) + ) engine=mito; + "#, + ) + .await + .unwrap(); + + let output_dir = tempfile::tempdir().unwrap(); + let cli = cli::Command::parse_from([ + "cli", + "export", + "--addr", + "127.0.0.1:4001", + "--output-dir", + &*output_dir.path().to_string_lossy(), + "--target", + "create-table", + ]); + let mut cli_app = cli.build().await?; + cli_app.start().await?; + + instance.stop().await?; + + let output_file = output_dir + .path() + .join("greptime-cli.export.create_table.sql"); + let res = std::fs::read_to_string(output_file).unwrap(); + let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" ( + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) + +ENGINE=mito +; +"#; + assert_eq!(res.trim(), expect.trim()); + + Ok(()) + } } diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index d3e700151345..4dcfdcc09603 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -15,12 +15,14 @@ #![feature(assert_matches)] pub mod cluster; +pub mod database; mod grpc; mod influxdb; mod instance; mod opentsdb; mod otlp; mod prom_store; +mod stream_insert; pub mod test_util; pub mod standalone; diff --git a/src/client/src/stream_insert.rs b/tests-integration/src/stream_insert.rs similarity index 98% rename from src/client/src/stream_insert.rs rename to tests-integration/src/stream_insert.rs index a75144786012..bd35c132c98a 100644 --- a/src/client/src/stream_insert.rs +++ b/tests-integration/src/stream_insert.rs @@ -18,15 +18,14 @@ use api::v1::{ AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader, RowInsertRequest, RowInsertRequests, }; +use client::error::{self, Result}; +use client::from_grpc_response; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use 
tonic::transport::Channel; use tonic::{Response, Status}; -use crate::error::{self, Result}; -use crate::from_grpc_response; - /// A structure that provides some methods for streaming data insert. /// /// [`StreamInserter`] cannot be constructed via the `StreamInserter::new` method. diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 0b38ac252c30..5a14751bbc45 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -20,7 +20,7 @@ use api::v1::{ PromqlRequest, RequestHeader, SemanticType, }; use auth::user_provider_from_option; -use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::MITO_ENGINE; use common_query::Output; use common_recordbatch::RecordBatches; @@ -30,6 +30,7 @@ use servers::http::prometheus::{ PrometheusResponse, }; use servers::server::Server; +use tests_integration::database::Database; use tests_integration::test_util::{ setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType, }; diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 6e8848de5c83..9967be7c6aaf 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -19,5 +19,6 @@ serde.workspace = true serde_json.workspace = true sqlness = { version = "0.5" } tempfile.workspace = true +tests-integration.workspace = true tinytemplate = "1.2" tokio.workspace = true diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index ea3e3e1bc10a..399f65840b9c 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -24,14 +24,13 @@ use std::time::Duration; use async_trait::async_trait; use client::error::ServerSnafu; -use client::{ - Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, -}; +use client::{Client, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use serde::Serialize; use sqlness::{Database, EnvController, QueryContext}; +use tests_integration::database::Database as DB; use tinytemplate::TinyTemplate; use tokio::sync::Mutex as TokioMutex; From aca5a71951c023c2454505683f03864bc251188e Mon Sep 17 00:00:00 2001 From: test Date: Sun, 28 Apr 2024 00:52:55 +0800 Subject: [PATCH 03/11] fix clippy --- src/cmd/src/cli/export.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index afe65ed671cb..d844f5e223f7 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -140,8 +140,8 @@ impl Export { }); let rpc_request = GreptimeRequest { header: Some(RequestHeader { - catalog: catalog, - schema: schema, + catalog, + schema, authorization: self.auth_header.clone(), dbname: String::default(), timezone: String::default(), From efec7cba139b87e8980e4966e894edf7ab57f584 Mon Sep 17 00:00:00 2001 From: test Date: Sun, 28 Apr 2024 12:30:17 +0800 Subject: [PATCH 04/11] move `DatabaseClient` along with `Database` --- src/client/src/client.rs | 18 ++---------------- tests-integration/src/database.rs | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 869fd086e8e3..5e82295c16f6 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use 
api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::health_check_client::HealthCheckClient;
 use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
 use api::v1::region::region_client::RegionClient as PbRegionClient;
@@ -28,10 +27,6 @@ use tonic::transport::Channel;
 use crate::load_balance::{LoadBalance, Loadbalancer};
 use crate::{error, Result};
 
-pub struct DatabaseClient {
-    pub inner: GreptimeDatabaseClient<Channel>,
-}
-
 pub struct FlightClient {
     addr: String,
     client: FlightServiceClient<Channel>,
@@ -138,7 +133,7 @@ impl Client {
         Ok((addr, channel))
     }
 
-    fn max_grpc_recv_message_size(&self) -> usize {
+    pub fn max_grpc_recv_message_size(&self) -> usize {
         self.inner
             .channel_manager
             .config()
@@ -146,7 +141,7 @@ impl Client {
             .as_bytes() as usize
     }
 
-    fn max_grpc_send_message_size(&self) -> usize {
+    pub fn max_grpc_send_message_size(&self) -> usize {
         self.inner
             .channel_manager
             .config()
@@ -164,15 +159,6 @@ impl Client {
         })
     }
 
-    pub fn make_database_client(&self) -> Result<DatabaseClient> {
-        let (_, channel) = self.find_channel()?;
-        Ok(DatabaseClient {
-            inner: GreptimeDatabaseClient::new(channel)
-                .max_decoding_message_size(self.max_grpc_recv_message_size())
-                .max_encoding_message_size(self.max_grpc_send_message_size()),
-        })
-    }
-
     pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
         let (_, channel) = self.find_channel()?;
         Ok(PbRegionClient::new(channel)
diff --git a/tests-integration/src/database.rs b/tests-integration/src/database.rs
index b76ee816b2a2..a22039a13c6e 100644
--- a/tests-integration/src/database.rs
+++ b/tests-integration/src/database.rs
@@ -14,6 +14,7 @@
 
 use api::v1::auth_header::AuthScheme;
 use api::v1::ddl_request::Expr as DdlExpr;
+use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
@@ -35,6 +36,7 @@ use common_telemetry::tracing_context::W3cTrace;
 use futures_util::StreamExt;
 use prost::Message;
 use snafu::{ensure, ResultExt};
+use tonic::transport::Channel;
 
 use crate::stream_insert::StreamInserter;
 
@@ -58,6 +60,19 @@ pub struct Database {
     ctx: FlightContext,
 }
 
+pub struct DatabaseClient {
+    pub inner: GreptimeDatabaseClient<Channel>,
+}
+
+fn make_database_client(client: &Client) -> Result<DatabaseClient> {
+    let (_, channel) = client.find_channel()?;
+    Ok(DatabaseClient {
+        inner: GreptimeDatabaseClient::new(channel)
+            .max_decoding_message_size(client.max_grpc_recv_message_size())
+            .max_encoding_message_size(client.max_grpc_send_message_size()),
+    })
+}
+
 impl Database {
     /// Create database service client using catalog and schema
     pub fn new(catalog: impl Into<String>, schema: impl Into<String>, client: Client) -> Self {
@@ -143,7 +158,7 @@ impl Database {
         &self,
         channel_size: usize,
     ) -> Result<StreamInserter> {
-        let client = self.client.make_database_client()?.inner;
+        let client = make_database_client(&self.client)?.inner;
 
         let stream_inserter = StreamInserter::new(
             client,
@@ -160,7 +175,7 @@ impl Database {
     }
 
     async fn handle(&self, request: Request) -> Result<u32> {
-        let mut client = self.client.make_database_client()?.inner;
+        let mut client = make_database_client(&self.client)?.inner;
         let request = self.to_rpc_request(request);
         let response = client.handle(request).await?.into_inner();
         from_grpc_response(response)

From f0184e5702a774c4a4480485a9854aaabeb08d98 Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 18:14:35 +0800
Subject: [PATCH 05/11] `cli export` now uses http api

---
 Cargo.lock                |   2 +
 src/client/src/region.rs  |  91 +----------------
 src/cmd/Cargo.toml        |   2 +
 src/cmd/src/cli/export.rs | 200
+++++++++++++------------------------- src/cmd/src/error.rs | 11 ++- 5 files changed, 85 insertions(+), 221 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ec42bd4f566..7b894331869e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1583,6 +1583,7 @@ dependencies = [ "arrow-flight", "async-trait", "auth", + "base64 0.21.7", "catalog", "chrono", "clap 4.5.4", @@ -1623,6 +1624,7 @@ dependencies = [ "query", "rand", "regex", + "reqwest", "rexpect", "rustyline 10.1.1", "serde", diff --git a/src/client/src/region.rs b/src/client/src/region.rs index a72dd0c503cf..a401fa434803 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -26,13 +26,12 @@ use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_meta::datanode_manager::Datanode; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; -use snafu::{ensure, location, Location, OptionExt, ResultExt}; +use snafu::{location, Location, OptionExt, ResultExt}; use tokio_stream::StreamExt; use crate::error::{ @@ -167,94 +166,6 @@ impl RegionRequester { Ok(Box::pin(record_batch_stream)) } - pub async fn do_get_output(&self, ticket: Ticket) -> Result { - let mut flight_client = self.client.make_flight_client()?; - let response = flight_client - .mut_inner() - .do_get(ticket) - .await - .map_err(|e| { - let tonic_code = e.code(); - let e: error::Error = e.into(); - let code = e.status_code(); - let msg = e.to_string(); - let error = Error::FlightGet { - tonic_code, - addr: flight_client.addr().to_string(), - source: BoxedError::new(ServerSnafu { code, msg }.build()), - }; - error!( - e; "Failed to do Flight get, addr: {}, code: {}", - flight_client.addr(), - tonic_code - ); - error - })?; - - let flight_data_stream = response.into_inner(); - let mut decoder = FlightDecoder::default(); - - let mut flight_message_stream = flight_data_stream.map(move |flight_data| { - flight_data - .map_err(Error::from) - .and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu)) - }); - - let Some(first_flight_message) = flight_message_stream.next().await else { - return IllegalFlightMessagesSnafu { - reason: "Expect the response not to be empty", - } - .fail(); - }; - - let first_flight_message = first_flight_message?; - - match first_flight_message { - FlightMessage::AffectedRows(rows) => { - ensure!( - flight_message_stream.next().await.is_none(), - IllegalFlightMessagesSnafu { - reason: "Expect 'AffectedRows' Flight messages to be the one and the only!" 
- } - ); - Ok(Output::new_with_affected_rows(rows)) - } - FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => { - IllegalFlightMessagesSnafu { - reason: "The first flight message cannot be a RecordBatch or Metrics message", - } - .fail() - } - FlightMessage::Schema(schema) => { - let stream = Box::pin(stream!({ - while let Some(flight_message) = flight_message_stream.next().await { - let flight_message = flight_message - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - match flight_message { - FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch), - FlightMessage::Metrics(_) => {} - FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { - yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} - .fail() - .map_err(BoxedError::new) - .context(ExternalSnafu); - break; - } - } - } - })); - let record_batch_stream = RecordBatchStreamWrapper { - schema, - stream, - output_ordering: None, - metrics: Default::default(), - }; - Ok(Output::new_with_stream(Box::pin(record_batch_stream))) - } - } - } - async fn handle_inner(&self, request: RegionRequest) -> Result { let request_type = request .body diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 83150ae9dde3..95db21e508e4 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -20,6 +20,7 @@ api.workspace = true arrow-flight.workspace = true async-trait.workspace = true auth.workspace = true +base64.workspace = true catalog.workspace = true chrono.workspace = true clap.workspace = true @@ -61,6 +62,7 @@ prost.workspace = true query.workspace = true rand.workspace = true regex.workspace = true +reqwest.workspace = true rustyline = "10.1" serde.workspace = true serde_json.workspace = true diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index d844f5e223f7..92e2814c0793 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -15,22 +15,15 @@ use std::path::Path; use std::sync::Arc; -use api::v1::greptime_request::Request; -use api::v1::query_request::Query; -use arrow_flight::Ticket; use async_trait::async_trait; +use base64::engine::general_purpose; +use base64::Engine as _; use clap::{Parser, ValueEnum}; -use client::api::v1::auth_header::AuthScheme; -use client::api::v1::{AuthHeader, Basic, GreptimeRequest, QueryRequest, RequestHeader}; -use client::region::RegionRequester; -use client::{Client, OutputData, DEFAULT_SCHEMA_NAME}; -use common_query::Output; -use common_recordbatch::util::collect; -use common_telemetry::tracing_context::W3cTrace; +use client::{Client, DEFAULT_SCHEMA_NAME}; use common_telemetry::{debug, error, info, warn}; -use datatypes::scalars::ScalarVector; -use datatypes::vectors::{StringVector, Vector}; -use prost::Message; +use serde_json::Value; +use servers::http::greptime_result_v1::GreptimedbV1Response; +use servers::http::GreptimeQueryOutput; use snafu::{OptionExt, ResultExt}; use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; @@ -38,9 +31,8 @@ use tokio::sync::Semaphore; use crate::cli::{Instance, Tool}; use crate::error::{ - CollectRecordBatchesSnafu, ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu, - IllegalConfigSnafu, InvalidDatabaseNameSnafu, NotDataFromOutputSnafu, RequestDatabaseSnafu, - Result, + ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu, InvalidDatabaseNameSnafu, + QuerySqlSnafu, Result, SerdeJsonSnafu, }; type TableReference = (String, String, String); @@ -96,16 +88,9 @@ impl 
ExportCommand {
         })?;
         let (catalog, schema) = split_database(&self.database)?;
 
-        let auth_header = if let Some(auth_basic) = &self.auth_basic {
-            let (username, password) = auth_basic.split_once(':').context(IllegalConfigSnafu {
-                msg: "auth_basic cannot be split by ':'".to_string(),
-            })?;
-            Some(AuthHeader {
-                auth_scheme: Some(AuthScheme::Basic(Basic {
-                    username: username.to_string(),
-                    password: password.to_string(),
-                })),
-            })
+        let auth_header = if let Some(basic) = &self.auth_basic {
+            let encoded = general_purpose::STANDARD.encode(basic);
+            Some(format!("basic {}", encoded))
         } else {
             None
         };
@@ -117,7 +102,6 @@ impl ExportCommand {
             parallelism: self.export_jobs,
             target: self.target.clone(),
             auth_header,
-            region_requester: RegionRequester::new(client),
         })))
     }
 }
@@ -128,35 +112,36 @@ pub struct Export {
     output_dir: String,
     parallelism: usize,
     target: ExportTarget,
-    auth_header: Option<AuthHeader>,
-    region_requester: RegionRequester,
+    auth_header: Option<String>,
 }
 
 impl Export {
-    /// Execute a sql query.
-    async fn sql(&self, sql: &str, catalog: String, schema: String) -> Result<Output> {
-        let request = Request::Query(QueryRequest {
-            query: Some(Query::Sql(sql.to_string())),
-        });
-        let rpc_request = GreptimeRequest {
-            header: Some(RequestHeader {
-                catalog,
-                schema,
-                authorization: self.auth_header.clone(),
-                dbname: String::default(),
-                timezone: String::default(),
-                tracing_context: W3cTrace::new(),
-            }),
-            request: Some(request),
-        };
-        let ticket = Ticket {
-            ticket: rpc_request.encode_to_vec().into(),
-        };
+    /// Execute a single SQL query.
+    async fn sql(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
+        let client = reqwest::Client::new();
+        let mut request = client
+            .get(format!(
+                "http://localhost:4000/v1/sql?db={}-{}&sql={}",
+                self.catalog,
+                self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
+                sql
+            ))
+            .header("Content-Type", "application/x-www-form-urlencoded");
+        if let Some(ref auth) = self.auth_header {
+            request = request.header("Authorization", auth);
+        }
 
-        self.region_requester
-            .do_get_output(ticket)
-            .await
-            .with_context(|_| RequestDatabaseSnafu { sql })
+        let response = request.send().await.context(QuerySqlSnafu)?;
+        snafu::ensure!(response.status() == 200, EmptyResultSnafu);
+
+        let text = response.text().await.context(QuerySqlSnafu)?;
+        // .with_context(|_| RequestDatabaseSnafu { sql })?;
+
+        let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
+        match &body.output()[0] {
+            GreptimeQueryOutput::Records(records) => Ok(Some(records.rows().clone())),
+            GreptimeQueryOutput::AffectedRows(_) => Ok(None),
+        }
     }
 
     /// Iterate over all db names.
@@ -166,33 +151,19 @@ impl Export {
         if let Some(schema) = &self.schema {
             Ok(vec![(self.catalog.clone(), schema.clone())])
         } else {
-            let result = self
-                .sql(
-                    "show databases",
-                    self.catalog.clone(),
-                    DEFAULT_SCHEMA_NAME.to_string(),
-                )
-                .await?;
-            let OutputData::Stream(stream) = result.data else {
-                NotDataFromOutputSnafu.fail()?
+            let result = self.sql("show databases").await?;
+            let Some(records) = result else {
+                EmptyResultSnafu.fail()?
             };
-            let record_batch = collect(stream)
-                .await
-                .context(CollectRecordBatchesSnafu)?
-                .pop()
-                .context(EmptyResultSnafu)?;
-            let schemas = record_batch
-                .column(0)
-                .as_any()
-                .downcast_ref::<StringVector>()
-                .unwrap();
-            let mut result = Vec::with_capacity(schemas.len());
-            for i in 0..schemas.len() {
-                let schema = schemas.get_data(i).unwrap().to_owned();
+            let mut result = Vec::with_capacity(records.len());
+            for value in records {
+                let serde_json::Value::String(schema) = &value[0] else {
+                    unreachable!()
+                };
                 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                     continue;
                 }
-                result.push((self.catalog.clone(), schema));
+                result.push((self.catalog.clone(), schema.clone()));
             }
             Ok(result)
         }
@@ -207,47 +178,27 @@ impl Export {
             information_schema.tables where table_type = \'BASE TABLE\'\
             and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'",
         );
-        let result = self
-            .sql(&sql, catalog.to_string(), schema.to_string())
-            .await?;
-        let OutputData::Stream(stream) = result.data else {
-            NotDataFromOutputSnafu.fail()?
-        };
-        let Some(record_batch) = collect(stream)
-            .await
-            .context(CollectRecordBatchesSnafu)?
-            .pop()
-        else {
-            return Ok(vec![]);
+        let result = self.sql(&sql).await?;
+        let Some(records) = result else {
+            EmptyResultSnafu.fail()?
         };
-        debug!("Fetched table list: {}", record_batch.pretty_print());
+        debug!("Fetched table list: {:?}", records);
 
-        if record_batch.num_rows() == 0 {
+        if records.is_empty() {
             return Ok(vec![]);
         }
 
-        let mut result = Vec::with_capacity(record_batch.num_rows());
-        let catalog_column = record_batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        let schema_column = record_batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        let table_column = record_batch
-            .column(2)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap();
-        for i in 0..record_batch.num_rows() {
-            let catalog = catalog_column.get_data(i).unwrap().to_owned();
-            let schema = schema_column.get_data(i).unwrap().to_owned();
-            let table = table_column.get_data(i).unwrap().to_owned();
-            result.push((catalog, schema, table));
+        let mut result = Vec::with_capacity(records.len());
+        for value in records {
+            let mut t = Vec::with_capacity(3);
+            for v in &value {
+                let serde_json::Value::String(value) = v else {
+                    unreachable!()
+                };
+                t.push(value);
+            }
+            result.push((t[0].clone(), t[1].clone(), t[2].clone()));
         }
 
         Ok(result)
@@ -258,26 +209,15 @@ impl Export {
             r#"show create table "{}"."{}"."{}""#,
             catalog, schema, table
         );
-        let result = self
-            .sql(&sql, catalog.to_string(), schema.to_string())
-            .await?;
-        let OutputData::Stream(stream) = result.data else {
-            NotDataFromOutputSnafu.fail()?
+        let result = self.sql(&sql).await?;
+        let Some(records) = result else {
+            EmptyResultSnafu.fail()?
         };
-        let record_batch = collect(stream)
-            .await
-            .context(CollectRecordBatchesSnafu)?
-            .pop()
-            .context(EmptyResultSnafu)?;
-        let create_table = record_batch
-            .column(1)
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .unwrap()
-            .get_data(0)
-            .unwrap();
-
-        Ok(format!("{create_table};\n"))
+        let serde_json::Value::String(create_table) = &records[0][1] else {
+            unreachable!()
+        };
+
+        Ok(format!("{};\n", create_table))
     }
 
     async fn export_create_table(&self) -> Result<()> {
@@ -351,7 +291,7 @@ impl Export {
                 schema,
                 output_dir.to_str().unwrap()
             );
-            self.sql(&sql, catalog.clone(), schema.clone()).await?;
+            self.sql(&sql).await?;
 
             info!("finished exporting {catalog}.{schema} data");
 
             // export copy from sql
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 1951ed5b0e29..8b620c48db32 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -218,6 +218,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to finish http request"))]
+    QuerySql {
+        #[snafu(source)]
+        error: reqwest::Error,
+        location: Location,
+    },
+
     #[snafu(display("Expect data from output, but got another thing"))]
     NotDataFromOutput { location: Location },
 
@@ -301,7 +308,9 @@ impl ErrorExt for Error {
             Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
             Error::StartCatalogManager { source, .. } => source.status_code(),
 
-            Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,
+            Error::SerdeJson { .. } | Error::FileIo { .. } | Error::QuerySql { .. } => {
+                StatusCode::Unexpected
+            }
 
             Error::Other { source, .. } => source.status_code(),

From 44be8560e8a60f1490915eb0af2ec7969c10de50 Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 18:17:42 +0800
Subject: [PATCH 06/11] add TODO for `tests-integration` dependencies

---
 Cargo.toml              | 1 +
 benchmarks/Cargo.toml   | 1 +
 src/servers/Cargo.toml  | 1 +
 tests/runner/Cargo.toml | 1 +
 4 files changed, 4 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index c57c4d86c21a..89a043fd9c4d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -223,6 +223,7 @@ sql = { path = "src/sql" }
 store-api = { path = "src/store-api" }
 substrait = { path = "src/common/substrait" }
 table = { path = "src/table" }
+# TODO some code depends on this
 tests-integration = { path = "tests-integration" }
 
 [workspace.dependencies.meter-macros]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 12113f1f2c17..ed7f038596e3 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,6 +33,7 @@ rand.workspace = true
 rskafka.workspace = true
 serde.workspace = true
 store-api.workspace = true
+# TODO depend `Database` client
 tests-integration.workspace = true
 tokio.workspace = true
 toml.workspace = true
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 7df644b7720a..6f1e89daf58c 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -125,6 +125,7 @@ serde_json.workspace = true
 session = { workspace = true, features = ["testing"] }
 table.workspace = true
 tempfile = "3.0.0"
+# TODO depend `Database` client
 tests-integration.workspace = true
 tokio-postgres = "0.7"
 tokio-postgres-rustls = "0.11"
diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml
index 9967be7c6aaf..7cb36c1645cb 100644
--- a/tests/runner/Cargo.toml
+++ b/tests/runner/Cargo.toml
@@ -19,6 +19,7 @@ serde.workspace = true
 serde_json.workspace = true
 sqlness = { version = "0.5" }
 tempfile.workspace = true
+# TODO depend `Database` client
 tests-integration.workspace = true
 tinytemplate = "1.2"
 tokio.workspace = true

From cd6a665ea50cd9649814189e9314ec7ffc22fc64 Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 19:22:40 +0800
Subject: [PATCH 07/11] cleanup code

---
 src/cmd/src/cli/export.rs | 42 ++++++++++++++++++++-------------------
 src/cmd/src/cli/repl.rs   | 29 +++++++++++++--------------
 src/cmd/src/error.rs      | 15 ++++----------
 3 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 92e2814c0793..79bd8630169a 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -19,7 +19,7 @@ use async_trait::async_trait;
 use base64::engine::general_purpose;
 use base64::Engine as _;
 use clap::{Parser, ValueEnum};
-use client::{Client, DEFAULT_SCHEMA_NAME};
+use client::DEFAULT_SCHEMA_NAME;
 use common_telemetry::{debug, error, info, warn};
 use serde_json::Value;
 use servers::http::greptime_result_v1::GreptimedbV1Response;
@@ -31,8 +31,8 @@ use tokio::sync::Semaphore;
 
 use crate::cli::{Instance, Tool};
 use crate::error::{
-    ConnectServerSnafu, EmptyResultSnafu, Error, FileIoSnafu, InvalidDatabaseNameSnafu,
-    QuerySqlSnafu, Result, SerdeJsonSnafu,
+    EmptyResultSnafu, Error, FileIoSnafu, HttpQuerySqlSnafu, InvalidDatabaseNameSnafu, Result,
+    SerdeJsonSnafu,
 };
 
 type TableReference = (String, String, String);
@@ -79,13 +79,6 @@ pub struct ExportCommand {
 
 impl ExportCommand {
     pub async fn build(&self) -> Result<Instance> {
-        let client = Client::with_urls([self.addr.clone()]);
-        client
-            .health_check()
-            .await
-            .with_context(|_| ConnectServerSnafu {
-                addr: self.addr.clone(),
-            })?;
         let (catalog, schema) = split_database(&self.database)?;
 
         let auth_header = if let Some(basic) = &self.auth_basic {
@@ -96,6 +89,7 @@ impl ExportCommand {
         };
 
         Ok(Instance::new(Box::new(Export {
+            addr: self.addr.clone(),
             catalog,
             schema,
             output_dir: self.output_dir.clone(),
@@ -107,6 +101,7 @@ impl ExportCommand {
 }
 
 pub struct Export {
+    addr: String,
     catalog: String,
     schema: Option<String>,
    output_dir: String,
@@ -118,24 +113,31 @@ pub struct Export {
 
 impl Export {
     /// Execute one single sql query.
     async fn sql(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
+        let url = format!(
+            "http://{}/v1/sql?db={}-{}&sql={}",
+            self.addr,
+            self.catalog,
+            self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
+            sql
+        );
+
         let client = reqwest::Client::new();
         let mut request = client
-            .get(format!(
-                "http://localhost:4000/v1/sql?db={}-{}&sql={}",
-                self.catalog,
-                self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
-                sql
-            ))
+            .get(&url)
             .header("Content-Type", "application/x-www-form-urlencoded");
         if let Some(ref auth) = self.auth_header {
             request = request.header("Authorization", auth);
         }
 
-        let response = request.send().await.context(QuerySqlSnafu)?;
+        let response = request
+            .send()
+            .await
+            .with_context(|_| HttpQuerySqlSnafu { reason: url })?;
         snafu::ensure!(response.status() == 200, EmptyResultSnafu);
 
-        let text = response.text().await.context(QuerySqlSnafu)?;
-        // .with_context(|_| RequestDatabaseSnafu { sql })?;
+        let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
+            reason: "cannot get response text".to_string(),
+        })?;
 
         let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
         match &body.output()[0] {
@@ -175,7 +177,7 @@ impl Export {
         // TODO: SQL injection hurts
         let sql = format!(
             "select table_catalog, table_schema, table_name from \
-            information_schema.tables where table_type = \'BASE TABLE\'\
+            information_schema.tables where table_type = \'BASE TABLE\' \
             and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'",
         );
         let result = self.sql(&sql).await?;
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 0c2af0f1b7e8..219d52bbb98b 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -85,21 +85,20 @@ impl Repl {
         }
 
         let client = Client::with_urls([&cmd.grpc_addr]);
-        todo!()
-        // let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
-        //
-        // let query_engine = if let Some(meta_addr) = &cmd.meta_addr {
-        //     create_query_engine(meta_addr).await.map(Some)?
-        // } else {
-        //     None
-        // };
-        //
-        // Ok(Self {
-        //     rl,
-        //     prompt: "> ".to_string(),
-        //     database,
-        //     query_engine,
-        // })
+        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+
+        let query_engine = if let Some(meta_addr) = &cmd.meta_addr {
+            create_query_engine(meta_addr).await.map(Some)?
+        } else {
+            None
+        };
+
+        Ok(Self {
+            rl,
+            prompt: "> ".to_string(),
+            database,
+            query_engine,
+        })
     }
 
     /// Parse the next command
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 8b620c48db32..8739190f51dd 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -139,13 +139,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to request database, sql: {sql}"))]
-    RequestDatabase {
-        sql: String,
-        location: Location,
-        source: client::Error,
-    },
-
     #[snafu(display("Failed to collect RecordBatches"))]
     CollectRecordBatches {
         location: Location,
@@ -218,8 +211,9 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to finish http request"))]
-    QuerySql {
+    #[snafu(display("Failed to run http request: {reason}"))]
+    HttpQuerySql {
+        reason: String,
         #[snafu(source)]
         error: reqwest::Error,
         location: Location,
@@ -298,7 +292,6 @@ impl ErrorExt for Error {
             | Error::StopProcedureManager { source, .. } => source.status_code(),
             Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
             Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
-            Error::RequestDatabase { source, .. } => source.status_code(),
             Error::CollectRecordBatches { source, .. }
             | Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
             Error::StartMetaClient { source, .. } => source.status_code(),
@@ -308,7 +301,7 @@ impl ErrorExt for Error {
             Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
             Error::StartCatalogManager { source, .. } => source.status_code(),
 
-            Error::SerdeJson { .. } | Error::FileIo { .. } | Error::QuerySql { .. } => {
+            Error::SerdeJson { .. } | Error::FileIo { .. } | Error::HttpQuerySql { .. } => {
                 StatusCode::Unexpected
             }

From 90869df182298b65a701031c731ec10df878e477 Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 19:42:59 +0800
Subject: [PATCH 08/11] make 'cli export' test use http api

---
 tests-integration/src/database.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests-integration/src/database.rs b/tests-integration/src/database.rs
index a22039a13c6e..82151ca5aea5 100644
--- a/tests-integration/src/database.rs
+++ b/tests-integration/src/database.rs
@@ -433,7 +433,7 @@ mod tests {
             "cli",
             "export",
             "--addr",
-            "127.0.0.1:4001",
+            "127.0.0.1:4000",
             "--output-dir",
             &*output_dir.path().to_string_lossy(),
             "--target",

From d6a32096d4ebd5c7dc2a4197f502ca8eca93855e Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 20:00:25 +0800
Subject: [PATCH 09/11] remove unused dependencies in cmd crate

---
 Cargo.lock         | 3 ---
 src/cmd/Cargo.toml | 3 ---
 2 files changed, 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7b894331869e..a1076790546f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1579,8 +1579,6 @@ dependencies = [
 name = "cmd"
 version = "0.7.2"
 dependencies = [
- "api",
- "arrow-flight",
  "async-trait",
  "auth",
  "base64 0.21.7",
@@ -1592,7 +1590,6 @@ dependencies = [
  "common-catalog",
  "common-config",
  "common-error",
- "common-grpc",
  "common-macro",
  "common-meta",
  "common-procedure",
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 95db21e508e4..ae27c4e374d6 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -16,8 +16,6 @@ tokio-console = ["common-telemetry/tokio-console"]
 workspace = true
 
 [dependencies]
-api.workspace = true
-arrow-flight.workspace = true
 async-trait.workspace = true
 auth.workspace = true
 base64.workspace = true
@@ -29,7 +27,6 @@ common-base.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
 common-error.workspace = true
-common-grpc.workspace = true
 common-macro.workspace = true
 common-meta.workspace = true
 common-procedure.workspace = true

From cb094aa80257bd50a27dd6477937ea67e0b0fa43 Mon Sep 17 00:00:00 2001
From: test
Date: Mon, 29 Apr 2024 21:43:53 +0800
Subject: [PATCH 10/11] apply review comments and clean code

---
 src/cmd/src/cli/export.rs | 26 ++++++++++++++------------
 src/cmd/src/error.rs      |  8 ++++----
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 79bd8630169a..70ca80d11db3 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use base64::engine::general_purpose;
-use base64::Engine as _;
+use base64::Engine;
 use clap::{Parser, ValueEnum};
 use client::DEFAULT_SCHEMA_NAME;
 use common_telemetry::{debug, error, info, warn};
@@ -121,29 +121,31 @@ impl Export {
             sql
         );
 
-        let client = reqwest::Client::new();
-        let mut request = client
+        let mut request = reqwest::Client::new()
             .get(&url)
             .header("Content-Type", "application/x-www-form-urlencoded");
         if let Some(ref auth) = self.auth_header {
             request = request.header("Authorization", auth);
         }
 
-        let response = request
-            .send()
-            .await
-            .with_context(|_| HttpQuerySqlSnafu { reason: url })?;
-        snafu::ensure!(response.status() == 200, EmptyResultSnafu);
+        let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
+            reason: format!("bad url: {}", url),
+        })?;
+        let response = response
+            .error_for_status()
+            .with_context(|_| HttpQuerySqlSnafu {
+                reason: format!("query failed: {}", sql),
+            })?;
 
         let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
             reason: "cannot get response text".to_string(),
         })?;
 
         let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
-        match &body.output()[0] {
-            GreptimeQueryOutput::Records(records) => Ok(Some(records.rows().clone())),
-            GreptimeQueryOutput::AffectedRows(_) => Ok(None),
-        }
+        Ok(body.output().first().and_then(|output| match output {
+            GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
+            GreptimeQueryOutput::AffectedRows(_) => None,
+        }))
     }
 
     /// Iterate over all db names.
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 8739190f51dd..d24f00f5be44 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -291,7 +291,9 @@ impl ErrorExt for Error {
             Error::StartProcedureManager { source, .. }
             | Error::StopProcedureManager { source, .. } => source.status_code(),
             Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
-            Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
+            Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
+                StatusCode::Internal
+            }
             Error::CollectRecordBatches { source, .. }
             | Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
             Error::StartMetaClient { source, .. } => source.status_code(),
@@ -301,9 +303,7 @@ impl ErrorExt for Error {
             Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
             Error::StartCatalogManager { source, .. } => source.status_code(),
 
-            Error::SerdeJson { .. } | Error::FileIo { .. } | Error::HttpQuerySql { .. } => {
-                StatusCode::Unexpected
-            }
+            Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,
 
             Error::Other { source, .. } => source.status_code(),

From af715936938dd71e7f02b6ab675e6053fb88b121 Mon Sep 17 00:00:00 2001
From: tison
Date: Wed, 1 May 2024 12:23:08 +0800
Subject: [PATCH 11/11] remove unused methods

Signed-off-by: tison
---
 tests-integration/src/database.rs      |  97 +-------------------
 tests-integration/src/lib.rs           |   1 -
 tests-integration/src/stream_insert.rs | 117 -------------------------
 3 files changed, 3 insertions(+), 212 deletions(-)
 delete mode 100644 tests-integration/src/stream_insert.rs

diff --git a/tests-integration/src/database.rs b/tests-integration/src/database.rs
index 82151ca5aea5..31254cefdcc2 100644
--- a/tests-integration/src/database.rs
+++ b/tests-integration/src/database.rs
@@ -18,9 +18,8 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
-    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DeleteRequests, DropTableExpr,
-    GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader,
-    RowInsertRequests, TruncateTableExpr,
+    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
+    QueryRequest, RequestHeader,
 };
 use arrow_flight::Ticket;
 use async_stream::stream;
@@ -38,8 +37,6 @@ use prost::Message;
 use snafu::{ensure, ResultExt};
 use tonic::transport::Channel;
 
-use crate::stream_insert::StreamInserter;
-
 pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
 
 #[derive(Clone, Debug, Default)]
@@ -104,34 +101,14 @@ impl Database {
         }
     }
 
-    pub fn catalog(&self) -> &String {
-        &self.catalog
-    }
-
     pub fn set_catalog(&mut self, catalog: impl Into<String>) {
         self.catalog = catalog.into();
     }
 
-    pub fn schema(&self) -> &String {
-        &self.schema
-    }
-
     pub fn set_schema(&mut self, schema: impl Into<String>) {
         self.schema = schema.into();
     }
 
-    pub fn dbname(&self) -> &String {
-        &self.dbname
-    }
-
-    pub fn set_dbname(&mut self, dbname: impl Into<String>) {
-        self.dbname = dbname.into();
-    }
-
-    pub fn timezone(&self) -> &String {
-        &self.timezone
-    }
-
     pub fn set_timezone(&mut self, timezone: impl Into<String>) {
         self.timezone = timezone.into();
     }
@@ -146,34 +123,6 @@ impl Database {
         self.handle(Request::Inserts(requests)).await
     }
 
-    pub async fn row_insert(&self, requests: RowInsertRequests) -> Result<u32> {
-        self.handle(Request::RowInserts(requests)).await
-    }
-
-    pub fn streaming_inserter(&self) -> Result<StreamInserter> {
-        self.streaming_inserter_with_channel_size(65536)
-    }
-
-    pub fn streaming_inserter_with_channel_size(
-        &self,
-        channel_size: usize,
-    ) -> Result<StreamInserter> {
-        let client = make_database_client(&self.client)?.inner;
-
-        let stream_inserter = StreamInserter::new(
-            client,
-            self.dbname().to_string(),
-            self.ctx.auth_header.clone(),
-            channel_size,
-        );
-
-        Ok(stream_inserter)
-    }
-
-    pub async fn delete(&self, request: DeleteRequests) -> Result<u32> {
-        self.handle(Request::Deletes(request)).await
-    }
-
     async fn handle(&self, request: Request) -> Result<u32> {
         let mut client = make_database_client(&self.client)?.inner;
         let request = self.to_rpc_request(request);
@@ -207,32 +156,6 @@ impl Database {
         .await
     }
 
-    pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::LogicalPlan(logical_plan)),
-        }))
-        .await
-    }
-
-    pub async fn prom_range_query(
-        &self,
-        promql: &str,
-        start: &str,
-        end: &str,
-        step: &str,
-    ) -> Result<Output> {
-        self.do_get(Request::Query(QueryRequest {
-            query: Some(Query::PromRangeQuery(PromRangeQuery {
-                query: promql.to_string(),
-                start: start.to_string(),
-                end: end.to_string(),
-                step: step.to_string(),
-                lookback: DEFAULT_LOOKBACK_STRING.to_string(),
-            })),
-        }))
-        .await
-    }
-
     pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
         self.do_get(Request::Ddl(DdlRequest {
             expr: Some(DdlExpr::CreateTable(expr)),
@@ -247,20 +170,6 @@ impl Database {
         .await
     }
 
-    pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::DropTable(expr)),
-        }))
-        .await
-    }
-
-    pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result<Output> {
-        self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::TruncateTable(expr)),
-        }))
-        .await
-    }
-
     async fn do_get(&self, request: Request) -> Result<Output> {
         let request = self.to_rpc_request(request);
         let request = Ticket {
@@ -354,7 +263,7 @@ impl Database {
 }
 
 #[derive(Default, Debug, Clone)]
-pub struct FlightContext {
+struct FlightContext {
     auth_header: Option<AuthHeader>,
 }
 
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index 4dcfdcc09603..e4db599fd5e7 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -22,7 +22,6 @@ mod instance;
 mod opentsdb;
 mod otlp;
 mod prom_store;
-mod stream_insert;
 pub mod test_util;
 
 pub mod standalone;
diff --git a/tests-integration/src/stream_insert.rs b/tests-integration/src/stream_insert.rs
deleted file mode 100644
index bd35c132c98a..000000000000
--- a/tests-integration/src/stream_insert.rs
+++ /dev/null
@@ -1,117 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use api::v1::greptime_database_client::GreptimeDatabaseClient;
-use api::v1::greptime_request::Request;
-use api::v1::{
-    AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader,
-    RowInsertRequest, RowInsertRequests,
-};
-use client::error::{self, Result};
-use client::from_grpc_response;
-use tokio::sync::mpsc;
-use tokio::task::JoinHandle;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::transport::Channel;
-use tonic::{Response, Status};
-
-/// A structure that provides some methods for streaming data insert.
-///
-/// [`StreamInserter`] cannot be constructed via the `StreamInserter::new` method.
-/// You can use the following way to obtain [`StreamInserter`].
-///
-/// ```ignore
-/// let grpc_client = Client::with_urls(vec!["127.0.0.1:4002"]);
-/// let client = Database::new_with_dbname("db_name", grpc_client);
-/// let stream_inserter = client.streaming_inserter().unwrap();
-/// ```
-///
-/// If you want to see a concrete usage example, please see
-/// [stream_inserter.rs](https://github.com/GreptimeTeam/greptimedb/blob/main/src/client/examples/stream_ingest.rs).
-pub struct StreamInserter {
-    sender: mpsc::Sender<GreptimeRequest>,
-
-    auth_header: Option<AuthHeader>,
-
-    dbname: String,
-
-    join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>>,
-}
-
-impl StreamInserter {
-    pub(crate) fn new(
-        mut client: GreptimeDatabaseClient<Channel>,
-        dbname: String,
-        auth_header: Option<AuthHeader>,
-        channel_size: usize,
-    ) -> StreamInserter {
-        let (send, recv) = tokio::sync::mpsc::channel(channel_size);
-
-        let join: JoinHandle<std::result::Result<Response<GreptimeResponse>, Status>> =
-            tokio::spawn(async move {
-                let recv_stream = ReceiverStream::new(recv);
-                client.handle_requests(recv_stream).await
-            });
-
-        StreamInserter {
-            sender: send,
-            auth_header,
-            dbname,
-            join,
-        }
-    }
-
-    pub async fn insert(&self, requests: Vec<InsertRequest>) -> Result<()> {
-        let inserts = InsertRequests { inserts: requests };
-        let request = self.to_rpc_request(Request::Inserts(inserts));
-
-        self.sender.send(request).await.map_err(|e| {
-            error::ClientStreamingSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
-    }
-
-    pub async fn row_insert(&self, requests: Vec<RowInsertRequest>) -> Result<()> {
-        let inserts = RowInsertRequests { inserts: requests };
-        let request = self.to_rpc_request(Request::RowInserts(inserts));
-
-        self.sender.send(request).await.map_err(|e| {
-            error::ClientStreamingSnafu {
-                err_msg: e.to_string(),
-            }
-            .build()
-        })
-    }
-
-    pub async fn finish(self) -> Result<u32> {
-        drop(self.sender);
-
-        let response = self.join.await.unwrap()?;
-        let response = response.into_inner();
-        from_grpc_response(response)
-    }
-
-    fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
-        GreptimeRequest {
-            header: Some(RequestHeader {
-                authorization: self.auth_header.clone(),
-                dbname: self.dbname.clone(),
-                ..Default::default()
-            }),
-            request: Some(request),
-        }
-    }
-}