Skip to content

Commit

Permalink
feat: vec-395 enable toggling enable vector integrity check
Browse files Browse the repository at this point in the history
  • Loading branch information
dwelch-spike committed Oct 18, 2024
1 parent 4a9f93a commit bd9f5f4
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 57 deletions.
1 change: 1 addition & 0 deletions cmd/flags/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
HnswHealerParallelism = "hnsw-healer-parallelism"
HnswMergeParallelism = "hnsw-merge-index-parallelism"
HnswMergeReIndexParallelism = "hnsw-merge-reindex-parallelism"
EnableVectorIntegrityCheck = "enable-vector-integrity-check"
TLSProtocols = "tls-protocols"
TLSCaFile = "tls-cafile"
TLSCaPath = "tls-capath"
Expand Down
71 changes: 38 additions & 33 deletions cmd/indexCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,41 @@ import (

//nolint:govet // Padding not a concern for a CLI
var indexCreateFlags = &struct {
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
enableVectorIntegrityCheck flags.BoolOptionalFlag
}{
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
enableVectorIntegrityCheck: flags.BoolOptionalFlag{},
}

func newIndexCreateFlagSet() *pflag.FlagSet {
Expand All @@ -72,7 +74,8 @@ func newIndexCreateFlagSet() *pflag.FlagSet {
flagSet.Var(&indexCreateFlags.hnswMaxEdges, flags.HnswMaxEdges, "Maximum number bi-directional links per HNSW vertex. Greater values of 'm' in general provide better recall for data with high dimensionality, while lower values work well for data with lower dimensionality. The storage space required for the index increases proportionally with 'm'.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswConstructionEf, flags.HnswConstructionEf, "The number of candidate nearest neighbors shortlisted during index creation. Larger values provide better recall at the cost of longer index update times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswEf, flags.HnswEf, "The default number of candidate nearest neighbors shortlisted during search. Larger values provide better recall at the cost of longer search times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
// Bind to indexCreateFlags: `index create` logs and sends that struct's value.
flagSet.Var(&indexCreateFlags.enableVectorIntegrityCheck, flags.EnableVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexCreateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswHealer.NewFlagSet())
Expand Down Expand Up @@ -228,6 +231,7 @@ asvec index create -i myindex -n test -s testset -d 256 -m COSINE --%s vector \
slog.Any(flags.HnswEf, indexCreateFlags.hnswEf.Val),
slog.Any(flags.HnswConstructionEf, indexCreateFlags.hnswConstructionEf.Val),
slog.Any(flags.HnswMaxMemQueueSize, indexCreateFlags.hnswMaxMemQueueSize.Val),
slog.Any(flags.EnableVectorIntegrityCheck, indexCreateFlags.enableVectorIntegrityCheck),
)...,
)

Expand Down Expand Up @@ -348,6 +352,7 @@ func runCreateIndexFromFlags(client *avs.Client) error {
IndexParallelism: indexCreateFlags.hnswMerge.IndexParallelism.Val,
ReIndexParallelism: indexCreateFlags.hnswMerge.ReIndexParallelism.Val,
},
EnableVectorIntegrityCheck: indexCreateFlags.enableVectorIntegrityCheck.Val,
},
}

Expand Down
47 changes: 26 additions & 21 deletions cmd/indexUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,35 @@ import (

//nolint:govet // Padding not a concern for a CLI
var indexUpdateFlags = &struct {
clientFlags *flags.ClientFlags
yes bool
namespace string
indexName string
indexLabels map[string]string
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
clientFlags *flags.ClientFlags
yes bool
namespace string
indexName string
indexLabels map[string]string
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
enableVectorIntegrityCheck flags.BoolOptionalFlag
}{
clientFlags: rootFlags.clientFlags,
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
clientFlags: rootFlags.clientFlags,
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
enableVectorIntegrityCheck: flags.BoolOptionalFlag{},
}

func newIndexUpdateFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.BoolVarP(&indexUpdateFlags.yes, flags.Yes, "y", false, "When true do not prompt for confirmation.") //nolint:lll // For readability
flagSet.StringVarP(&indexUpdateFlags.namespace, flags.Namespace, flags.NamespaceShort, "", "The namespace for the index.") //nolint:lll // For readability
flagSet.StringVarP(&indexUpdateFlags.indexName, flags.IndexName, flags.IndexNameShort, "", "The name of the index.") //nolint:lll // For readability
flagSet.StringToStringVar(&indexUpdateFlags.indexLabels, flags.IndexLabels, nil, "The distance metric for the index.") //nolint:lll // For readability
flagSet.Var(&indexUpdateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.BoolVarP(&indexUpdateFlags.yes, flags.Yes, "y", false, "When true do not prompt for confirmation.") //nolint:lll // For readability
flagSet.StringVarP(&indexUpdateFlags.namespace, flags.Namespace, flags.NamespaceShort, "", "The namespace for the index.") //nolint:lll // For readability
flagSet.StringVarP(&indexUpdateFlags.indexName, flags.IndexName, flags.IndexNameShort, "", "The name of the index.") //nolint:lll // For readability
// Help text describes the labels flag (previously copy-pasted from the distance-metric flag).
flagSet.StringToStringVar(&indexUpdateFlags.indexLabels, flags.IndexLabels, nil, "The labels for the index.") //nolint:lll // For readability
flagSet.Var(&indexUpdateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexUpdateFlags.enableVectorIntegrityCheck, flags.EnableVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexUpdateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexUpdateFlags.hnswHealer.NewFlagSet())
Expand Down Expand Up @@ -84,6 +87,7 @@ asvec index update -i myindex -n test --%s 10000 --%s 10000ms --%s 10s --%s 16 -
slog.String(flags.IndexName, indexUpdateFlags.indexName),
slog.Any(flags.IndexLabels, indexUpdateFlags.indexLabels),
slog.String(flags.HnswMaxMemQueueSize, indexUpdateFlags.hnswMaxMemQueueSize.String()),
// Log the value parsed by `index update`, not the `index create` flag struct.
slog.Any(flags.EnableVectorIntegrityCheck, indexUpdateFlags.enableVectorIntegrityCheck),
)...,
)

