Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/pkg/sysdb/coordinator/model/collection_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type ValueTypes struct {
SparseVector *SparseVectorValueType `json:"sparse_vector,omitempty"`
Int *IntValueType `json:"int,omitempty"`
Float *FloatValueType `json:"float,omitempty"`
Boolean *BoolValueType `json:"boolean,omitempty"`
Boolean *BoolValueType `json:"bool,omitempty"`
}

type Schema struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestUpdateSchemaFromConfig_HnswSuccess(t *testing.T) {
"config": {}
}
},
"boolean": {
"bool": {
"bool_inverted_index": {
"enabled": true,
"config": {}
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestUpdateSchemaFromConfig_SpannSuccess(t *testing.T) {
"config": {}
}
},
"boolean": {
"bool": {
"bool_inverted_index": {
"enabled": true,
"config": {}
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestUpdateSchemaFromConfig_EmbeddingFunction(t *testing.T) {
"config": {}
}
},
"boolean": {
"bool": {
"bool_inverted_index": {
"enabled": true,
"config": {}
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/sysdb/coordinator/table_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ func TestUpdateCollection_WithSchema(t *testing.T) {
"config": {}
}
},
"boolean": {
"bool": {
"bool_inverted_index": {
"enabled": true,
"config": {}
Expand Down
15 changes: 10 additions & 5 deletions rust/cli/src/commands/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use chroma_segment::local_segment_manager::LocalSegmentManager;
use chroma_sqlite::db::SqliteDb;
use chroma_sysdb::SysDb;
use chroma_system::System;
use chroma_types::{CollectionUuid, KnnIndex, ListCollectionsRequest};
use chroma_types::{CollectionUuid, KnnIndex, ListCollectionsRequest, Schema};
use clap::Parser;
use colored::Colorize;
use dialoguer::Confirm;
Expand Down Expand Up @@ -108,10 +108,15 @@ async fn trigger_vector_segments_max_seq_id_migration(
for collection_id in collection_ids {
let mut collection = sysdb.get_collection_with_segments(collection_id).await?;

collection
.collection
.reconcile_schema_with_config(default_knn_index)
.map_err(|e| Box::new(e) as Box<dyn Error>)?;
if collection.collection.schema.is_none() {
collection.collection.schema = Some(
Schema::convert_collection_config_to_schema(
&collection.collection.config,
default_knn_index,
)
.map_err(|e| Box::new(e) as Box<dyn Error>)?,
);
}

// If collection is uninitialized, that means nothing has been written yet.
let dim = match collection.collection.dimension {
Expand Down
18 changes: 10 additions & 8 deletions rust/frontend/src/get_collection_with_segments_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,16 @@ impl CollectionsWithSegmentsProvider {
.await?
};

// reconcile schema and config
let reconciled_schema = Schema::reconcile_schema_and_config(
collection_and_segments_sysdb.collection.schema.as_ref(),
Some(&collection_and_segments_sysdb.collection.config),
knn_index,
)
.map_err(CollectionsWithSegmentsProviderError::InvalidSchema)?;
collection_and_segments_sysdb.collection.schema = Some(reconciled_schema);
if collection_and_segments_sysdb.collection.schema.is_none() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we tested that schema is None and not {} for older collections?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

collection_and_segments_sysdb.collection.schema = Some(
Copy link
Contributor

@sanketkedia sanketkedia Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are always passing the schema down to the reader, should we update the reader in distributed_hnsw.rs to use the schema instead of the collection config (with a fallback to legacy metadata), similar to local_hnsw.rs? That would make things uniform and easier to understand. (I understand that the current code is also correct; this is just a code design nit.)

Schema::convert_collection_config_to_schema(
&collection_and_segments_sysdb.collection.config,
knn_index,
)
.map_err(CollectionsWithSegmentsProviderError::InvalidSchema)?,
);
}

self.set_collection_with_segments(collection_and_segments_sysdb.clone())
.await;
Ok(collection_and_segments_sysdb)
Expand Down
11 changes: 6 additions & 5 deletions rust/frontend/src/impls/service_based_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl ServiceBasedFrontend {
if self.enable_schema {
for collection in collections.iter_mut() {
collection
.reconcile_schema_with_config(self.default_knn_index)
.reconcile_schema_for_read()
.map_err(GetCollectionsError::InvalidSchema)?;
}
}
Expand Down Expand Up @@ -425,7 +425,7 @@ impl ServiceBasedFrontend {
if self.enable_schema {
for collection in &mut collections {
collection
.reconcile_schema_with_config(self.default_knn_index)
.reconcile_schema_for_read()
.map_err(GetCollectionError::InvalidSchema)?;
}
}
Expand All @@ -450,7 +450,7 @@ impl ServiceBasedFrontend {

if self.enable_schema {
collection
.reconcile_schema_with_config(self.default_knn_index)
.reconcile_schema_for_read()
.map_err(GetCollectionByCrnError::InvalidSchema)?;
}
Ok(collection)
Expand Down Expand Up @@ -630,9 +630,10 @@ impl ServiceBasedFrontend {
// that was retrieved from sysdb, rather than the one that was passed in
if self.enable_schema {
collection
.reconcile_schema_with_config(self.default_knn_index)
.reconcile_schema_for_read()
.map_err(CreateCollectionError::InvalidSchema)?;
}

Ok(collection)
}

Expand Down Expand Up @@ -735,7 +736,7 @@ impl ServiceBasedFrontend {
.await?;
collection_and_segments
.collection
.reconcile_schema_with_config(self.default_knn_index)
.reconcile_schema_for_read()
.map_err(ForkCollectionError::InvalidSchema)?;
let collection = collection_and_segments.collection.clone();
let latest_collection_logical_size_bytes = collection_and_segments
Expand Down
21 changes: 16 additions & 5 deletions rust/log/src/local_compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use chroma_sysdb::SysDb;
use chroma_system::Handler;
use chroma_system::{Component, ComponentContext};
use chroma_types::{
Chunk, CollectionUuid, GetCollectionWithSegmentsError, KnnIndex, LogRecord, SchemaError,
Chunk, CollectionUuid, GetCollectionWithSegmentsError, KnnIndex, LogRecord, Schema, SchemaError,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -141,9 +141,15 @@ impl Handler<BackfillMessage> for LocalCompactionManager {
.get_collection_with_segments(message.collection_id)
.await?;
let schema_previously_persisted = collection_and_segments.collection.schema.is_some();
collection_and_segments
.collection
.reconcile_schema_with_config(KnnIndex::Hnsw)?;
if !schema_previously_persisted {
collection_and_segments.collection.schema = Some(
Schema::convert_collection_config_to_schema(
&collection_and_segments.collection.config,
KnnIndex::Hnsw,
)
.map_err(CompactionManagerError::SchemaReconcileError)?,
);
}
// If collection is uninitialized, that means nothing has been written yet.
let dim = match collection_and_segments.collection.dimension {
Some(dim) => dim,
Expand Down Expand Up @@ -267,7 +273,12 @@ impl Handler<PurgeLogsMessage> for LocalCompactionManager {
.get_collection_with_segments(message.collection_id)
.await?;
let mut collection = collection_segments.collection.clone();
collection.reconcile_schema_with_config(KnnIndex::Hnsw)?;
if collection.schema.is_none() {
collection.schema = Some(
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Hnsw)
.map_err(CompactionManagerError::SchemaReconcileError)?,
);
}
// If dimension is None, that means nothing has been written yet.
let dim = match collection.dimension {
Some(dim) => dim,
Expand Down
43 changes: 27 additions & 16 deletions rust/segment/src/distributed_hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ impl DistributedHNSWSegmentWriter {
) -> Result<Box<DistributedHNSWSegmentWriter>, Box<DistributedHNSWSegmentFromSegmentError>>
{
let hnsw_configuration = collection
.config
.get_hnsw_config_with_legacy_fallback(segment)
.schema
.as_ref()
.map(|schema| schema.get_internal_hnsw_config_with_legacy_fallback(segment))
.transpose()
.map_err(DistributedHNSWSegmentFromSegmentError::InvalidHnswConfiguration)?
.flatten()
.ok_or(DistributedHNSWSegmentFromSegmentError::MissingHnswConfiguration)?;

// TODO: this is hacky, we use the presence of files to determine if we need to load or create the index
Expand Down Expand Up @@ -314,9 +317,12 @@ impl DistributedHNSWSegmentReader {
) -> Result<Box<DistributedHNSWSegmentReader>, Box<DistributedHNSWSegmentFromSegmentError>>
{
let hnsw_configuration = collection
.config
.get_hnsw_config_with_legacy_fallback(segment)
.schema
.as_ref()
.map(|schema| schema.get_internal_hnsw_config_with_legacy_fallback(segment))
.transpose()
.map_err(DistributedHNSWSegmentFromSegmentError::InvalidHnswConfiguration)?
.flatten()
.ok_or(DistributedHNSWSegmentFromSegmentError::MissingHnswConfiguration)?;

// TODO: this is hacky, we use the presence of files to determine if we need to load or create the index
Expand Down Expand Up @@ -394,7 +400,7 @@ pub mod test {
use chroma_index::{HnswIndexConfig, DEFAULT_MAX_ELEMENTS};
use chroma_types::{
Collection, CollectionUuid, InternalCollectionConfiguration, InternalHnswConfiguration,
Segment, SegmentUuid,
KnnIndex, Schema, Segment, SegmentUuid,
};
use tempfile::tempdir;
use uuid::Uuid;
Expand Down Expand Up @@ -423,18 +429,20 @@ pub mod test {
config.persist_path,
Some(persist_path.to_str().unwrap().to_string())
);
let config = InternalCollectionConfiguration {
vector_index: chroma_types::VectorIndexConfiguration::Hnsw(InternalHnswConfiguration {
max_neighbors: 10,
..Default::default()
}),
embedding_function: None,
};

// Try partial override
let collection = Collection {
config: InternalCollectionConfiguration {
vector_index: chroma_types::VectorIndexConfiguration::Hnsw(
InternalHnswConfiguration {
max_neighbors: 10,
..Default::default()
},
),
embedding_function: None,
},
config: config.clone(),
schema: Some(
Schema::convert_collection_config_to_schema(&config, KnnIndex::Hnsw).unwrap(),
),
..Default::default()
};

Expand All @@ -448,9 +456,12 @@ pub mod test {
};

let hnsw_params = collection
.config
.get_hnsw_config_with_legacy_fallback(&segment)
.schema
.as_ref()
.map(|schema| schema.get_internal_hnsw_config_with_legacy_fallback(&segment))
.transpose()
.unwrap()
.flatten()
.unwrap();
let config = HnswIndexConfig::new_persistent(
hnsw_params.max_neighbors,
Expand Down
30 changes: 15 additions & 15 deletions rust/segment/src/distributed_spann.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ impl SpannSegmentWriter {
return Err(SpannSegmentWriterError::InvalidArgument);
}

let reconciled_schema = Schema::reconcile_schema_and_config(
collection.schema.as_ref(),
Some(&collection.config),
KnnIndex::Spann,
)
.map_err(SpannSegmentWriterError::InvalidSchema)?;
let schema = if let Some(schema) = &collection.schema {
schema.clone()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why clone here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I can't move out of collection.schema directly; I have to borrow the schema with as_ref.

The other alternative would be to borrow collection.schema directly, as in the snippet below. This works; I just want to confirm that it is safe.

        let schema = if let Some(schema) = &collection.schema {
            schema.clone()
        } else {
            Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
                .map_err(SpannSegmentWriterError::InvalidSchema)?
        };

} else {
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
.map_err(SpannSegmentWriterError::InvalidSchema)?
};

let params = reconciled_schema
let params = schema
.get_internal_spann_config()
.ok_or(SpannSegmentWriterError::MissingSpannConfiguration)?;

Expand Down Expand Up @@ -690,8 +690,8 @@ mod test {
..Default::default()
};
collection.schema = Some(
Schema::reconcile_schema_and_config(None, Some(&collection.config), KnnIndex::Spann)
.expect("Error reconciling schema for test collection"),
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
.expect("Error converting config to schema for test collection"),
);

let pl_block_size = 5 * 1024 * 1024;
Expand Down Expand Up @@ -927,8 +927,8 @@ mod test {
..Default::default()
};
collection.schema = Some(
Schema::reconcile_schema_and_config(None, Some(&collection.config), KnnIndex::Spann)
.expect("Error reconciling schema for test collection"),
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
.expect("Error converting config to schema for test collection"),
);

let pl_block_size = 5 * 1024 * 1024;
Expand Down Expand Up @@ -1089,8 +1089,8 @@ mod test {
..Default::default()
};
collection.schema = Some(
Schema::reconcile_schema_and_config(None, Some(&collection.config), KnnIndex::Spann)
.expect("Error reconciling schema for test collection"),
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
.expect("Error converting config to schema for test collection"),
);

let segment_id = SegmentUuid::new();
Expand Down Expand Up @@ -1220,8 +1220,8 @@ mod test {
..Default::default()
};
collection.schema = Some(
Schema::reconcile_schema_and_config(None, Some(&collection.config), KnnIndex::Spann)
.expect("Error reconciling schema for test collection"),
Schema::convert_collection_config_to_schema(&collection.config, KnnIndex::Spann)
.expect("Error converting config to schema for test collection"),
);

let pl_block_size = 5 * 1024 * 1024;
Expand Down
24 changes: 24 additions & 0 deletions rust/types/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{Metadata, MetadataValueConversionError};
use crate::{
chroma_proto, test_segment, CollectionConfiguration, InternalCollectionConfiguration, KnnIndex,
Schema, SchemaError, Segment, SegmentScope, UpdateCollectionConfiguration, UpdateMetadata,
VectorIndexConfiguration,
};
use chroma_error::{ChromaError, ErrorCodes};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -243,6 +244,29 @@ impl Collection {
Ok(())
}

/// Bring `schema` and `config` into agreement for the read path.
///
/// Exactly one side is treated as the source of truth:
///
/// * When a schema is present, `config` is overwritten with the configuration derived
///   from that schema.
/// * When no schema is present, one is built from `config`, with the KNN index kind
///   taken from the variant of `config.vector_index`.
///
/// # Errors
///
/// Returns [`SchemaError::InvalidSchema`] if the existing schema cannot be converted into
/// an [`InternalCollectionConfiguration`], and propagates any error produced by
/// [`Schema::convert_collection_config_to_schema`].
pub fn reconcile_schema_for_read(&mut self) -> Result<(), SchemaError> {
    match &self.schema {
        Some(persisted) => {
            // The schema wins: regenerate the configuration from it.
            let derived_config = InternalCollectionConfiguration::try_from(persisted)
                .map_err(|reason| SchemaError::InvalidSchema { reason })?;
            self.config = derived_config;
        }
        None => {
            // Only a configuration is available: pick the index kind it already uses
            // and hydrate a schema from it.
            let index_kind = match self.config.vector_index {
                VectorIndexConfiguration::Hnsw(_) => KnnIndex::Hnsw,
                VectorIndexConfiguration::Spann(_) => KnnIndex::Spann,
            };
            let hydrated =
                Schema::convert_collection_config_to_schema(&self.config, index_kind)?;
            self.schema = Some(hydrated);
        }
    }

    Ok(())
}

pub fn test_collection(dim: i32) -> Self {
Collection {
name: "test_collection".to_string(),
Expand Down
Loading
Loading