Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add rollout restart e2e/v2 test #2875

Draft
wants to merge 1 commit into
base: refactor/test-e2e/add-v2-e2e-testing
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ NGT_LDFLAGS = -fopenmp -lopenblas -llapack
FAISS_LDFLAGS = $(NGT_LDFLAGS) -lgfortran
HDF5_LDFLAGS = -lhdf5 -lhdf5_hl -lsz -laec -lz -ldl
CGO_LDFLAGS = $(FAISS_LDFLAGS) $(HDF5_LDFLAGS)
TEST_LDFLAGS = $(LDFLAGS) $(FAISS_LDFLAGS) $(HDF5_LDFLAGS)
TEST_LDFLAGS = $(FAISS_LDFLAGS) $(HDF5_LDFLAGS)

ifeq ($(GOARCH),amd64)
CFLAGS ?= -mno-avx512f -mno-avx512dq -mno-avx512cd -mno-avx512bw -mno-avx512vl
Expand Down
4 changes: 4 additions & 0 deletions Makefile.d/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
e2e:
$(call run-e2e-crud-test,-run TestE2EStandardCRUD)

.PHONY: e2e/v2/crud/unary/rollout/restart
## run e2e/v2 unary CRUD test while rollout-restarting the agent
e2e/v2/crud/unary/rollout/restart:
	$(call run-e2e-v2-test,-run TestE2EUnaryRolloutRestartAgentCRUD)

# Misspelled ("unaly") alias of the target above, kept so that existing
# invocations of the old name keep working.
.PHONY: e2e/v2/crud/unaly/rollout/restart
e2e/v2/crud/unaly/rollout/restart: e2e/v2/crud/unary/rollout/restart

.PHONY: e2e/faiss
## run e2e/faiss
e2e/faiss:
Expand Down
15 changes: 15 additions & 0 deletions Makefile.d/functions.mk
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ define telepresence
## --deployment-type "$(SWAP_DEPLOYMENT_TYPE)"
endef

# run-e2e-v2-test runs `go test` on the tests/v2/e2e/crud package with the
# "e2e" build tag and the race detector enabled.
#   $1: extra `go test` flags, e.g. `-run <TestName>`.
# Everything after -args is handed to the test binary; the config file path
# comes from $(E2E_CONFIG).
define run-e2e-v2-test
	CGO_LDFLAGS="$(TEST_LDFLAGS)" \
	GOOS=$(GOOS) \
	GOARCH=$(GOARCH) \
	GOPRIVATE=$(GOPRIVATE) \
	go test \
		-tags "e2e" \
		-mod=readonly \
		-race \
		$(ROOTDIR)/tests/v2/e2e/crud \
		$1 \
		-args \
		-config $(E2E_CONFIG)
endef

define run-e2e-crud-test
GOPRIVATE=$(GOPRIVATE) \
GOARCH=$(GOARCH) \
Expand Down
11 changes: 11 additions & 0 deletions tests/v2/e2e/config/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Sample configuration for the v2 e2e tests.
target:
  addrs:
    - localhost
dataset:
  name: "fashion-mnist-784-euclidean.hdf5"
index:
  # NOTE(review): spelled to match cfg.Index.WaitAfterInsert used by the tests;
  # the previous key read "wait_after_inesrt" — confirm against the yaml tag.
  wait_after_insert: "2m"
kubernetes:
  kubeconfig: ${HOME}/.kube/config
  portforward:
    enabled: false