Expand Down Expand Up @@ -119,6 +123,7 @@ asvec index update -i myindex -n test --%s 10000 --%s 10000ms --%s 10s --%s 16 -
IndexParallelism: indexUpdateFlags.hnswMerge.IndexParallelism.Val,
ReIndexParallelism: indexUpdateFlags.hnswMerge.ReIndexParallelism.Val,
},
// Use the update command's own flag; the create command's flag is never set here.
EnableVectorIntegrityCheck: indexUpdateFlags.enableVectorIntegrityCheck.Val,
}

ctx, cancel := context.WithTimeout(context.Background(), indexUpdateFlags.clientFlags.Timeout)
Expand Down
2 changes: 1 addition & 1 deletion cmd/writers/indexList.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (itw *IndexTableWriter) AppendIndexRow(
{"Healer Parallelism*", v.HnswParams.HealerParams.GetParallelism()},
{"Merge Index Parallelism*", v.HnswParams.MergeParams.GetIndexParallelism()},
{"Merge Re-Index Parallelism*", v.HnswParams.MergeParams.GetReIndexParallelism()},
// TODO enable this when testing is done {"Enable Vector Integrity Check", v.HnswParams.GetEnableVectorIntegrityCheck()},
{"Enable Vector Integrity Check", v.HnswParams.GetEnableVectorIntegrityCheck()},
})

row = append(row, renderTable(tHNSW, format))
Expand Down
29 changes: 27 additions & 2 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,20 @@ func (suite *CmdTestSuite) TestSuccessfulCreateIndexCmd() {
WithStorageSet("name").
Build(),
},
{
name: "test with enable vector integrity check",
indexName: "integidx",
indexNamespace: "test",
cmd: "index create -y -n test -i integidx -d 256 -m COSINE --vector-field vector --hnsw-healer-max-scan-rate-per-node 1000 --hnsw-healer-max-scan-page-size 1000 --hnsw-healer-reindex-percent 10.10 --hnsw-healer-schedule \"0 0 0 ? * *\" --hnsw-healer-parallelism 10 --enable-vector-integrity-check false",
expectedIndex: tests.NewIndexDefinitionBuilder(false, "integidx", "test", 256, protos.VectorDistanceMetric_COSINE, "vector").
WithHnswHealerMaxScanRatePerNode(1000).
WithHnswHealerMaxScanPageSize(1000).
WithHnswHealerReindexPercent(10.10).
WithHnswHealerSchedule("0 0 0 ? * *").
WithHnswHealerParallelism(10).
WithEnableVectorIntegrityCheck(false).
Build(),
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -524,6 +538,15 @@ func (suite *CmdTestSuite) TestSuccessfulUpdateIndexCmd() {
WithHnswMergeReIndexParallelism(11).
Build(),
},
{
name: "test with enable vector integrity check",
indexName: "successful-update",
indexNamespace: "test",
cmd: "index update -y -n test -i successful-update --enable-vector-integrity-check false",
expectedIndex: newBuilder().
WithEnableVectorIntegrityCheck(false).
Build(),
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -747,7 +770,8 @@ Healer Re-index % *\,10.00%
Healer Schedule*\,0 0/15 * ? * * *
Healer Parallelism*\,1
Merge Index Parallelism*\,80
Merge Re-Index Parallelism*\,26"
Merge Re-Index Parallelism*\,26
Enable Vector Integrity Check\,true"
2,list1,test,,vector,256,COSINE,0,0,0,map[foo:bar],"Namespace\,test
Set\,list1","HNSW
Max Edges\,16
Expand All @@ -764,7 +788,8 @@ Healer Re-index % *\,10.00%
Healer Schedule*\,0 0/15 * ? * * *
Healer Parallelism*\,1
Merge Index Parallelism*\,80
Merge Re-Index Parallelism*\,26"
Merge Re-Index Parallelism*\,26
Enable Vector Integrity Check\,true"
Values ending with * can be dynamically configured using the 'asvec index update' command.
`,
},
Expand Down
6 changes: 6 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type IndexDefinitionBuilder struct {
HnswHealerSchedule *string
hnswMergeIndexParallelism *uint32
hnswMergeReIndexParallelism *uint32
enableVectorIntegrityCheck *bool
}

func NewIndexDefinitionBuilder(
Expand Down Expand Up @@ -167,6 +168,11 @@ func (idb *IndexDefinitionBuilder) WithHnswMergeReIndexParallelism(mergeParallel
return idb
}

// WithEnableVectorIntegrityCheck stores a pointer to the given
// enable-vector-integrity-check value on the builder and returns the same
// builder so that further With* calls can be chained.
func (idb *IndexDefinitionBuilder) WithEnableVectorIntegrityCheck(enableVectorIntegrityCheck bool) *IndexDefinitionBuilder {
	enabled := enableVectorIntegrityCheck
	idb.enableVectorIntegrityCheck = &enabled

	return idb
}

func (idb *IndexDefinitionBuilder) Build() *protos.IndexDefinition {
var indexDef *protos.IndexDefinition

Expand Down

0 comments on commit bd9f5f4

Please sign in to comment.