Skip to content

Commit

Permalink
test: more e2e test for monovertex (#2313)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Jan 8, 2025
1 parent 97b84cf commit cd1fcb4
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 2 deletions.
9 changes: 9 additions & 0 deletions test/fixtures/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ func (t *Expect) DaemonPodLogContains(pipelineName, regex string, opts ...PodLog
return t
}

func (t *Expect) MvtxDaemonPodsRunning() *Expect {
t.t.Helper()
timeout := 2 * time.Minute
if err := WaitForMvtxDaemonPodsRunning(t.kubeClient, Namespace, t.monoVertex.Name, timeout); err != nil {
t.t.Fatalf("Expected mvtx daemon pods of %q running: %v", t.monoVertex.Name, err)
}
return t
}

func (t *Expect) When() *When {
return &When{
t: t.t,
Expand Down
22 changes: 21 additions & 1 deletion test/fixtures/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func WaitForMonoVertexPodRunning(kubeClient kubernetes.Interface, monoVertexClie
}
ok := len(podList.Items) > 0 && len(podList.Items) == monoVertex.CalculateReplicas() // pod number should equal to desired replicas
for _, p := range podList.Items {
ok = ok && p.Status.Phase == corev1.PodRunning
ok = ok && isPodReady(p)
}
if ok {
return nil
Expand Down Expand Up @@ -378,6 +378,26 @@ func WaitForDaemonPodsRunning(kubeClient kubernetes.Interface, namespace, pipeli
}
}

func WaitForMvtxDaemonPodsRunning(kubeClient kubernetes.Interface, namespace, mvtx string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyMonoVertexName, mvtx, dfv1.KeyComponent, dfv1.ComponentMonoVertexDaemon)
for {
podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
if err != nil {
return fmt.Errorf("error getting mvtx daemon pod name: %w", err)
}
ok := len(podList.Items) > 0
for _, p := range podList.Items {
ok = ok && p.Status.Phase == corev1.PodRunning
}
if ok {
return nil
}
time.Sleep(2 * time.Second)
}
}

func VertexPodLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, vertexName, regex string, opts ...PodLogCheckOption) (bool, error) {
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, pipelineName, dfv1.KeyVertexName, vertexName)
podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
Expand Down
44 changes: 44 additions & 0 deletions test/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,50 @@ func (w *When) DaemonPodPortForward(pipelineName string, localPort, remotePort i
return w
}

func (w *When) MonoVertexPodPortForward(localPort, remotePort int) *When {
w.t.Helper()
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentMonoVertex, dfv1.KeyMonoVertexName, w.monoVertex.Name)
ctx := context.Background()
podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
if err != nil {
w.t.Fatalf("Error getting mvtx pod name: %v", err)
}
podName := podList.Items[0].GetName()
w.t.Logf("MonoVertex POD name: %s", podName)

stopCh := make(chan struct{}, 1)
if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil {
w.t.Fatalf("Expected mvtx pod port-forward: %v", err)
}
if w.portForwarderStopChannels == nil {
w.portForwarderStopChannels = make(map[string]chan struct{})
}
w.portForwarderStopChannels[podName] = stopCh
return w
}

func (w *When) MvtxDaemonPodPortForward(localPort, remotePort int) *When {
w.t.Helper()
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyComponent, dfv1.ComponentMonoVertexDaemon, dfv1.KeyMonoVertexName, w.monoVertex.Name)
ctx := context.Background()
podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"})
if err != nil {
w.t.Fatalf("Error getting mvtx daemon pod name: %v", err)
}
podName := podList.Items[0].GetName()
w.t.Logf("MonoVertex Daemon POD name: %s", podName)

stopCh := make(chan struct{}, 1)
if err = PodPortForward(w.restConfig, Namespace, podName, localPort, remotePort, stopCh); err != nil {
w.t.Fatalf("Expected mvtx daemon pod port-forward: %v", err)
}
if w.portForwarderStopChannels == nil {
w.portForwarderStopChannels = make(map[string]chan struct{})
}
w.portForwarderStopChannels[podName] = stopCh
return w
}

func (w *When) UXServerPodPortForward(localPort, remotePort int) *When {
w.t.Helper()
labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyComponent, dfv1.ComponentUXServer)
Expand Down
16 changes: 15 additions & 1 deletion test/monovertex-e2e/monovertex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/suite"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
. "github.com/numaproj/numaflow/test/fixtures"
)

Expand All @@ -35,7 +36,20 @@ func (s *MonoVertexSuite) TestMonoVertexWithTransformer() {
When().CreateMonoVertexAndWait()
defer w.DeleteMonoVertexAndWait()

w.Expect().MonoVertexPodsRunning()
w.Expect().MonoVertexPodsRunning().MvtxDaemonPodsRunning()

defer w.MonoVertexPodPortForward(8931, dfv1.MonoVertexMetricsPort).
MvtxDaemonPodPortForward(3232, dfv1.MonoVertexDaemonServicePort).
TerminateAllPodPortForwards()

// Check metrics endpoints
HTTPExpect(s.T(), "https://localhost:8931").GET("/metrics").
Expect().
Status(200)

HTTPExpect(s.T(), "https://localhost:3232").GET("/metrics").
Expect().
Status(200)

// Expect the messages to be processed by the transformer.
w.Expect().MonoVertexPodLogContains("AssignEventTime", PodLogCheckOptionWithContainer("transformer"))
Expand Down

0 comments on commit cd1fcb4

Please sign in to comment.