Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jun 5, 2024
1 parent 759b4c9 commit 33d9a2e
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum BindingCteState {
query: Either<BoundQuery, RecursiveUnion>,
},

ChangeLog {
ChangedLog {
table: Relation,
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl Binder {
table_name,
Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::ChangeLog {
state: BindingCteState::ChangedLog {
table: from_table_relation,
},
alias,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl Binder {
// no matter it's recursive or not.
Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
}
BindingCteState::ChangeLog { table } => {
BindingCteState::ChangedLog { table } => {
let input = BoundShareInput::ChangeLog(table);
self.bind_table_to_context(
input.fields()?,
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/optimizer/plan_node/generic/changed_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ use crate::utils::ColIndexMappingRewriteExt;
use crate::OptimizerContextRef;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChangeLog<PlanRef> {
pub struct ChangedLog<PlanRef> {
pub input: PlanRef,
pub need_op: bool,
pub need_changed_log_row_id: bool,
}
impl<PlanRef: GenericPlanRef> DistillUnit for ChangeLog<PlanRef> {
impl<PlanRef: GenericPlanRef> DistillUnit for ChangedLog<PlanRef> {
fn distill_with_name<'a>(&self, name: impl Into<Str<'a>>) -> XmlNode<'a> {
childless_record(name, vec![])
}
}
impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
impl<PlanRef: GenericPlanRef> ChangedLog<PlanRef> {
pub fn new(input: PlanRef, need_op: bool, need_changed_log_row_id: bool) -> Self {
ChangeLog {
ChangedLog {
input,
need_op,
need_changed_log_row_id,
Expand All @@ -49,7 +49,7 @@ impl<PlanRef: GenericPlanRef> ChangeLog<PlanRef> {
ColIndexMapping::new(map, self.schema().len())
}
}
impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangeLog<PlanRef> {
impl<PlanRef: GenericPlanRef> GenericPlanNode for ChangedLog<PlanRef> {
fn schema(&self) -> Schema {
let mut fields = self.input.schema().fields.clone();
if self.need_op {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_changed_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::PlanRef;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalChangedLog {
pub base: PlanBase<Logical>,
core: generic::ChangeLog<PlanRef>,
core: generic::ChangedLog<PlanRef>,
}

impl LogicalChangedLog {
Expand All @@ -37,11 +37,11 @@ impl LogicalChangedLog {
}

pub fn new(input: PlanRef, need_op: bool, need_changed_log_row_id: bool) -> Self {
let core = generic::ChangeLog::new(input, need_op, need_changed_log_row_id);
let core = generic::ChangedLog::new(input, need_op, need_changed_log_row_id);
Self::with_core(core)
}

pub fn with_core(core: generic::ChangeLog<PlanRef>) -> Self {
pub fn with_core(core: generic::ChangedLog<PlanRef>) -> Self {
let base = PlanBase::new_logical_with_core(&core);
LogicalChangedLog { base, core }
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_changed_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use crate::PlanRef;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamChangedLog {
pub base: PlanBase<Stream>,
core: generic::ChangeLog<PlanRef>,
core: generic::ChangedLog<PlanRef>,
}

impl StreamChangedLog {
pub fn new(core: generic::ChangeLog<PlanRef>) -> Self {
pub fn new(core: generic::ChangedLog<PlanRef>) -> Self {
let input = core.input.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ use risingwave_common::types::Serial;

use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError};

pub struct ChangeLogExecutor {
pub struct ChangedLogExecutor {
_ctx: ActorContextRef,
input: Executor,
need_op: bool,
}

impl Debug for ChangeLogExecutor {
impl Debug for ChangedLogExecutor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ChangeLogExecutor").finish()
}
}

impl Execute for ChangeLogExecutor {
impl Execute for ChangedLogExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}
}
impl ChangeLogExecutor {
impl ChangedLogExecutor {
pub fn new(ctx: ActorContextRef, input: Executor, need_op: bool) -> Self {
Self {
_ctx: ctx,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod change_log;
mod changed_log;
mod dedup;
mod dispatch;
pub mod dml;
Expand Down Expand Up @@ -115,7 +115,7 @@ pub use backfill::no_shuffle_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use change_log::ChangeLogExecutor;
pub use changed_log::ChangedLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use risingwave_storage::StateStore;

use super::ExecutorBuilder;
use crate::error::StreamResult;
use crate::executor::{ChangeLogExecutor, Executor};
use crate::executor::{ChangedLogExecutor, Executor};
use crate::task::ExecutorParams;

pub struct ChangeLogExecutorBuilder;
pub struct ChangedLogExecutorBuilder;

impl ExecutorBuilder for ChangeLogExecutorBuilder {
impl ExecutorBuilder for ChangedLogExecutorBuilder {
type Node = ChangedLogNode;

async fn new_boxed_executor(
Expand All @@ -32,7 +32,7 @@ impl ExecutorBuilder for ChangeLogExecutorBuilder {
) -> StreamResult<Executor> {
let [input]: [_; 1] = params.input.try_into().unwrap();

let exec = ChangeLogExecutor::new(params.actor_context, input, node.need_op);
let exec = ChangedLogExecutor::new(params.actor_context, input, node.need_op);
Ok((params.info, exec).into())
}
}
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod append_only_dedup;
mod barrier_recv;
mod batch_query;
mod cdc_filter;
mod change_log;
mod changed_log;
mod dml;
mod dynamic_filter;
mod eowc_over_window;
Expand Down Expand Up @@ -96,7 +96,7 @@ use self::union::*;
use self::watermark_filter::WatermarkFilterBuilder;
use crate::error::StreamResult;
use crate::executor::{Execute, Executor, ExecutorInfo};
use crate::from_proto::change_log::ChangeLogExecutorBuilder;
use crate::from_proto::changed_log::ChangedLogExecutorBuilder;
use crate::from_proto::values::ValuesExecutorBuilder;
use crate::task::ExecutorParams;

Expand Down Expand Up @@ -174,6 +174,6 @@ pub async fn create_executor(
NodeBody::OverWindow => OverWindowExecutorBuilder,
NodeBody::StreamFsFetch => FsFetchExecutorBuilder,
NodeBody::SourceBackfill => SourceBackfillExecutorBuilder,
NodeBody::ChangedLog => ChangeLogExecutorBuilder,
NodeBody::ChangedLog => ChangedLogExecutorBuilder,
}
}

0 comments on commit 33d9a2e

Please sign in to comment.