Skip to content

Commit

Permalink
Initial AVS 0.11.0 support (#17) vec-396 vec-395 vec-402 vec-403
Browse files Browse the repository at this point in the history
* build(dep): use dev go client with avs 0.10.1-snapshot support

* feat: vec-396 add vector records and vertices to index ls verbose output

* feat: vec-395 enable creating indexes with vector integrity check specified using --hnsw-vector-integrity-check

* feat: vec-402 add --hnsw-batch-reindex-interval and --hnsw-batch-max-reindex-records to batch params

* refactor!: vec-403 rename --hnsw-batch-max-records and --hnsw-batch-interval batch params to --hnsw-batch-max-index-records and --hnsw-batch-index-interval

* ci: update test docker compose files to use avs 0.11.1
  • Loading branch information
dwelch-spike authored Oct 23, 2024
1 parent b7de05e commit 74f4ea3
Show file tree
Hide file tree
Showing 21 changed files with 328 additions and 172 deletions.
7 changes: 5 additions & 2 deletions cmd/flags/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ const (
HnswConstructionEf = "hnsw-ef-construction"
HnswEf = "hnsw-ef"
HnswMaxMemQueueSize = "hnsw-max-mem-queue-size"
BatchMaxRecords = "hnsw-batch-max-records"
BatchInterval = "hnsw-batch-interval"
BatchMaxIndexRecords = "hnsw-batch-max-index-records"
BatchIndexInterval = "hnsw-batch-index-interval"
BatchMaxReindexRecords = "hnsw-batch-max-reindex-records"
BatchReindexInterval = "hnsw-batch-reindex-interval"
HnswCacheMaxEntries = "hnsw-cache-max-entries"
HnswCacheExpiry = "hnsw-cache-expiry"
HnswHealerMaxScanRatePerNode = "hnsw-healer-max-scan-rate-per-node"
Expand All @@ -54,6 +56,7 @@ const (
HnswHealerParallelism = "hnsw-healer-parallelism"
HnswMergeParallelism = "hnsw-merge-index-parallelism"
HnswMergeReIndexParallelism = "hnsw-merge-reindex-parallelism"
HnswVectorIntegrityCheck = "hnsw-vector-integrity-check"
TLSProtocols = "tls-protocols"
TLSCaFile = "tls-cafile"
TLSCaPath = "tls-capath"
Expand Down
6 changes: 3 additions & 3 deletions cmd/flags/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestCredentialsFlag_Type(t *testing.T) {
func TestCredentialsFlag_String(t *testing.T) {
// Test string representation with user and password
flag := CredentialsFlag{
User: StringOptionalFlag{Val: tests.GetStrPtr("username")},
Password: StringOptionalFlag{Val: tests.GetStrPtr("password")},
User: StringOptionalFlag{Val: tests.Ptr("username")},
Password: StringOptionalFlag{Val: tests.Ptr("password")},
}
str := flag.String()
expected := "username:password"
Expand All @@ -58,7 +58,7 @@ func TestCredentialsFlag_String(t *testing.T) {

// Test string representation with user only
flag = CredentialsFlag{
User: StringOptionalFlag{Val: tests.GetStrPtr("username")},
User: StringOptionalFlag{Val: tests.Ptr("username")},
Password: StringOptionalFlag{},
}
str = flag.String()
Expand Down
29 changes: 19 additions & 10 deletions cmd/flags/hnsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,37 @@ import (
)

// BatchingFlags holds the optional values of the HNSW batching flags. Each
// field is bound to its command-line flag by NewFlagSet.
type BatchingFlags struct {
MaxRecords Uint32OptionalFlag
Interval DurationOptionalFlag
MaxIndexRecords Uint32OptionalFlag
IndexInterval DurationOptionalFlag
MaxReindexRecords Uint32OptionalFlag
ReindexInterval DurationOptionalFlag
}

// NewHnswBatchingFlags returns a pointer to a BatchingFlags whose fields are
// all initialized with empty optional-flag literals.
func NewHnswBatchingFlags() *BatchingFlags {
return &BatchingFlags{
MaxRecords: Uint32OptionalFlag{},
Interval: DurationOptionalFlag{},
MaxIndexRecords: Uint32OptionalFlag{},
IndexInterval: DurationOptionalFlag{},
MaxReindexRecords: Uint32OptionalFlag{},
ReindexInterval: DurationOptionalFlag{},
}
}

// NewFlagSet builds a fresh pflag.FlagSet in which every batching flag is
// registered under its flag-name constant and bound to the matching field of
// cf, so parsing the returned set stores values directly into cf.
func (cf *BatchingFlags) NewFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.Var(&cf.MaxRecords, BatchMaxRecords, "Maximum number of records to fit in a batch.") //nolint:lll // For readability
flagSet.Var(&cf.Interval, BatchInterval, "The maximum amount of time to wait before finalizing a batch.") //nolint:lll // For readability
flagSet.Var(&cf.MaxIndexRecords, BatchMaxIndexRecords, "Maximum number of records to fit in a batch.") //nolint:lll // For readability
flagSet.Var(&cf.IndexInterval, BatchIndexInterval, "The maximum amount of time to wait before finalizing a batch.") //nolint:lll // For readability
flagSet.Var(&cf.MaxReindexRecords, BatchMaxReindexRecords, "Maximum number of re-index records to fit in a batch.") //nolint:lll // For readability
flagSet.Var(&cf.ReindexInterval, BatchReindexInterval, "The maximum amount of time to wait before finalizing a re-index batch.") //nolint:lll // For readability

return flagSet
}

// NewSLogAttr returns the batching flag values as slog attributes, each keyed
// by the name of the flag the value was read from. The result is intended to
// be spread into a structured log call.
func (cf *BatchingFlags) NewSLogAttr() []any {
	return []any{
		slog.Any(BatchMaxRecords, cf.MaxRecords.Val),
		slog.Any(BatchInterval, cf.Interval.Val),
		slog.Any(BatchMaxIndexRecords, cf.MaxIndexRecords.Val),
		slog.Any(BatchIndexInterval, cf.IndexInterval.Val),
		// The re-index keys must be paired with the re-index fields; reading
		// the index fields here would log the wrong values under these keys.
		slog.Any(BatchMaxReindexRecords, cf.MaxReindexRecords.Val),
		slog.Any(BatchReindexInterval, cf.ReindexInterval.Val),
	}
}

Expand All @@ -45,10 +53,11 @@ func NewHnswCachingFlags() *CachingFlags {
}
}

// NewFlagSet builds a fresh pflag.FlagSet in which the cache flags are
// registered under their flag-name constants and bound to the fields of cf.
//
//nolint:lll // For readability
func (cf *CachingFlags) NewFlagSet() *pflag.FlagSet {
flagSet := &pflag.FlagSet{}
flagSet.Var(&cf.MaxEntries, HnswCacheMaxEntries, "Maximum number of entries to cache.") //nolint:lll // For readability
flagSet.Var(&cf.Expiry, HnswCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache") //nolint:lll // For readability
flagSet.Var(&cf.MaxEntries, HnswCacheMaxEntries, "Maximum number of entries to cache.")
flagSet.Var(&cf.Expiry, HnswCacheExpiry, "A cache entry will expire after this amount of time has passed since the entry was added to cache")

return flagSet
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/flags/optionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,13 @@ func (f *DurationOptionalFlag) Uint32() *uint32 {

return &milli
}

// Int64 reports the flag's duration as a whole number of milliseconds.
// It returns nil when no value has been set on the flag.
func (f *DurationOptionalFlag) Int64() *int64 {
	if f.Val != nil {
		ms := f.Val.Milliseconds()

		return &ms
	}

	return nil
}
19 changes: 19 additions & 0 deletions cmd/flags/optionals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package flags

import (
"testing"
"time"

"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -91,3 +92,21 @@ func (suite *OptionalFlagSuite) TestIntOptionalFlag() {
suite.T().Errorf("Expected error, got nil")
}
}

// TestDurationOptionalFlag checks that DurationOptionalFlag.Set stores a
// valid duration string and returns an error for an unparsable one.
func (suite *OptionalFlagSuite) TestDurationOptionalFlag() {
	f := &DurationOptionalFlag{}

	if err := f.Set("300ms"); err != nil {
		suite.T().Errorf("Unexpected error: %v", err)
	}

	// Report the duration itself on mismatch: formatting the *time.Duration
	// with %v would print a pointer address rather than the parsed value.
	switch {
	case f.Val == nil:
		suite.T().Errorf("Expected 300ms, got nil")
	case *f.Val != 300*time.Millisecond:
		suite.T().Errorf("Expected 300ms, got %v", *f.Val)
	}

	if err := f.Set("not a time"); err == nil {
		suite.T().Errorf("Expected error, got nil")
	}
}
81 changes: 44 additions & 37 deletions cmd/indexCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,41 @@ import (

// indexCreateFlags holds the values bound to the index create flags; they are
// read back when the index definition is built. clientFlags is taken from
// rootFlags, and the HNSW batch, cache, healer and merge groups come from
// their flags.NewHnsw* constructors.
//
//nolint:govet // Padding not a concern for a CLI
var indexCreateFlags = &struct {
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
clientFlags *flags.ClientFlags
yes bool
inputFile string
namespace string
set flags.StringOptionalFlag
indexName string
vectorField string
dimensions uint32
distanceMetric flags.DistanceMetricFlag
indexLabels map[string]string
storageNamespace flags.StringOptionalFlag
storageSet flags.StringOptionalFlag
hnswMaxEdges flags.Uint32OptionalFlag
hnswEf flags.Uint32OptionalFlag
hnswConstructionEf flags.Uint32OptionalFlag
hnswMaxMemQueueSize flags.Uint32OptionalFlag
hnswBatch flags.BatchingFlags
hnswCache flags.CachingFlags
hnswHealer flags.HealerFlags
hnswMerge flags.MergeFlags
hnswVectorIntegrityCheck flags.BoolOptionalFlag
}{
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
clientFlags: rootFlags.clientFlags,
set: flags.StringOptionalFlag{},
storageNamespace: flags.StringOptionalFlag{},
storageSet: flags.StringOptionalFlag{},
hnswMaxEdges: flags.Uint32OptionalFlag{},
hnswEf: flags.Uint32OptionalFlag{},
hnswConstructionEf: flags.Uint32OptionalFlag{},
hnswMaxMemQueueSize: flags.Uint32OptionalFlag{},
hnswBatch: *flags.NewHnswBatchingFlags(),
hnswCache: *flags.NewHnswCachingFlags(),
hnswHealer: *flags.NewHnswHealerFlags(),
hnswMerge: *flags.NewHnswMergeFlags(),
hnswVectorIntegrityCheck: flags.BoolOptionalFlag{},
}

func newIndexCreateFlagSet() *pflag.FlagSet {
Expand All @@ -72,7 +74,8 @@ func newIndexCreateFlagSet() *pflag.FlagSet {
flagSet.Var(&indexCreateFlags.hnswMaxEdges, flags.HnswMaxEdges, "Maximum number bi-directional links per HNSW vertex. Greater values of 'm' in general provide better recall for data with high dimensionality, while lower values work well for data with lower dimensionality. The storage space required for the index increases proportionally with 'm'.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswConstructionEf, flags.HnswConstructionEf, "The number of candidate nearest neighbors shortlisted during index creation. Larger values provide better recall at the cost of longer index update times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswEf, flags.HnswEf, "The default number of candidate nearest neighbors shortlisted during search. Larger values provide better recall at the cost of longer search times.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswMaxMemQueueSize, flags.HnswMaxMemQueueSize, "Maximum size of in-memory queue for inserted/updated vector records.") //nolint:lll // For readability
flagSet.Var(&indexCreateFlags.hnswVectorIntegrityCheck, flags.HnswVectorIntegrityCheck, "Enable/disable vector integrity check. Defaults to enabled.") //nolint:lll // For readability
flagSet.AddFlagSet(indexCreateFlags.hnswBatch.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswCache.NewFlagSet())
flagSet.AddFlagSet(indexCreateFlags.hnswHealer.NewFlagSet())
Expand Down Expand Up @@ -228,6 +231,7 @@ asvec index create -i myindex -n test -s testset -d 256 -m COSINE --%s vector \
slog.Any(flags.HnswEf, indexCreateFlags.hnswEf.Val),
slog.Any(flags.HnswConstructionEf, indexCreateFlags.hnswConstructionEf.Val),
slog.Any(flags.HnswMaxMemQueueSize, indexCreateFlags.hnswMaxMemQueueSize.Val),
slog.Any(flags.HnswVectorIntegrityCheck, indexCreateFlags.hnswVectorIntegrityCheck.Val),
)...,
)

Expand Down Expand Up @@ -330,12 +334,14 @@ func runCreateIndexFromFlags(client *avs.Client) error {
EfConstruction: indexCreateFlags.hnswConstructionEf.Val,
MaxMemQueueSize: indexCreateFlags.hnswMaxMemQueueSize.Val,
BatchingParams: &protos.HnswBatchingParams{
MaxRecords: indexCreateFlags.hnswBatch.MaxRecords.Val,
Interval: indexCreateFlags.hnswBatch.Interval.Uint32(),
MaxIndexRecords: indexCreateFlags.hnswBatch.MaxIndexRecords.Val,
IndexInterval: indexCreateFlags.hnswBatch.IndexInterval.Uint32(),
MaxReindexRecords: indexCreateFlags.hnswBatch.MaxReindexRecords.Val,
ReindexInterval: indexCreateFlags.hnswBatch.ReindexInterval.Uint32(),
},
CachingParams: &protos.HnswCachingParams{
IndexCachingParams: &protos.HnswCachingParams{
MaxEntries: indexCreateFlags.hnswCache.MaxEntries.Val,
Expiry: indexCreateFlags.hnswCache.Expiry.Uint64(),
Expiry: indexCreateFlags.hnswCache.Expiry.Int64(),
},
HealerParams: &protos.HnswHealerParams{
MaxScanRatePerNode: indexCreateFlags.hnswHealer.MaxScanRatePerNode.Val,
Expand All @@ -348,6 +354,7 @@ func runCreateIndexFromFlags(client *avs.Client) error {
IndexParallelism: indexCreateFlags.hnswMerge.IndexParallelism.Val,
ReIndexParallelism: indexCreateFlags.hnswMerge.ReIndexParallelism.Val,
},
EnableVectorIntegrityCheck: indexCreateFlags.hnswVectorIntegrityCheck.Val,
},
}

Expand Down
Loading

0 comments on commit 74f4ea3

Please sign in to comment.