Skip to content

Commit

Permalink
refactor: Remove PhysicalPlan trait and use ExecutionPlan directly (#…
Browse files Browse the repository at this point in the history
…3894)

* refactor: remove PhysicalPlan

* refactor: remove physical_plan mod

* refactor: import

* fix merge error

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
evenyag and waynexia authored May 11, 2024
1 parent fa6c371 commit d0820bb
Show file tree
Hide file tree
Showing 28 changed files with 165 additions and 558 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use common_catalog::consts::{
SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/key_column_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::datetime::DateTime;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/runtime_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::sync::Arc;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/schemata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/table_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
Expand Down
12 changes: 4 additions & 8 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub mod columnar_value;
pub mod error;
mod function;
pub mod logical_plan;
pub mod physical_plan;
pub mod prelude;
mod signature;

Expand All @@ -26,7 +25,7 @@ use std::sync::Arc;
use api::greptime_proto::v1::add_column_location::LocationType;
use api::greptime_proto::v1::AddColumnLocation as Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use physical_plan::PhysicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use serde::{Deserialize, Serialize};
use sqlparser_derive::{Visit, VisitMut};

Expand All @@ -49,7 +48,7 @@ pub enum OutputData {
#[derive(Debug, Default)]
pub struct OutputMeta {
/// May exist for query output. One can retrieve execution metrics from this plan.
pub plan: Option<Arc<dyn PhysicalPlan>>,
pub plan: Option<Arc<dyn ExecutionPlan>>,
pub cost: OutputCost,
}

Expand Down Expand Up @@ -102,11 +101,11 @@ impl Debug for OutputData {
}

impl OutputMeta {
pub fn new(plan: Option<Arc<dyn PhysicalPlan>>, cost: usize) -> Self {
pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
Self { plan, cost }
}

pub fn new_with_plan(plan: Arc<dyn PhysicalPlan>) -> Self {
pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
plan: Some(plan),
cost: 0,
Expand All @@ -118,9 +117,6 @@ impl OutputMeta {
}
}

pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;
pub type DfPhysicalPlanRef = Arc<dyn DfPhysicalPlan>;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum AddColumnLocation {
First,
Expand Down
Loading

0 comments on commit d0820bb

Please sign in to comment.