9 changes: 5 additions & 4 deletions tests/v2/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ func TestMain(m *testing.M) {
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

kclient, err = k8s.NewClient(cfg.Kubernetes.KubeConfig, "")
if err != nil {
log.Fatalf("failed to create kubernetes client: %v", err)
}

if cfg.Kubernetes.PortForward.Enabled {
kclient, err = k8s.NewClient(cfg.Kubernetes.KubeConfig, "")
if err != nil {
log.Fatalf("failed to create kubernetes client: %v", err)
}
stop, _, err := k8s.Portforward(ctx, kclient,
cfg.Kubernetes.PortForward.Namespace,
cfg.Kubernetes.PortForward.PodName,
Expand Down
237 changes: 237 additions & 0 deletions tests/v2/e2e/crud/rollout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
//go:build e2e

//
// Copyright (C) 2019-2025 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package crud provides e2e tests using ann-benchmarks datasets.
package crud

import (
"strconv"
"testing"
"time"

"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/tests/v2/e2e/kubernetes"
)

// TestE2EUnaryRolloutRestartAgentCRUD runs unary CRUD requests against the
// target while the "vald-agent" StatefulSet in the "default" namespace is
// rollout-restarted.
//
// Steps:
//  1. Log the IndexProperty response.
//  2. Insert cfg.Insert.Num training vectors (skipped when it is 0) and sleep
//     for cfg.Index.WaitAfterInsert.
//  3. When cfg.Search.Num != 0, search repeatedly in a background goroutine
//     while the rollout restart is issued and awaited.
//  4. Remove the vectors, remove by timestamp, flush, and log the index status.
//
// Every errgroup is a local variable: the background search loop and the test
// goroutine must never share one, otherwise they race on it and may wait on
// each other's work.
func TestE2EUnaryRolloutRestartAgentCRUD(t *testing.T) {
	timestamp := time.Now().UnixNano()

	t.Log(cfg, ctx)

	{
		res, err := client.IndexProperty(ctx, &payload.Empty{})
		if err != nil {
			st, ok := status.FromError(err)
			if ok && st != nil {
				t.Errorf("failed to get IndexProperty %v status: %s", err, st.String())
			} else {
				t.Errorf("failed to get IndexProperty %v", err)
			}
		}
		t.Logf("IndexProperty: %v", res.String())
	}

	if cfg.Insert.Num != 0 {
		insertGroup, _ := errgroup.New(ctx)
		insertGroup.SetLimit(int(cfg.Insert.Concurrency))
		for i, vec := range ds.Train[cfg.Insert.Offset : cfg.Insert.Offset+cfg.Insert.Num] {
			id := strconv.Itoa(i)
			ts := cfg.Insert.Timestamp
			if ts == 0 {
				ts = timestamp
			}
			insertGroup.Go(safety.RecoverFunc(func() error {
				res, err := client.Insert(ctx, &payload.Insert_Request{
					Vector: &payload.Object_Vector{
						Id:        id,
						Vector:    vec,
						Timestamp: ts,
					},
					Config: &payload.Insert_Config{
						Timestamp:            ts,
						SkipStrictExistCheck: cfg.Insert.SkipStrictExistCheck,
					},
				})
				if err != nil {
					st, ok := status.FromError(err)
					if ok && st != nil {
						t.Errorf("failed to insert vector: %v, status: %s", err, st.String())
					} else {
						t.Errorf("failed to insert vector: %v", err)
					}
					// Do not report a failed request as inserted.
					return nil
				}
				t.Logf("vector %v id %s inserted to %s", vec, id, res.String())
				return nil
			}))
		}
		// The workers report request failures through t.Errorf and return nil,
		// so a non-nil error here comes from the recover wrapper.
		if err := insertGroup.Wait(); err != nil {
			t.Errorf("insert workers returned an error: %v", err)
		}

		sleep(t, cfg.Index.WaitAfterInsert)
	}

	indexStatus(t, ctx)

	if cfg.Search.Num != 0 {
		// stop asks the background search loop to finish; stopped is closed
		// once it has really exited. The test must wait for stopped because
		// using t after the test function returned panics.
		stop := make(chan struct{})
		stopped := make(chan struct{})

		stopRequested := func() bool {
			select {
			case <-stop:
				return true
			case <-ctx.Done():
				return true
			default:
				return false
			}
		}

		// Search continuously until the rollout restart has completed.
		go func() {
			defer close(stopped)
			for !stopRequested() {
				searchGroup, _ := errgroup.New(ctx)
				searchGroup.SetLimit(int(cfg.Search.Concurrency))
				for i, vec := range ds.Test[cfg.Search.Offset : cfg.Search.Offset+cfg.Search.Num] {
					if stopRequested() {
						// Stop scheduling new requests; in-flight ones are
						// still awaited below.
						break
					}
					for _, query := range cfg.Search.Queries {
						id := strconv.Itoa(i)
						rid := id + "-" + payload.Search_AggregationAlgorithm_name[int32(query.Algorithm)]
						searchGroup.Go(safety.RecoverFunc(func() error {
							// A nil ratio leaves the field unset in the request.
							var ratio *wrapperspb.FloatValue
							if query.Ratio != 0 {
								ratio = wrapperspb.Float(query.Ratio)
							}
							res, err := client.Search(ctx, &payload.Search_Request{
								Vector: vec,
								Config: &payload.Search_Config{
									RequestId:            rid,
									Num:                  query.K,
									Radius:               query.Radius,
									Epsilon:              query.Epsilon,
									Timeout:              query.Timeout.Nanoseconds(),
									AggregationAlgorithm: query.Algorithm,
									MinNum:               query.MinNum,
									Ratio:                ratio,
									Nprobe:               query.Nprobe,
								},
							})
							if err != nil {
								st, ok := status.FromError(err)
								if ok && st != nil {
									t.Errorf("failed to search vector: %v, status: %s", err, st.String())
								} else {
									t.Errorf("failed to search vector: %v", err)
								}
								// No response to compute a recall from.
								return nil
							}
							t.Logf("vector %v id %s searched recall: %f, payload %s", vec, rid, calculateRecall(t, res, i), res.String())
							return nil
						}))
					}
				}
				if err := searchGroup.Wait(); err != nil {
					t.Errorf("search workers returned an error: %v", err)
				}
			}
		}()

		rolloutGroup, _ := errgroup.New(ctx)
		rolloutGroup.Go(safety.RecoverFunc(func() error {
			statefulSetClient := kclient.GetClientSet().AppsV1().StatefulSets("default")
			err := kubernetes.RolloutRestart(ctx, statefulSetClient, "vald-agent")
			if err != nil {
				t.Logf("failed to rollout restart: %v", err)
				return err
			}
			obj, matched, err := kubernetes.WaitForStatus(ctx, statefulSetClient, "vald-agent", kubernetes.StatusAvailable)
			t.Logf("Completed wait for stateful set ready: %#v", obj)
			if matched {
				return nil
			}
			if err == nil {
				// Without this the test would pass although the StatefulSet
				// never reported the awaited status.
				t.Errorf("stateful set vald-agent did not reach the awaited status")
			}
			return err
		}))
		err := rolloutGroup.Wait()

		// Stop the background search and wait for it before touching t again
		// from this goroutine or returning from the test.
		close(stop)
		<-stopped

		if err != nil {
			t.Fatalf("failed to rollout restart: %s", err.Error())
		}
	}

	removeGroup, _ := errgroup.New(ctx)
	removeGroup.SetLimit(int(cfg.Remove.Concurrency))
	for i := range ds.Train[cfg.Remove.Offset : cfg.Remove.Offset+cfg.Remove.Num] {
		id := strconv.Itoa(i)
		ts := cfg.Remove.Timestamp
		if ts == 0 {
			ts = timestamp
		}
		removeGroup.Go(safety.RecoverFunc(func() error {
			res, err := client.Remove(ctx, &payload.Remove_Request{
				Id: &payload.Object_ID{Id: id},
				Config: &payload.Remove_Config{
					Timestamp:            ts,
					SkipStrictExistCheck: cfg.Remove.SkipStrictExistCheck,
				},
			})
			if err != nil {
				st, ok := status.FromError(err)
				if ok && st != nil {
					t.Errorf("failed to remove vector: %v, status: %s", err, st.String())
				} else {
					t.Errorf("failed to remove vector: %v", err)
				}
				// Do not report a failed request as removed.
				return nil
			}
			t.Logf("id %s'd vector removed to %s", id, res.String())
			return nil
		}))
	}
	if err := removeGroup.Wait(); err != nil {
		t.Errorf("remove workers returned an error: %v", err)
	}

	{
		// Remove everything whose timestamp is at most one hour in the past.
		rts := time.Now().Add(-time.Hour).UnixNano()
		res, err := client.RemoveByTimestamp(ctx, &payload.Remove_TimestampRequest{
			Timestamps: []*payload.Remove_Timestamp{
				{
					Timestamp: rts,
					Operator:  payload.Remove_Timestamp_Le,
				},
			},
		})
		if err != nil {
			st, ok := status.FromError(err)
			if ok && st != nil {
				t.Errorf("failed to remove by timestamp vector: %v, status: %s", err, st.String())
			} else {
				t.Errorf("failed to remove by timestamp vector: %v", err)
			}
		}
		t.Logf("removed by timestamp %s to %s", time.Unix(0, rts).String(), res.String())
	}

	{
		res, err := client.Flush(ctx, &payload.Flush_Request{})
		if err != nil {
			st, ok := status.FromError(err)
			if ok && st != nil {
				t.Errorf("failed to flush %v, status: %s", err, st.String())
			} else {
				t.Errorf("failed to flush %v", err)
			}
		}
		t.Logf("flushed %s", res.String())
	}

	indexStatus(t, ctx)
}
Loading