refactor: clarify the meaning of table in TableCatalog and TableFragments #19510

Merged
5 changes: 5 additions & 0 deletions proto/catalog.proto
@@ -316,6 +316,11 @@ message Function {
message AggregateFunction {}
}

+ // Includes full information about a table.
+ //
+ // Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
+ // It is not the same as a user-side table created by `CREATE TABLE`.
+ //
// See `TableCatalog` struct in frontend crate for more information.
message Table {
enum TableType {
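The comment added above draws the line between a table *in storage* and a user-side `CREATE TABLE` table. Below is a minimal standalone sketch of that distinction; the enum mirrors the proto's `TableType` for illustration only and is not the generated code.

```rust
// Illustrative stand-in for the proto `Table.TableType`; not the real generated type.
#[derive(Debug, Clone, Copy)]
enum TableType {
    /// Created by `CREATE TABLE`.
    Table,
    /// Created by `CREATE MATERIALIZED VIEW`.
    MaterializedView,
    /// The storage table backing `CREATE INDEX`.
    Index,
    /// Internal state tables of streaming executors.
    Internal,
}

/// Every variant is "a table in storage" and can be `SELECT`ed,
/// but only one of them is a user-side table.
fn is_user_side_table(t: TableType) -> bool {
    matches!(t, TableType::Table)
}

fn main() {
    for t in [
        TableType::Table,
        TableType::MaterializedView,
        TableType::Index,
        TableType::Internal,
    ] {
        println!("{t:?}: user-side table = {}", is_user_side_table(t));
    }
}
```

In other words, every `Table` message describes something scannable in storage; whether it came from `CREATE TABLE` is a property of its type, not of being a `Table`.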
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
@@ -349,7 +349,7 @@ message DropIndexResponse {
}

message ReplaceTablePlan {
- // The new table catalog, with the correct table ID and a new version.
+ // The new table catalog, with the correct (old) table ID and a new version.
// If the new version does not match the subsequent version in the meta service's
// catalog, this request will be rejected.
catalog.Table table = 1;
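The comment keeps the existing optimistic-concurrency rule: the plan must carry the next catalog version. A small sketch of that check, under the assumption that "subsequent version" means the current version plus one (the real meta-service code may track versions differently):

```rust
// Hypothetical helper illustrating the version check described in the comment;
// not the actual meta-service implementation.
fn validate_replace_table(current_version: u64, proposed_version: u64) -> Result<(), String> {
    if proposed_version == current_version + 1 {
        Ok(())
    } else {
        Err(format!(
            "reject ReplaceTablePlan: catalog is at version {current_version}, \
             proposal carries version {proposed_version}"
        ))
    }
}

fn main() {
    assert!(validate_replace_table(7, 8).is_ok());
    assert!(validate_replace_table(7, 10).is_err());
}
```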
5 changes: 4 additions & 1 deletion proto/meta.proto
@@ -36,7 +36,9 @@ service HeartbeatService {
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

- // Fragments of a Streaming Job
+ // Fragments of a Streaming Job.
+ // It's for all kinds of streaming jobs, and ideally should be called `StreamingJobFragments`.
+ // It's not the same as a storage table correlated with a `TableCatalog`.
message TableFragments {
// The state of the fragments of this table
enum State {
@@ -96,6 +98,7 @@ message TableFragments {
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
+ // The id of the streaming job.
uint32 table_id = 1;
State state = 2;
map<uint32, Fragment> fragments = 3;
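To make the naming caveat concrete, here is a standalone sketch of the shape this message describes. The types are illustrative stand-ins, not the generated protobuf structs; the key point is that `table_id` identifies a streaming job rather than a storage table.

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the protobuf messages; not the real generated types.
#[derive(Debug)]
struct Fragment {
    fragment_id: u32,
    actor_count: usize,
}

/// Despite the name, this describes the fragments of *one streaming job*
/// (table, materialized view, index, sink, ...), not one storage table.
#[derive(Debug)]
struct TableFragments {
    /// The id of the streaming job.
    table_id: u32,
    fragments: HashMap<u32, Fragment>,
}

fn main() {
    let job = TableFragments {
        table_id: 42, // a streaming job id, not (necessarily) a storage table id
        fragments: HashMap::from([(1, Fragment { fragment_id: 1, actor_count: 4 })]),
    };
    println!(
        "streaming job {} has {} fragment(s): {:?}",
        job.table_id,
        job.fragments.len(),
        job.fragments
    );
}
```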
11 changes: 5 additions & 6 deletions src/frontend/src/catalog/table_catalog.rs
@@ -35,14 +35,12 @@ use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;

- /// Includes full information about a table.
+ /// `TableCatalog` Includes full information about a table.
///
- /// Currently, it can be either:
- /// - a table or a source
- /// - a materialized view
- /// - an index
+ /// Here `Table` is an internal concept, corresponding to _a table in storage_, all of which can be `SELECT`ed.
+ /// It is not the same as a user-side table created by `CREATE TABLE`.
///
- /// Use `self.table_type()` to determine the type of the table.
+ /// Use [`Self::table_type()`] to determine the [`TableType`] of the table.
///
/// # Column ID & Column Index
///
@@ -191,6 +189,7 @@ pub enum TableType {
/// Tables created by `CREATE MATERIALIZED VIEW`.
MaterializedView,
/// Tables serving as index for `TableType::Table` or `TableType::MaterializedView`.
+ /// An index has both a `TableCatalog` and an `IndexCatalog`.
Index,
/// Internal tables for executors.
Internal,
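As a small illustration of the note added to the `Index` variant (simplified stand-ins, not the real frontend catalog structs): the index's data lives in a storage table described by a `TableCatalog` of type `Index`, while an `IndexCatalog` carries index-specific metadata, assumed here to include which table it indexes.

```rust
// Simplified, illustrative stand-ins for the frontend catalog structs.
struct TableCatalog {
    id: u32,
    name: String,
}

struct IndexCatalog {
    id: u32,
    name: String,
    /// The storage table that physically holds the index entries;
    /// this is the `TableCatalog` with the `Index` table type.
    index_table: TableCatalog,
    /// The id of the table or materialized view being indexed (assumed field).
    primary_table_id: u32,
}

fn main() {
    let idx = IndexCatalog {
        id: 11,
        name: "idx_orders_user".into(),
        index_table: TableCatalog { id: 12, name: "idx_orders_user".into() },
        primary_table_id: 10,
    };
    println!(
        "index {} (catalog id {}) stores its data in storage table {} ({}) and indexes table {}",
        idx.name, idx.id, idx.index_table.name, idx.index_table.id, idx.primary_table_id
    );
}
```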
10 changes: 7 additions & 3 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -232,7 +232,11 @@ impl CheckpointControl {
.values()
{
progress.extend([(
- creating_job.info.table_fragments.table_id().table_id,
+ creating_job
+     .info
+     .stream_job_fragments
+     .stream_job_id()
+     .table_id,
creating_job.gen_ddl_progress(),
)]);
}
@@ -676,7 +680,7 @@ impl DatabaseCheckpointControl {
resps,
self.creating_streaming_job_controls[&table_id]
.info
- .table_fragments
+ .stream_job_fragments
.all_table_ids()
.map(TableId::new),
is_first_time,
@@ -830,7 +834,7 @@ impl DatabaseCheckpointControl {
.expect("checked Some")
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
- let job_id = info.table_fragments.table_id();
+ let job_id = info.stream_job_fragments.stream_job_id();
control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
self.creating_streaming_job_controls.insert(
job_id,
20 changes: 10 additions & 10 deletions src/meta/src/barrier/checkpoint/creating_job/mod.rs
@@ -61,19 +61,19 @@ impl CreatingStreamingJobControl {
initial_mutation: Mutation,
) -> Self {
info!(
- table_id = info.table_fragments.table_id().table_id,
+ table_id = info.stream_job_fragments.stream_job_id().table_id,
definition = info.definition,
"new creating job"
);
- let snapshot_backfill_actors = info.table_fragments.snapshot_backfill_actor_ids();
+ let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
let mut create_mview_tracker = CreateMviewProgressTracker::default();
create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat);
let fragment_infos: HashMap<_, _> = info.new_fragment_info().collect();

- let table_id = info.table_fragments.table_id();
+ let table_id = info.stream_job_fragments.stream_job_id();
let table_id_str = format!("{}", table_id.table_id);

- let actors_to_create = info.table_fragments.actors_to_create();
+ let actors_to_create = info.stream_job_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
fragment_infos,
@@ -121,7 +121,7 @@ impl CreatingStreamingJobControl {
} else {
let progress = create_mview_tracker
.gen_ddl_progress()
- .remove(&self.info.table_fragments.table_id().table_id)
+ .remove(&self.info.stream_job_fragments.stream_job_id().table_id)
.expect("should exist");
format!("Snapshot [{}]", progress.progress)
}
@@ -143,7 +143,7 @@
}
};
DdlProgress {
- id: self.info.table_fragments.table_id().table_id as u64,
+ id: self.info.stream_job_fragments.stream_job_id().table_id as u64,
statement: self.info.definition.clone(),
progress,
}
@@ -202,7 +202,7 @@ impl CreatingStreamingJobControl {
command: Option<&Command>,
barrier_info: &BarrierInfo,
) -> MetaResult<()> {
- let table_id = self.info.table_fragments.table_id();
+ let table_id = self.info.stream_job_fragments.stream_job_id();
let start_consume_upstream =
if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
jobs_to_merge.contains_key(&table_id)
@@ -211,7 +211,7 @@
};
if start_consume_upstream {
info!(
- table_id = self.info.table_fragments.table_id().table_id,
+ table_id = self.info.stream_job_fragments.stream_job_id().table_id,
prev_epoch = barrier_info.prev_epoch(),
"start consuming upstream"
);
@@ -235,7 +235,7 @@
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
- self.info.table_fragments.table_id(),
+ self.info.stream_job_fragments.stream_job_id(),
control_stream_manager,
&mut self.barrier_control,
&self.graph_info,
@@ -260,7 +260,7 @@
let prev_barriers_to_inject = self.status.update_progress(&resp.create_mview_progress);
self.barrier_control.collect(epoch, worker_id, resp);
if let Some(prev_barriers_to_inject) = prev_barriers_to_inject {
- let table_id = self.info.table_fragments.table_id();
+ let table_id = self.info.stream_job_fragments.stream_job_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),