From 889f4f1121472e586d576657cb54e1ab11887b1b Mon Sep 17 00:00:00 2001
From: Joel Takvorian
Date: Mon, 16 Dec 2024 08:46:45 +0100
Subject: [PATCH] Improve e2e troubleshooting (#448)

* Improve e2e troubleshooting

Improve / fix some issues with e2e tests:
- Add more logs; print useful information such as whether the cluster is still up
- Improve readiness checks (e.g. agent pods were crashing)
- Use more up-to-date templates for Loki and Kafka (similar to what we have in the docs repo)

* remove -a flag; do not tag e2e

Tagging e2e is not necessary and has undesired side effects, such as
excluding the e2e source files from building/linting, which can hide
some problems

* Add doc
---
 Makefile                             |  7 ++-
 README.md                            |  4 ++
 e2e/README.md                        | 66 +++++++++++++++++++++
 e2e/basic/common.go                  | 11 ++--
 e2e/basic/flow_test.go               |  3 +-
 e2e/cluster/base/02-loki.yml         | 74 +++++++++++++++++++----
 e2e/cluster/kind.go                  | 87 +++++++++++++++++++++-------
 e2e/cluster/kind_test.go             | 31 +++++++---
 e2e/cluster/tester/loki.go           | 14 +++++
 e2e/cluster/tester/pods.go           | 15 ++++-
 e2e/ipfix/ipfix_test.go              |  4 +-
 e2e/kafka/kafka_test.go              | 28 +++++----
 e2e/kafka/manifests/10-kafka-crd.yml |  4 +-
 13 files changed, 276 insertions(+), 72 deletions(-)
 create mode 100644 e2e/README.md

diff --git a/Makefile b/Makefile
index be7af0d89..237b2af5c 100644
--- a/Makefile
+++ b/Makefile
@@ -144,7 +144,7 @@ docker-generate: ## Create the container that generates the eBPF binaries
 .PHONY: compile
 compile: ## Compile ebpf agent project
 	@echo "### Compiling project"
-	GOARCH=${GOARCH} GOOS=$(GOOS) go build -mod vendor -a -o bin/netobserv-ebpf-agent cmd/netobserv-ebpf-agent.go
+	GOARCH=${GOARCH} GOOS=$(GOOS) go build -mod vendor -o bin/netobserv-ebpf-agent cmd/netobserv-ebpf-agent.go
 
 .PHONY: build-and-push-bc-image
 build-and-push-bc-image: docker-generate ## Build and push bytecode image
@@ -153,7 +153,7 @@ build-and-push-bc-image: docker-generate ## Build and push bytecode image
 .PHONY: test
 test: ## Test code using go test
 	@echo "### Testing code"
-	GOOS=$(GOOS) go test -mod vendor -a ./... -coverpkg=./... -coverprofile cover.all.out
+	GOOS=$(GOOS) go test -mod vendor ./pkg/... ./cmd/... -coverpkg=./... -coverprofile cover.all.out
 
 .PHONY: cov-exclude-generated
 cov-exclude-generated:
@@ -175,7 +175,8 @@ tests-e2e: prereqs ## Run e2e tests
 	go clean -testcache
 	# making the local agent image available to kind in two ways, so it will work in different
 	# environments: (1) as image tagged in the local repository (2) as image archive.
-	$(OCI_BIN) build . --build-arg TARGETARCH=$(GOARCH) -t localhost/ebpf-agent:test
+	rm -f ebpf-agent.tar || true
+	$(OCI_BIN) build . --build-arg LDFLAGS="" --build-arg TARGETARCH=$(GOARCH) -t localhost/ebpf-agent:test
 	$(OCI_BIN) save -o ebpf-agent.tar localhost/ebpf-agent:test
 	GOOS=$(GOOS) go test -p 1 -timeout 30m -v -mod vendor -tags e2e ./e2e/...
diff --git a/README.md b/README.md
index 7d94c135b..d5c66a89b 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,10 @@ make generate
 
 Regularly tested on Fedora.
 
+### Running end-to-end tests
+
+Refer to the specific documentation: [e2e readme](./e2e/README.md)
+
 ## Known issues
 
 ### Extrenal Traffic in Openshift (OVN-Kubernetes CNI)
diff --git a/e2e/README.md b/e2e/README.md
new file mode 100644
index 000000000..827eb43c6
--- /dev/null
+++ b/e2e/README.md
@@ -0,0 +1,66 @@
+## eBPF Agent e2e tests
+
+e2e tests can be run with:
+
+```bash
+make tests-e2e
+```
+
+If you use podman, you may need to run it as root instead:
+
+```bash
+sudo make tests-e2e
+```
+
+### What it does
+
+It builds an image with the current code, including pre-generated BPF bytecode, starts a KIND cluster and deploys the agent on it. It also deploys a typical NetObserv stack, which includes flowlogs-pipeline, Loki and/or Kafka.
+
+It then runs a couple of smoke tests on that cluster, such as sending pings between pods and verifying that the expected flows are created.
+
+The tests leverage Kube's [e2e-framework](https://github.com/kubernetes-sigs/e2e-framework). They are based on manifest files that you can find in [this directory](./cluster/base/).
+
+### How to troubleshoot
+
+During the tests, you can run any `kubectl` command against the KIND cluster.
+
+If you use podman/root and don't want to open a root session, you can simply copy the root kube config:
+
+```bash
+sudo cp /root/.kube/config /tmp/agent-kind-kubeconfig
+sudo -E chown $USER:$USER /tmp/agent-kind-kubeconfig
+export KUBECONFIG=/tmp/agent-kind-kubeconfig
+```
+
+Then:
+
+```bash
+$ kubectl get pods
+NAME                    READY   STATUS    RESTARTS   AGE
+flp-29bmd               1/1     Running   0          6s
+loki-7c98dfd6d4-c8q9m   1/1     Running   0          56s
+```
+
+### Cleanup
+
+The KIND cluster should be cleaned up after tests. Sometimes it isn't, for instance after a forced exit or certain kinds of failures.
+When that's the case, you should see a message telling you how to manually clean up the cluster:
+
+```
+^CSIGTERM received, cluster might still be running
+To clean up, run: kind delete cluster --name basic-test-cluster20241212-125815
+FAIL    github.com/netobserv/netobserv-ebpf-agent/e2e/basic     172.852s
+```
+
+If that's not the case, you can manually retrieve the cluster name to delete:
+
+```bash
+$ kind get clusters
+basic-test-cluster20241212-125815
+
+$ kind delete cluster --name=basic-test-cluster20241212-125815
+Deleting cluster "basic-test-cluster20241212-125815" ...
+Deleted nodes: ["basic-test-cluster20241212-125815-control-plane"]
+```
+
+If not cleaned up, a subsequent run of e2e tests will fail due to addresses (ports) already in use.
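If several stale test clusters have accumulated (for example after repeated interrupted runs), a small loop such as the sketch below removes them in one go. This is an editor's sketch rather than part of the patch; the `test-cluster` name filter is an assumption based on the prefixes used in this repository, so check the output of `kind get clusters` first.

```bash
# Sketch: delete every KIND cluster whose name looks like an e2e test cluster.
# The "test-cluster" filter is an assumption; verify against `kind get clusters` before running.
for c in $(kind get clusters | grep "test-cluster"); do
  kind delete cluster --name "$c"
done
```

As a complement to the `kubectl` checks in the troubleshooting section above, the Loki instance deployed for these tests is reachable on NodePort 30100 (the same `http://localhost:30100` base URL used by the test code), so it can be queried directly to verify that flows are reaching the store. A minimal sketch, assuming `jq` is installed and that flows carry the `app="netobserv-flowcollector"` label used elsewhere in these tests:

```bash
# Sketch: query the test Loki instance through its NodePort to check that flows are being stored.
# The label selector is an assumption; adjust it to the labels written by your pipeline.
curl -s -G 'http://localhost:30100/loki/api/v1/query_range' \
  --data-urlencode 'query={app="netobserv-flowcollector"}' \
  --data-urlencode 'limit=10' | jq '.data.result | length'
```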
diff --git a/e2e/basic/common.go b/e2e/basic/common.go index 1bea80094..46f710524 100644 --- a/e2e/basic/common.go +++ b/e2e/basic/common.go @@ -1,5 +1,3 @@ -//go:build e2e - package basic import ( @@ -37,7 +35,7 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T, isIPFIX bool) { return ctx }, ).Assess("correctness of client -> server (as Service) request flows", - func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { lq := bt.lokiQuery(t, `{DstK8S_OwnerName="server",SrcK8S_OwnerName="client"}`+ `|="\"DstAddr\":\"`+pci.serverServiceIP+`\""`) @@ -82,7 +80,7 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T, isIPFIX bool) { return ctx }, ).Assess("correctness of client -> server (as Pod) request flows", - func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { lq := bt.lokiQuery(t, `{DstK8S_OwnerName="server",SrcK8S_OwnerName="client"}`+ `|="\"DstAddr\":\"`+pci.serverPodIP+`\""`) @@ -124,7 +122,7 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T, isIPFIX bool) { return ctx }, ).Assess("correctness of server (from Service) -> client response flows", - func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { lq := bt.lokiQuery(t, `{DstK8S_OwnerName="client",SrcK8S_OwnerName="server"}`+ `|="\"SrcAddr\":\"`+pci.serverServiceIP+`\""`) @@ -167,7 +165,7 @@ func (bt *FlowCaptureTester) DoTest(t *testing.T, isIPFIX bool) { return ctx }, ).Assess("correctness of server (from Pod) -> client response flows", - func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { lq := bt.lokiQuery(t, `{DstK8S_OwnerName="client",SrcK8S_OwnerName="server"}`+ `|="\"SrcAddr\":\"`+pci.serverPodIP+`\""`) @@ -282,6 +280,7 @@ func (bt *FlowCaptureTester) lokiQuery(t *testing.T, logQL string) tester.LokiQu query, err = bt.Cluster.Loki().Query(1, logQL) require.NoError(t, err) require.NotNil(t, query) + require.NotNil(t, query.Data) require.NotEmpty(t, query.Data.Result) }, test.Interval(time.Second)) result := query.Data.Result[0] diff --git a/e2e/basic/flow_test.go b/e2e/basic/flow_test.go index 97035e686..89cf13b9a 100644 --- a/e2e/basic/flow_test.go +++ b/e2e/basic/flow_test.go @@ -1,5 +1,3 @@ -//go:build e2e - package basic import ( @@ -152,6 +150,7 @@ func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, r }, test.Interval(time.Second)) test.Eventually(t, time.Minute, func(t require.TestingT) { + // testCluster.Loki().DebugPrint(100, `{app="netobserv-flowcollector",DstK8S_OwnerName="pinger"}`) query, err = testCluster.Loki(). 
Query(1, fmt.Sprintf(`{SrcK8S_OwnerName="server",DstK8S_OwnerName="pinger"}`+ `|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP diff --git a/e2e/cluster/base/02-loki.yml b/e2e/cluster/base/02-loki.yml index 463f84086..1d6a1b270 100644 --- a/e2e/cluster/base/02-loki.yml +++ b/e2e/cluster/base/02-loki.yml @@ -20,6 +20,11 @@ data: server: http_listen_port: 3100 grpc_listen_port: 9096 + grpc_server_max_recv_msg_size: 10485760 + http_server_read_timeout: 1m + http_server_write_timeout: 1m + log_level: error + target: all common: path_prefix: /loki-store storage: @@ -31,9 +36,32 @@ data: instance_addr: 127.0.0.1 kvstore: store: inmemory + compactor: + compaction_interval: 5m + retention_enabled: true + retention_delete_delay: 2h + retention_delete_worker_count: 150 + frontend: + compress_responses: true + ingester: + chunk_encoding: snappy + chunk_retain_period: 1m + query_range: + align_queries_with_step: true + cache_results: true + max_retries: 5 + results_cache: + cache: + enable_fifocache: true + fifocache: + max_size_bytes: 500MB + validity: 24h + parallelise_shardable_queries: true + query_scheduler: + max_outstanding_requests_per_tenant: 2048 schema_config: configs: - - from: 2020-10-24 + - from: 2022-01-01 store: boltdb-shipper object_store: filesystem schema: v11 @@ -47,15 +75,39 @@ data: active_index_directory: /loki-store/index shared_store: filesystem cache_location: /loki-store/boltdb-cache - datasource.yaml: | - apiVersion: 1 - datasources: - - name: Loki - type: loki - access: proxy - url: http://localhost:3100 - isDefault: true - version: 1 + cache_ttl: 24h + limits_config: + ingestion_rate_strategy: global + ingestion_rate_mb: 10 + ingestion_burst_size_mb: 10 + max_label_name_length: 1024 + max_label_value_length: 2048 + max_label_names_per_series: 30 + reject_old_samples: true + reject_old_samples_max_age: 15m + creation_grace_period: 10m + enforce_metric_name: false + max_line_size: 256000 + max_line_size_truncate: false + max_entries_limit_per_query: 10000 + max_streams_per_user: 0 + max_global_streams_per_user: 0 + unordered_writes: true + max_chunks_per_query: 2000000 + max_query_length: 721h + max_query_parallelism: 32 + max_query_series: 10000 + cardinality_limit: 100000 + max_streams_matchers_per_query: 1000 + max_concurrent_tail_requests: 10 + retention_period: 24h + max_cache_freshness_per_query: 5m + max_queriers_per_tenant: 0 + per_stream_rate_limit: 3MB + per_stream_rate_limit_burst: 15MB + max_query_lookback: 0 + min_sharding_lookback: 0s + split_queries_by_interval: 1m --- apiVersion: apps/v1 kind: Deployment @@ -83,7 +135,7 @@ spec: name: loki-config containers: - name: loki - image: grafana/loki:2.4.1 + image: grafana/loki:2.9.0 volumeMounts: - mountPath: "/loki-store" name: loki-store diff --git a/e2e/cluster/kind.go b/e2e/cluster/kind.go index d447dd1c0..a36af8f04 100644 --- a/e2e/cluster/kind.go +++ b/e2e/cluster/kind.go @@ -12,13 +12,14 @@ import ( "fmt" "io" "os" + "os/signal" "path" + rt2 "runtime" "sort" + "syscall" "testing" "time" - rt2 "runtime" - "github.com/netobserv/netobserv-ebpf-agent/e2e/cluster/tester" "github.com/sirupsen/logrus" @@ -90,18 +91,43 @@ var defaultBaseDeployments = map[DeployID]Deployment{ Loki: { Order: ExternalServices, ManifestFile: path.Join(packageDir(), "base", "02-loki.yml"), - ReadyFunction: func(*envconf.Config) error { - return (&tester.Loki{BaseURL: "http://127.0.0.1:30100"}).Ready() + Ready: &Readiness{ + Function: func(*envconf.Config) error { return (&tester.Loki{BaseURL: "http://localhost:30100"}).Ready() }, + Description: 
"Check that http://localhost:30100 is reachable (Loki NodePort)", + Timeout: 5 * time.Minute, + Retry: 5 * time.Second, }, }, FlowLogsPipeline: { Order: NetObservServices, ManifestFile: path.Join(packageDir(), "base", "03-flp.yml"), + Ready: &Readiness{ + Function: testPodsReady("flp"), + Description: "Check that flp pods are up and running", + Timeout: 5 * time.Minute, + Retry: 5 * time.Second, + }, }, Agent: { Order: WithAgent, ManifestFile: path.Join(packageDir(), "base", "04-agent.yml"), + Ready: &Readiness{ + Function: testPodsReady("netobserv-ebpf-agent"), + Description: "Check that agent pods are up and running", + Timeout: 5 * time.Minute, + Retry: 5 * time.Second, + }, }, } +func testPodsReady(dsName string) func(*envconf.Config) error { + return func(cfg *envconf.Config) error { + pods, err := tester.NewPods(cfg) + if err != nil { + return err + } + return pods.DSReady(context.Background(), "default", dsName) + } +} + // Deployment of components. Not only K8s deployments but also Pods, Services, DaemonSets, ... type Deployment struct { // Order of the deployment. Deployments with the same order will be executed by alphabetical @@ -109,9 +135,14 @@ type Deployment struct { Order DeployOrder // ManifestFile path to the kubectl-like YAML manifest file ManifestFile string - // ReadyFunction is an optional function that returns error if the deployment is not ready. - // Used when it's needed to wait before starting tests or deploying later components. - ReadyFunction func(*envconf.Config) error + Ready *Readiness +} + +type Readiness struct { + Function func(*envconf.Config) error + Description string + Timeout time.Duration + Retry time.Duration } // Kind cluster deployed by each TestMain function, prepared for a given test scenario. @@ -146,6 +177,7 @@ func Deploy(def Deployment) Option { // Timeout for long-running operations (e.g. deployments, readiness probes...) func Timeout(t time.Duration) Option { + log.Infof("Timeout set to %s", t.String()) return func(k *Kind) { k.timeout = t } @@ -156,6 +188,9 @@ func Timeout(t time.Duration) Option { // backend doesn't provide access to the local images, where the ebpf-agent.tar container image // is located. Usually it will be the project root. func NewKind(kindClusterName, baseDir string, options ...Option) *Kind { + fmt.Println() + fmt.Println() + log.Infof("Starting KIND cluster %s", kindClusterName) k := &Kind{ testEnv: env.New(), baseDir: baseDir, @@ -191,10 +226,19 @@ func (k *Kind) Run(m *testing.M) { currentOrder = c.Order } envFuncs = append(envFuncs, deploy(c)) - readyFuncs = append(readyFuncs, withTimeout(isReady(c), k.timeout)) + readyFuncs = append(readyFuncs, isReady(c)) } envFuncs = append(envFuncs, readyFuncs...) + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGTERM) + go func() { + <-exit + fmt.Println("SIGTERM received, cluster might still be running") + fmt.Printf("To clean up, run: \033[33mkind delete cluster --name %s\033[0m\n", k.clusterName) + os.Exit(1) + }() + log.Info("starting kind setup") code := k.testEnv.Setup(envFuncs...). 
Finish( @@ -244,7 +288,7 @@ func (k *Kind) TestEnv() env.Environment { // Loki client pointing to the Loki instance inside the test cluster func (k *Kind) Loki() *tester.Loki { - return &tester.Loki{BaseURL: "http://127.0.0.1:30100"} + return &tester.Loki{BaseURL: "http://localhost:30100"} } func deploy(definition Deployment) env.Func { @@ -285,6 +329,7 @@ func deployManifestFile(definition Deployment, if !errors.Is(err, io.EOF) { return fmt.Errorf("decoding manifest raw object: %w", err) } + log.WithField("file", definition.ManifestFile).Info("done") // eof return nil } @@ -344,7 +389,7 @@ func (k *Kind) loadLocalImage() env.Func { } // withTimeout retries the execution of an env.Func until it succeeds or a timeout is reached -func withTimeout(f env.Func, timeout time.Duration) env.Func { +func withTimeout(f env.Func, timeout, retry time.Duration) env.Func { tlog := log.WithField("function", "withTimeout") return func(ctx context.Context, config *envconf.Config) (context.Context, error) { start := time.Now() @@ -356,26 +401,24 @@ func withTimeout(f env.Func, timeout time.Duration) env.Func { if time.Since(start) > timeout { return ctx, fmt.Errorf("timeout (%s) trying to execute function: %w", timeout, err) } - tlog.WithError(err).Debug("function did not succeed. Retrying after 5s") - time.Sleep(5 * time.Second) + tlog.WithError(err).Debugf("function did not succeed. Retrying after %s", retry.String()) + time.Sleep(retry) } } } // isReady succeeds if the passed deployment does not have ReadyFunction, or it succeeds func isReady(definition Deployment) env.Func { - return withTimeout(func(ctx context.Context, cfg *envconf.Config) (context.Context, error) { - if definition.ReadyFunction != nil { - log.WithFields(logrus.Fields{ - "function": "isReady", - "deployment": definition.ManifestFile, - }).Debug("checking readiness") - if err := definition.ReadyFunction(cfg); err != nil { + if definition.Ready != nil { + log.WithFields(logrus.Fields{"deployment": definition.ManifestFile, "readiness": definition.Ready.Description}).Infof("Readiness check set with timeout: %s", definition.Ready.Timeout.String()) + return withTimeout(func(ctx context.Context, cfg *envconf.Config) (context.Context, error) { + if err := definition.Ready.Function(cfg); err != nil { return ctx, fmt.Errorf("component not ready: %w", err) } - } - return ctx, nil - }, time.Minute*20) + return ctx, nil + }, definition.Ready.Timeout, definition.Ready.Retry) + } + return func(ctx context.Context, _ *envconf.Config) (context.Context, error) { return ctx, nil } } // helper to get the base directory of this package, allowing to load the test deployment diff --git a/e2e/cluster/kind_test.go b/e2e/cluster/kind_test.go index fd7397ef6..0967dec41 100644 --- a/e2e/cluster/kind_test.go +++ b/e2e/cluster/kind_test.go @@ -18,13 +18,28 @@ func TestOrderManifests(t *testing.T) { Deploy(Deployment{Order: ExternalServices, ManifestFile: "sql"}), Override(Loki, Deployment{Order: ExternalServices, ManifestFile: "loki"})) + var orders []DeployOrder + var files []string + for _, m := range tc.orderedManifests() { + orders = append(orders, m.Order) + files = append(files, m.ManifestFile) + } + // verify that deployments are overridden and/or inserted in proper order - require.Equal(t, []Deployment{ - {Order: Preconditions, ManifestFile: path.Join(packageDir(), "base", "01-permissions.yml")}, - {Order: ExternalServices, ManifestFile: "sql"}, - {Order: ExternalServices, ManifestFile: "loki"}, - {Order: NetObservServices, ManifestFile: 
path.Join(packageDir(), "base", "03-flp.yml")}, - {Order: WithAgent, ManifestFile: path.Join(packageDir(), "base", "04-agent.yml")}, - {ManifestFile: "pods.yml"}, - }, tc.orderedManifests()) + require.Equal(t, []DeployOrder{ + Preconditions, + ExternalServices, + ExternalServices, + NetObservServices, + WithAgent, + 0, + }, orders) + require.Equal(t, []string{ + path.Join(packageDir(), "base", "01-permissions.yml"), + "sql", + "loki", + path.Join(packageDir(), "base", "03-flp.yml"), + path.Join(packageDir(), "base", "04-agent.yml"), + "pods.yml", + }, files) } diff --git a/e2e/cluster/tester/loki.go b/e2e/cluster/tester/loki.go index 4c7f2b2a8..c6dee3b22 100644 --- a/e2e/cluster/tester/loki.go +++ b/e2e/cluster/tester/loki.go @@ -51,6 +51,20 @@ func (l *Loki) Ready() error { return nil } +func (l *Loki) DebugPrint(limit int, query string) { + fmt.Printf("---- DEBUG PRINT %d ----\n", limit) + resp, err := l.Query(limit, query) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + if resp == nil { + fmt.Printf("Response is nil\n") + return + } + fmt.Printf("LOKI CONTENT: %v\n", resp.Data.Result) +} + // Query executes an arbitrary logQL query, given a limit in the results func (l *Loki) Query(limit int, logQL string) (*LokiQueryResponse, error) { status, body, err := l.get(fmt.Sprintf("%s?%s=%d&%s&%s=%s", diff --git a/e2e/cluster/tester/pods.go b/e2e/cluster/tester/pods.go index 5ac141f51..e7190afc0 100644 --- a/e2e/cluster/tester/pods.go +++ b/e2e/cluster/tester/pods.go @@ -33,9 +33,7 @@ func NewPods(cfg *envconf.Config) (*Pods, error) { }, nil } -func (p *Pods) MACAddress( - ctx context.Context, namespace, name, iface string, -) (net.HardwareAddr, error) { +func (p *Pods) MACAddress(ctx context.Context, namespace, name, iface string) (net.HardwareAddr, error) { mac, errStr, err := p.Execute(ctx, namespace, name, "cat", "/sys/class/net/"+iface+"/address") if err != nil { return nil, fmt.Errorf("executing command: %w", err) @@ -78,3 +76,14 @@ func (p *Pods) Execute(ctx context.Context, namespace, name string, command ...s } return buf.String(), errBuf.String(), nil } + +func (p *Pods) DSReady(ctx context.Context, namespace, name string) error { + ds, err := p.client.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("getting DS %s: %w", name, err) + } + if ds.Status.NumberReady != 1 { + return fmt.Errorf("%s not ready", name) + } + return nil +} diff --git a/e2e/ipfix/ipfix_test.go b/e2e/ipfix/ipfix_test.go index 6a30bebf7..738360e38 100644 --- a/e2e/ipfix/ipfix_test.go +++ b/e2e/ipfix/ipfix_test.go @@ -1,5 +1,3 @@ -//go:build e2e - package basic import ( @@ -14,7 +12,7 @@ import ( const ( clusterNamePrefix = "ipfix-test-cluster" - testTimeout = 20 * time.Minute + testTimeout = 10 * time.Minute namespace = "default" ) diff --git a/e2e/kafka/kafka_test.go b/e2e/kafka/kafka_test.go index 3f144b6ea..197dd7cbd 100644 --- a/e2e/kafka/kafka_test.go +++ b/e2e/kafka/kafka_test.go @@ -1,5 +1,3 @@ -//go:build e2e - package basic import ( @@ -43,12 +41,16 @@ func TestMain(m *testing.M) { }), cluster.Deploy(cluster.Deployment{ Order: cluster.ExternalServices, ManifestFile: path.Join("manifests", "11-kafka-cluster.yml"), - ReadyFunction: func(cfg *envconf.Config) error { - // wait for kafka to be ready - if !checkResources(cfg.Client(), "kafka-cluster-zookeeper", "kafka-cluster-kafka", "strimzi-cluster-operator", "kafka-cluster-entity-operator") { - return errors.New("waiting for kafka cluster to be ready") - } - return nil + Ready: 
&cluster.Readiness{ + Function: func(cfg *envconf.Config) error { + // wait for kafka to be ready + if !checkResources(cfg.Client(), "kafka-cluster-zookeeper", "kafka-cluster-kafka", "strimzi-cluster-operator", "kafka-cluster-entity-operator") { + return errors.New("waiting for kafka cluster to be ready") + } + return nil + }, + Timeout: 10 * time.Minute, + Retry: 20 * time.Second, }, }), cluster.Override(cluster.FlowLogsPipeline, cluster.Deployment{ @@ -88,13 +90,14 @@ func checkResources(client klient.Client, list ...string) bool { return false } deplInfo := []string{} - for _, p := range depl.Items { + for i := range depl.Items { + p := &depl.Items[i] deplInfo = append(deplInfo, fmt.Sprintf("%s (%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.Replicas)) if _, toCheck := ready[p.Name]; toCheck { ready[p.Name] = p.Status.ReadyReplicas == 1 } } - klog.Infof("Deployments: " + strings.Join(deplInfo, ", ")) + klog.Infof("Deployments: %s", strings.Join(deplInfo, ", ")) var sfs appsv1.StatefulSetList err = client.Resources(namespace).List(context.TODO(), &sfs) if err != nil { @@ -102,13 +105,14 @@ func checkResources(client klient.Client, list ...string) bool { return false } sfsInfo := []string{} - for _, p := range sfs.Items { + for i := range sfs.Items { + p := &sfs.Items[i] sfsInfo = append(sfsInfo, fmt.Sprintf("%s (%d/%d/%d)", p.Name, p.Status.ReadyReplicas, p.Status.AvailableReplicas, p.Status.Replicas)) if _, toCheck := ready[p.Name]; toCheck { ready[p.Name] = p.Status.ReadyReplicas == 1 } } - klog.Infof("StatefulSets: " + strings.Join(sfsInfo, ", ")) + klog.Infof("StatefulSets: %s", strings.Join(sfsInfo, ", ")) for _, state := range ready { if !state { return false diff --git a/e2e/kafka/manifests/10-kafka-crd.yml b/e2e/kafka/manifests/10-kafka-crd.yml index 469cd72a2..1082217fa 100644 --- a/e2e/kafka/manifests/10-kafka-crd.yml +++ b/e2e/kafka/manifests/10-kafka-crd.yml @@ -11678,10 +11678,10 @@ spec: resources: limits: cpu: 1000m - memory: 384Mi + memory: 500Mi requests: cpu: 200m - memory: 384Mi + memory: 100Mi strategy: type: Recreate
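The readiness checks introduced above (`testPodsReady` backed by `Pods.DSReady`, and `checkResources` for the Kafka scenario) can also be reproduced by hand when a test seems stuck in a readiness loop. A minimal sketch using plain `kubectl`, assuming the `default` namespace and the resource names from these manifests:

```bash
# Sketch: manually verify the readiness conditions the e2e code waits for.
# Namespace and resource names are taken from this patch; adjust them if your manifests differ.
kubectl -n default rollout status daemonset/flp --timeout=300s
kubectl -n default rollout status daemonset/netobserv-ebpf-agent --timeout=300s
# For the Kafka scenario, also check the Strimzi-managed workloads:
kubectl -n default get deployments strimzi-cluster-operator kafka-cluster-entity-operator
kubectl -n default get statefulsets kafka-cluster-zookeeper kafka-cluster-kafka
```

Note that `DSReady` expects exactly one ready pod, which matches the single-node KIND setup used here; on a multi-node cluster the check would need to compare `NumberReady` against `DesiredNumberScheduled` instead.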