16 changes: 9 additions & 7 deletions Cargo.toml
@@ -15,9 +15,11 @@ members = [
     "influxdb2_client",
     "iox_http",
     "iox_query_influxql",
+    "iox_query_influxql_rewrite",
     "iox_query",
     "iox_system_tables",
     "iox_time",
+    "iox_v1_query_api",
     "logfmt",
     "meta_data_cache",
     "metric_exporters",
@@ -74,26 +76,26 @@ arrow-schema = { version = "55" }
 bincode = { version = "2", default-features = false, features = ["alloc", "derive"] }
 # Use DataFusion fork
 # See https://github.com/influxdata/arrow-datafusion/pull/73 for contents
-datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "a9cf9aca9ebf0d6c04e0861d2baebffa0ba77dbc" }
-datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "a9cf9aca9ebf0d6c04e0861d2baebffa0ba77dbc" }
+datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
+datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
 hashbrown = { version = "0.14.5" }
 http = { version = "1" }
 http-body = { version = "1" }
 http-body-util = { version = "0.1" }
 hyper = { version = "1" }
 hyper-util = { version = "0.1" }
-object_store = { version = "0.12.3", features = ["aws", "azure", "gcp"] }
+object_store = { version = "0.12.4", features = ["aws", "azure", "gcp"] }
 parquet = { version = "55", features = ["object_store"] }
-pbjson = { version = "0.7" }
-pbjson-build = { version = "0.7" }
+pbjson = { version = "0.8" }
+pbjson-build = { version = "0.8" }
 pbjson-types = { version = "0.7" }
 proptest = { version = "1", default-features = false, features = ["std"] }
 prost = { version = "0.13" }
 prost-build = { version = "0.13" }
 prost-types = { version = "0.13" }
 reqwest = { version = "0.12", default-features = false }
-rstest = { version = "0.21" }
-sqlx = { version = "0.8.6", features = ["sqlite"] }
+rstest = { version = "0.26" }
+sqlx = { version = "0.8.6" }
 tower = { version = "0.5" }
 tracing = { version = "0.1", features = ["log", "max_level_trace"] }
 tracing-log = { version = "0.2" }
2 changes: 1 addition & 1 deletion arrow_util/Cargo.toml
@@ -22,7 +22,7 @@ comfy-table = { version = "7.2", default-features = false }
 hashbrown = { workspace = true }
 num-traits = "0.2"
 parquet = { workspace = true }
-regex = "1.11.2"
+regex = "1.12.2"
 snafu = "0.8"
 uuid = "1"
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
4 changes: 2 additions & 2 deletions authz/Cargo.toml
@@ -25,10 +25,10 @@ snafu = "0.8"

 [dev-dependencies]
 assert_matches = "1.5.0"
-parking_lot = "0.12.4"
+parking_lot = "0.12.5"
 paste = "1.0.15"
 test_helpers_authz = { path = "../test_helpers_authz" }
-tokio = "1.47.1"
+tokio = "1.48.0"

 [features]
 http = ["dep:http"]
2 changes: 1 addition & 1 deletion backoff/Cargo.toml
@@ -9,7 +9,7 @@ license.workspace = true
 workspace = true

 [dependencies]
-tokio = { version = "1.47", features = ["macros", "time"] }
+tokio = { version = "1.48", features = ["macros", "time"] }
 tracing = { workspace = true }
 rand = "0.9"
 snafu = "0.8"
2 changes: 1 addition & 1 deletion catalog_cache/Cargo.toml
@@ -19,7 +19,7 @@ iox_http_util = { path = "../iox_http_util" }
 tracing = { workspace = true }
 reqwest = { workspace = true }
 snafu = "0.8"
-tokio = { version = "1.47", default-features = false, features = [
+tokio = { version = "1.48", default-features = false, features = [
     "macros",
     "rt",
 ] }
43 changes: 17 additions & 26 deletions catalog_cache/benches/list_encode.rs
@@ -103,12 +103,25 @@ fn encode_partition_snapshot(i: usize) -> Bytes {
     let partition_key = PartitionKey::from(format!("arbitrary_{i}"));
     let expected_partition_hash_id = PartitionHashId::new(table_id, &partition_key);
     let generation = 6;
-    let parquet_file_defaults = ParquetFile {
+
+    let partition = Partition::new_catalog_only(
+        partition_id,
+        table_id,
+        partition_key.clone(),
+        Default::default(),
+        Default::default(),
+        Default::default(),
+        Default::default(),
+        None, // max_time
+        Default::default(),
+    );
+    // Create associated Parquet file
+    let parquet_files = vec![ParquetFile {
         id: ParquetFileId::new(7 + i as i64),
         namespace_id,
         table_id,
         partition_id,
-        partition_hash_id: Some(expected_partition_hash_id.clone()),
+        partition_hash_id: expected_partition_hash_id.clone(),
         object_store_id: ObjectStoreId::from_str("00000000-0000-0001-0000-000000000000").unwrap(),
         min_time: Timestamp::new(2),
         max_time: Timestamp::new(3),
@@ -120,31 +133,9 @@ fn encode_partition_snapshot(i: usize) -> Bytes {
         column_set: ColumnSet::empty(),
         max_l0_created_at: Timestamp::new(6),
         source: None,
-    };
+    }];

-    let partition = Partition::new_catalog_only(
-        partition_id,
-        Some(expected_partition_hash_id.clone()),
-        table_id,
-        partition_key.clone(),
-        Default::default(),
-        Default::default(),
-        Default::default(),
-        Default::default(),
-        None, // max_time
-    );
-    // Create associated Parquet files:
-    let parquet_files = vec![
-        // one addressed by numeric ID,
-        ParquetFile {
-            partition_hash_id: None,
-            ..parquet_file_defaults.clone()
-        },
-        // one addressed by hash ID.
-        parquet_file_defaults.clone(),
-    ];
-
-    // Encode the partition and its Parquet files,
+    // Encode the partition and its Parquet file
     let snapshot = PartitionSnapshot::encode(
         namespace_id,
         partition,
4 changes: 2 additions & 2 deletions client_util/Cargo.toml
@@ -16,13 +16,13 @@ reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] }
 # This direct dependency on rustls can probably be removed when tonic is upgraded to 0.13+.
 # See <https://github.com/influxdata/influxdb_iox/issues/14683> for more details.
 rustls = { version = "0.23", default-features = false }
-thiserror = "2.0.16"
+thiserror = "2.0.17"
 tonic = { version = "0.12", features = ["gzip", "tls", "tls-native-roots", "zstd"] }
 tower = { workspace = true }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }

 [dev-dependencies]
-tokio = { version = "1.47", features = [
+tokio = { version = "1.48", features = [
     "macros",
     "parking_lot",
     "rt-multi-thread",
4 changes: 2 additions & 2 deletions data_types/Cargo.toml
@@ -14,7 +14,7 @@ arrow = { workspace = true }
 arrow-buffer = { workspace = true }
 bytes = "1.10"
 chrono = { version = "0.4", default-features = false }
-croaring = "2.4.0"
+croaring = "2.5.1"
 influxdb-line-protocol = { path = "../influxdb_line_protocol" }
 iox_time = { path = "../iox_time" }
 generated_types = { path = "../generated_types" }
@@ -33,7 +33,7 @@ sqlx = { workspace = true, features = [
     "postgres",
     "uuid",
 ] }
-thiserror = "2.0.16"
+thiserror = "2.0.17"
 uuid = { version = "1", features = ["v4"] }
 workspace-hack = { version = "0.1", path = "../workspace-hack" }
80 changes: 27 additions & 53 deletions data_types/src/lib.rs
@@ -810,15 +810,30 @@ pub struct TableSchema {

     /// the table's columns by their name
     pub columns: ColumnsByName,
+
+    /// Whether or not iceberg is enabled for this table
+    pub iceberg_enabled: bool,
 }

 impl TableSchema {
-    /// Initialize new `TableSchema` from the information in the given `Table`.
+    /// Initialize new [`TableSchema`] from the information in the given [`Table`].
     pub fn new_empty_from(table: &Table) -> Self {
         Self {
             id: table.id,
             partition_template: table.partition_template.clone(),
             columns: ColumnsByName::default(),
+            iceberg_enabled: table.iceberg_enabled,
         }
     }
+
+    /// Initialize a new [`TableSchema`] with the given id, no columns, default partition, and
+    /// iceberg disabled.
+    pub fn new_with(id: TableId) -> Self {
+        Self {
+            id,
+            partition_template: TablePartitionTemplateOverride::default(),
+            columns: ColumnsByName::default(),
+            iceberg_enabled: false,
+        }
+    }
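
Note: a minimal usage sketch (not part of the diff) contrasting the two constructors; the `TableId` value here is arbitrary:

    use data_types::{TableId, TableSchema};

    fn sketch() -> TableSchema {
        // new_with: no columns, a default partition template, iceberg disabled.
        let schema = TableSchema::new_with(TableId::new(42));
        assert!(!schema.iceberg_enabled);
        schema
        // By contrast, new_empty_from(&table) copies the id, partition template,
        // and the new iceberg_enabled flag from an existing catalog Table.
    }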

@@ -1077,8 +1092,8 @@ pub struct ParquetFile {
     pub table_id: TableId,
     /// the partition identifier
     pub partition_id: PartitionId,
-    /// the optional partition hash id
-    pub partition_hash_id: Option<PartitionHashId>,
+    /// the partition hash id
+    pub partition_hash_id: PartitionHashId,
     /// the uuid used in the object store path for this file
     pub object_store_id: ObjectStoreId,
     /// the min timestamp of data in this file
@@ -1178,11 +1193,7 @@ impl ParquetFile {

     /// Estimate the memory consumption of this object and its contents
     pub fn size(&self) -> usize {
-        let hash_id = self
-            .partition_hash_id
-            .as_ref()
-            .map(|x| x.size())
-            .unwrap_or_default();
+        let hash_id = self.partition_hash_id.size();

         size_of_val(self) + hash_id + self.column_set.size() - size_of_val(&self.column_set)
     }
Expand Down Expand Up @@ -1211,7 +1222,7 @@ impl ParquetFile {

/// Temporary to aid incremental migration
pub fn transition_partition_id(&self) -> TransitionPartitionId {
TransitionPartitionId::from_parts(self.partition_id, self.partition_hash_id.clone())
TransitionPartitionId::from_parts(self.partition_id, Some(self.partition_hash_id.clone()))
}
}
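
Note: a minimal sketch (hypothetical helper, not from the diff) of what the now-required field buys call sites; `TransitionPartitionId` is the one boundary that still expects an `Option`:

    use data_types::{ParquetFile, TransitionPartitionId};

    fn transition_id(file: &ParquetFile) -> TransitionPartitionId {
        // No as_ref()/map()/unwrap_or_default() dance: every file carries a
        // hash ID, so size() above reads it directly.
        let _hash_size = file.partition_hash_id.size();
        // The transitional ID type still models legacy numeric-only partitions,
        // hence the explicit Some(...) wrap at this boundary.
        TransitionPartitionId::from_parts(file.partition_id, Some(file.partition_hash_id.clone()))
    }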

@@ -1222,10 +1233,7 @@ impl From<ParquetFile> for catalog_proto::ParquetFile {
             namespace_id: v.namespace_id.get(),
             table_id: v.table_id.get(),
             partition_id: v.partition_id.get(),
-            partition_hash_id: v
-                .partition_hash_id
-                .map(|x| x.as_bytes().to_vec())
-                .unwrap_or_default(),
+            partition_hash_id: v.partition_hash_id.as_bytes().to_vec(),
             object_store_id: v.object_store_id.to_string(),
             min_time: v.min_time.get(),
             max_time: v.max_time.get(),
@@ -1266,11 +1274,7 @@ impl TryFrom<catalog_proto::ParquetFile> for ParquetFile {
             namespace_id: NamespaceId::new(v.namespace_id),
             table_id: TableId::new(v.table_id),
             partition_id: PartitionId::new(v.partition_id),
-            partition_hash_id: if v.partition_hash_id.is_empty() {
-                None
-            } else {
-                Some(v.partition_hash_id[..].try_into()?)
-            },
+            partition_hash_id: v.partition_hash_id[..].try_into()?,
             object_store_id: ObjectStoreId::from_str(&v.object_store_id)?,
             min_time: Timestamp::new(v.min_time),
             max_time: Timestamp::new(v.max_time),
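
Note: one behavioral consequence of this hunk is that a proto message whose `partition_hash_id` bytes are empty used to decode to `None` and now fails the conversion. A sketch of a decode boundary under that assumption (it presumes `catalog_proto` is in scope as in the surrounding file, and that the conversion error implements `Display`):

    use data_types::ParquetFile;

    fn log_decode(msg: catalog_proto::ParquetFile) {
        match ParquetFile::try_from(msg) {
            Ok(file) => println!("decoded parquet file {}", file.object_store_id),
            // Old-style messages with empty partition_hash_id bytes now land
            // here instead of silently becoming a file without a hash ID.
            Err(e) => eprintln!("invalid parquet file proto: {e}"),
        }
    }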
@@ -1346,7 +1350,7 @@ pub struct ParquetFileParams {
     /// the partition identifier
     pub partition_id: PartitionId,
     /// the partition hash ID
-    pub partition_hash_id: Option<PartitionHashId>,
+    pub partition_hash_id: PartitionHashId,
     /// the uuid used in the object store path for this file
     pub object_store_id: ObjectStoreId,
     /// the min timestamp of data in this file
@@ -3329,6 +3333,7 @@ mod tests {
             id: TableId::new(1),
             partition_template: Default::default(),
             columns: ColumnsByName::default(),
+            iceberg_enabled: false,
         };
         let schema2 = TableSchema {
             id: TableId::new(2),
@@ -3339,6 +3344,7 @@ mod tests {
                 name: String::from("foo"),
                 column_type: ColumnType::Bool,
             }]),
+            iceberg_enabled: false,
         };
         assert!(schema1.size() < schema2.size());
     }
@@ -3361,11 +3367,7 @@ mod tests {
             id: NamespaceId::new(1),
             active_tables: BTreeMap::from([(
                 String::from("foo"),
-                TableSchema {
-                    id: TableId::new(1),
-                    columns: ColumnsByName::default(),
-                    partition_template: Default::default(),
-                },
+                TableSchema::new_with(TableId::new(1)),
             )]),
             deleted_tables: BTreeSet::new(),
             partition_template: Default::default(),
@@ -3412,41 +3414,13 @@ mod tests {

     #[test]
     fn catalog_service_parquet_file_serde_roundtrip() {
-        // This part of the test can be removed when all partitions have hash IDs.
-        let old_style_parquet_file = ParquetFile {
-            id: ParquetFileId::new(3),
-            namespace_id: NamespaceId::new(4),
-            table_id: TableId::new(5),
-            partition_id: PartitionId::new(6),
-            partition_hash_id: None, // this is the important part for this test
-            object_store_id: ObjectStoreId::new(),
-            min_time: Timestamp::new(30),
-            max_time: Timestamp::new(50),
-            to_delete: None,
-            file_size_bytes: 1024,
-            row_count: 42,
-            compaction_level: CompactionLevel::Initial,
-            created_at: Timestamp::new(70),
-            column_set: ColumnSet::empty(),
-            max_l0_created_at: Timestamp::new(70),
-            source: None,
-        };
-        let catalog_proto_old_style_parquet_file =
-            catalog_proto::ParquetFile::from(old_style_parquet_file.clone());
-        let round_trip_old_style_parquet_file =
-            ParquetFile::try_from(catalog_proto_old_style_parquet_file).unwrap();
-        assert_eq!(old_style_parquet_file, round_trip_old_style_parquet_file);
-
         let table_id = TableId::new(5);
         let parquet_file = ParquetFile {
             id: ParquetFileId::new(3),
             namespace_id: NamespaceId::new(4),
             table_id,
             partition_id: PartitionId::new(6),
-            partition_hash_id: Some(PartitionHashId::new(
-                table_id,
-                &PartitionKey::from("arbitrary"),
-            )),
+            partition_hash_id: PartitionHashId::new(table_id, &PartitionKey::from("arbitrary")),
             object_store_id: ObjectStoreId::new(),
             min_time: Timestamp::new(30),
             max_time: Timestamp::new(50),