Skip to content

Commit

Permalink
refactor(frontend): TableScan instead of scan_to_stream for `COPY TO` (GreptimeTeam#2244)
Browse files Browse the repository at this point in the history

* refactor(frontend): TableScan instead of `scan_to_stream` for `COPY TO`

Signed-off-by: Zhenchi <[email protected]>

* fix: format

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored and paomian committed Oct 19, 2023
1 parent e5b9aa5 commit 49c298d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 32 deletions.
9 changes: 5 additions & 4 deletions src/frontend/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ impl StatementExecutor {
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
let req = to_copy_table_request(stmt, query_ctx)?;
let req = to_copy_table_request(stmt, query_ctx.clone())?;
match req.direction {
CopyDirection::Export => {
self.copy_table_to(req).await.map(Output::AffectedRows)
}
CopyDirection::Export => self
.copy_table_to(req, query_ctx)
.await
.map(Output::AffectedRows),
CopyDirection::Import => {
self.copy_table_from(req).await.map(Output::AffectedRows)
}
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/src/statement/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use common_datasource::file_format::Format;
use common_query::Output;
use common_telemetry::info;
use session::context::QueryContextBuilder;
use snafu::{ensure, ResultExt};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};

Expand Down Expand Up @@ -65,17 +66,20 @@ impl StatementExecutor {
);

let exported = self
.copy_table_to(CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file,
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
})
.copy_table_to(
CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file,
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
},
QueryContextBuilder::default().build(),
)
.await?;
exported_rows += exported;
}
Expand Down
59 changes: 42 additions & 17 deletions src/frontend/src/statement/copy_table_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use storage::sst::SstInfo;
use storage::{ParquetWriter, Source};
use store_api::storage::ScanRequest;
use table::engine::TableReference;
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{self, Result, WriteParquetSnafu};
use crate::error::{
self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result, WriteParquetSnafu,
};
use crate::statement::StatementExecutor;

impl StatementExecutor {
Expand Down Expand Up @@ -72,16 +82,18 @@ impl StatementExecutor {
}
}

pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &req.table_name,
};
pub(crate) async fn copy_table_to(
&self,
req: CopyTableRequest,
query_ctx: QueryContextRef,
) -> Result<usize> {
let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
let table = self.get_table(&table_ref).await?;

let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

let df_table_ref = DfTableReference::from(table_ref);

let filters = table
.schema()
.timestamp_column()
Expand All @@ -91,20 +103,33 @@ impl StatementExecutor {
req.timestamp_range.as_ref(),
)
})
.map(|filter| filter.df_expr().clone())
.into_iter()
.collect::<Vec<_>>();

let scan_req = ScanRequest {
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
let table_source = Arc::new(DefaultTableSource::new(table_provider));

let plan = LogicalPlanBuilder::scan_with_filters(
df_table_ref.to_owned_reference(),
table_source,
None,
filters,
..Default::default()
)
.context(BuildDfLogicalPlanSnafu)?
.build()
.context(BuildDfLogicalPlanSnafu)?;

let output = self
.query_engine
.execute(LogicalPlan::DfPlan(plan), query_ctx)
.await
.context(ExecLogicalPlanSnafu)?;
let stream = match output {
Output::Stream(stream) => stream,
Output::RecordBatches(record_batches) => record_batches.as_stream(),
_ => unreachable!(),
};
let stream =
table
.scan_to_stream(scan_req)
.await
.with_context(|_| error::CopyTableSnafu {
table_name: table_ref.to_string(),
})?;

let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let object_store =
Expand Down
7 changes: 7 additions & 0 deletions src/table/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use common_base::paths::DATA_DIR;
use common_procedure::BoxedProcedure;
use datafusion_common::TableReference as DfTableReference;
use store_api::storage::RegionNumber;

use crate::error::{self, Result};
Expand Down Expand Up @@ -63,6 +64,12 @@ impl<'a> Display for TableReference<'a> {
}
}

impl<'a> From<TableReference<'a>> for DfTableReference<'a> {
fn from(val: TableReference<'a>) -> Self {
DfTableReference::full(val.catalog, val.schema, val.table)
}
}

/// CloseTableResult
///
/// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed
Expand Down

0 comments on commit 49c298d

Please sign in to comment.