Skip to content

Commit

Permalink
refine col id
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 27, 2024
1 parent 39e77fb commit ee18a7e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
18 changes: 11 additions & 7 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,16 @@ pub fn source_add_partition_offset_cols(
skip_col_id: bool,
) -> ([bool; 2], [ColumnDesc; 2]) {
let mut columns_exist = [false; 2];
let mut last_column_id = if skip_col_id {
// col id will be filled outside later. Here just use a placeholder.
ColumnId::new(0)
} else {
max_column_id(columns)

let mut last_column_id = max_column_id(columns);
let mut assign_col_id = || {
if skip_col_id {
// col id will be filled outside later. Here just use a placeholder.
ColumnId::placeholder()
} else {
last_column_id = last_column_id.next();
last_column_id
}
};

let additional_columns: Vec<_> = {
Expand All @@ -298,11 +303,10 @@ pub fn source_add_partition_offset_cols(
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_desc(
last_column_id,
assign_col_id(),
connector_name,
col_type,
None,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ pub async fn bind_create_source_or_table_with_connector(
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&columns,
&with_properties.get_connector().unwrap(),
true,
true, // col_id filled below at col_id_gen.generate
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {
Expand Down

0 comments on commit ee18a7e

Please sign in to comment.