diff --git a/test/fixtures/expect.go b/test/fixtures/expect.go index afa30447d9..996d762bd6 100644 --- a/test/fixtures/expect.go +++ b/test/fixtures/expect.go @@ -204,6 +204,15 @@ func (t *Expect) DaemonPodLogContains(pipelineName, regex string, opts ...PodLog return t } +func (t *Expect) MvtxDaemonPodsRunning() *Expect { + t.t.Helper() + timeout := 2 * time.Minute + if err := WaitForMvtxDaemonPodsRunning(t.kubeClient, Namespace, t.monoVertex.Name, timeout); err != nil { + t.t.Fatalf("Expected mvtx daemon pods of %q running: %v", t.monoVertex.Name, err) + } + return t +} + func (t *Expect) When() *When { return &When{ t: t.t, diff --git a/test/fixtures/util.go b/test/fixtures/util.go index 13a4026384..d2f27dc5f7 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -280,7 +280,7 @@ func WaitForMonoVertexPodRunning(kubeClient kubernetes.Interface, monoVertexClie } ok := len(podList.Items) > 0 && len(podList.Items) == monoVertex.CalculateReplicas() // pod number should equal to desired replicas for _, p := range podList.Items { - ok = ok && p.Status.Phase == corev1.PodRunning + ok = ok && isPodReady(p) } if ok { return nil @@ -378,6 +378,26 @@ func WaitForDaemonPodsRunning(kubeClient kubernetes.Interface, namespace, pipeli } } +func WaitForMvtxDaemonPodsRunning(kubeClient kubernetes.Interface, namespace, mvtx string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyMonoVertexName, mvtx, dfv1.KeyComponent, dfv1.ComponentMonoVertexDaemon) + for { + podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + return fmt.Errorf("error getting mvtx daemon pod name: %w", err) + } + ok := len(podList.Items) > 0 + for _, p := range podList.Items { + ok = ok && p.Status.Phase == corev1.PodRunning + } + if ok { + return nil + } + time.Sleep(2 * time.Second) + } +} + func VertexPodLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, vertexName, regex string, opts ...PodLogCheckOption) (bool, error) { labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, pipelineName, dfv1.KeyVertexName, vertexName) podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) diff --git a/test/fixtures/when.go b/test/fixtures/when.go index 986085bdd6..154c7cfda0 100644 --- a/test/fixtures/when.go +++ b/test/fixtures/when.go @@ -243,6 +243,50 @@ func (w *When) DaemonPodPortForward(pipelineName string, localPort, remotePort i return w } +func (w *When) MonoVertexPodPortForward(localPort, remotePort int) *When { + w.t.Helper() + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentMonoVertex, dfv1.KeyMonoVertexName, w.monoVertex.Name) + ctx := context.Background() + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + w.t.Fatalf("Error getting mvtx pod name: %v", err) + } + podName := podList.Items[0].GetName() + w.t.Logf("MonoVertex POD name: %s", podName) + + stopCh := make(chan struct{}, 1) + if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil { + w.t.Fatalf("Expected mvtx pod port-forward: %v", err) + } + if w.portForwarderStopChannels == nil { + w.portForwarderStopChannels = make(map[string]chan struct{}) + } + w.portForwarderStopChannels[podName] = stopCh + return w +} + +func (w *When) MvtxDaemonPodPortForward(localPort, remotePort int) *When { + w.t.Helper() + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentMonoVertexDaemon, dfv1.KeyMonoVertexName, w.monoVertex.Name) + ctx := context.Background() + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + w.t.Fatalf("Error getting mvtx daemon pod name: %v", err) + } + podName := podList.Items[0].GetName() + w.t.Logf("MonoVertex Daemon POD name: %s", podName) + + stopCh := make(chan struct{}, 1) + if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil { + w.t.Fatalf("Expected mvtx daemon pod port-forward: %v", err) + } + if w.portForwarderStopChannels == nil { + w.portForwarderStopChannels = make(map[string]chan struct{}) + } + w.portForwarderStopChannels[podName] = stopCh + return w +} + func (w *When) UXServerPodPortForward(localPort, remotePort int) *When { w.t.Helper() labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyComponent, dfv1.ComponentUXServer) diff --git a/test/monovertex-e2e/monovertex_test.go b/test/monovertex-e2e/monovertex_test.go index 3fd72d6554..ba0c640527 100644 --- a/test/monovertex-e2e/monovertex_test.go +++ b/test/monovertex-e2e/monovertex_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/suite" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" . "github.com/numaproj/numaflow/test/fixtures" ) @@ -35,7 +36,20 @@ func (s *MonoVertexSuite) TestMonoVertexWithTransformer() { When().CreateMonoVertexAndWait() defer w.DeleteMonoVertexAndWait() - w.Expect().MonoVertexPodsRunning() + w.Expect().MonoVertexPodsRunning().MvtxDaemonPodsRunning() + + defer w.MonoVertexPodPortForward(8931, dfv1.MonoVertexMetricsPort). + MvtxDaemonPodPortForward(3232, dfv1.MonoVertexDaemonServicePort). + TerminateAllPodPortForwards() + + // Check metrics endpoints + HTTPExpect(s.T(), "https://localhost:8931").GET("/metrics"). + Expect(). + Status(200) + + HTTPExpect(s.T(), "https://localhost:3232").GET("/metrics"). + Expect(). + Status(200) // Expect the messages to be processed by the transformer. w.Expect().MonoVertexPodLogContains("AssignEventTime", PodLogCheckOptionWithContainer("transformer"))