diff --git a/docs/core-concepts/inter-step-buffer-service.md b/docs/core-concepts/inter-step-buffer-service.md index 2f4bf966a7..5af8f6028a 100644 --- a/docs/core-concepts/inter-step-buffer-service.md +++ b/docs/core-concepts/inter-step-buffer-service.md @@ -46,7 +46,7 @@ The version `latest` in the ConfigMap should only be used for testing purpose. I ### Replicas -An optional property `spec.jetstream.replicas` (defaults to 3) can be specified, which gives the total number of nodes. An odd number 3 or 5 is suggested. If the given number < 3, 3 will be used. +An optional property `spec.jetstream.replicas` (defaults to 3) can be specified, which gives the total number of nodes. ### Persistence diff --git a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go index 2bcd235f9c..57871bec45 100644 --- a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go +++ b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go @@ -79,7 +79,7 @@ func (j JetStreamBufferService) GetReplicas() int { if j.Replicas == nil { return 3 } - if *j.Replicas < 3 { + if *j.Replicas == 2 { return 3 } return int(*j.Replicas) diff --git a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go index 9a4b75fad9..becf05d86a 100644 --- a/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go +++ b/pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go @@ -125,6 +125,9 @@ func Test_JSBufferGetReplicas(t *testing.T) { five := int32(5) s.Replicas = &five assert.Equal(t, 5, s.GetReplicas()) + one := int32(1) + s.Replicas = &one + assert.Equal(t, 1, s.GetReplicas()) two := int32(2) s.Replicas = &two assert.Equal(t, 3, s.GetReplicas()) diff --git a/pkg/isbsvc/jetstream_service.go b/pkg/isbsvc/jetstream_service.go index 4c2fd0ca69..10f64fac0d 100644 --- a/pkg/isbsvc/jetstream_service.go +++ b/pkg/isbsvc/jetstream_service.go @@ -97,7 +97,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b TTL: time.Hour * 24 * 30, // 30 days MaxBytes: 0, Storage: nats.FileStorage, - Replicas: 3, + Replicas: v.GetInt("stream.replicas"), }); err != nil { return fmt.Errorf("failed to create side inputs KV %q, %w", kvName, err) } diff --git a/pkg/reconciler/isbsvc/installer/assets/jetstream/nats-cluster.conf b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats-cluster.conf new file mode 100644 index 0000000000..c9bb41860a --- /dev/null +++ b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats-cluster.conf @@ -0,0 +1,44 @@ +port: {{.ClientPort}} +pid_file: "/var/run/nats/nats.pid" + +max_payload: {{.MaxPayload}} + +############### +# # +# Monitoring # +# # +############### +http: {{.MonitorPort}} +server_name: $POD_NAME +################################### +# # +# NATS JetStream # +# # +################################### +jetstream { + {{.EncryptionSettings}} + store_dir: "/data/jetstream/store" + max_memory_store: {{.MaxMemoryStore}} + max_file_store: {{.MaxFileStore}} +} + +################################### +# # +# NATS Cluster # +# # +################################### +cluster { + port: {{.ClusterPort}} + name: {{.ClusterName}} + routes: [{{.Routes}}] + cluster_advertise: $CLUSTER_ADVERTISE + connect_retries: 120 + {{.TLSConfig}} +} +lame_duck_duration: 120s +################## +# # +# Authorization # +# # +################## +include ./auth.conf \ No newline at end of file diff --git a/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf index c9bb41860a..b57a602293 100644 --- a/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf +++ b/pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf @@ -22,19 +22,6 @@ jetstream { max_file_store: {{.MaxFileStore}} } -################################### -# # -# NATS Cluster # -# # -################################### -cluster { - port: {{.ClusterPort}} - name: {{.ClusterName}} - routes: [{{.Routes}}] - cluster_advertise: $CLUSTER_ADVERTISE - connect_retries: 120 - {{.TLSConfig}} -} lame_duck_duration: 120s ################## # # diff --git a/pkg/reconciler/isbsvc/installer/jetstream.go b/pkg/reconciler/isbsvc/installer/jetstream.go index d132682306..798d75da99 100644 --- a/pkg/reconciler/isbsvc/installer/jetstream.go +++ b/pkg/reconciler/isbsvc/installer/jetstream.go @@ -390,9 +390,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { svcName := generateJetStreamServiceName(r.isbs) ssName := generateJetStreamStatefulSetName(r.isbs) replicas := r.isbs.Spec.JetStream.GetReplicas() - if replicas < 3 { - replicas = 3 - } routes := []string{} for j := 0; j < replicas; j++ { routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort)))) @@ -422,7 +419,13 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error { return fmt.Errorf("failed to merge customized jetstream settings, %w", err) } } - confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf")) + var confTpl *template.Template + if replicas > 2 { + confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats-cluster.conf")) + } else { + confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf")) + } + var confTplOutput bytes.Buffer if err := confTpl.Execute(&confTplOutput, struct { ClusterName string diff --git a/pkg/reconciler/isbsvc/validate.go b/pkg/reconciler/isbsvc/validate.go index 98759b0154..31e65b830d 100644 --- a/pkg/reconciler/isbsvc/validate.go +++ b/pkg/reconciler/isbsvc/validate.go @@ -47,8 +47,8 @@ func ValidateInterStepBufferService(isbs *dfv1.InterStepBufferService) error { if x.Version == "" { return fmt.Errorf(`invalid spec: "spec.jetstream.version" is not defined`) } - if x.Replicas != nil && *(x.Replicas) < 3 { - return fmt.Errorf(`invalid spec: min value for "spec.jetstream.replicas" is 3`) + if x.Replicas != nil && (*x.Replicas == 2 || *x.Replicas <= 0) { + return fmt.Errorf(`invalid spec: min value for "spec.jetstream.replicas" is 1 and can't be 2`) } } return nil diff --git a/test/api-e2e/api_test.go b/test/api-e2e/api_test.go index 616d950184..0d6acbb609 100644 --- a/test/api-e2e/api_test.go +++ b/test/api-e2e/api_test.go @@ -71,6 +71,43 @@ func (s *APISuite) TestISBSVC() { stopPortForward() } +func (s *APISuite) TestISBSVCReplica1() { + var err error + numaflowServerPodName := s.GetNumaflowServerPodName() + if numaflowServerPodName == "" { + panic("failed to find the nuamflow-server pod") + } + stopPortForward := s.StartPortForward(numaflowServerPodName, 8443) + + var testISBSVC v1alpha1.InterStepBufferService + err = json.Unmarshal(testISBSVCReplica1Spec, &testISBSVC) + assert.NoError(s.T(), err) + createISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").POST(fmt.Sprintf("/api/v1/namespaces/%s/isb-services", Namespace)).WithJSON(testISBSVC). + Expect(). + Status(200).Body().Raw() + var createISBSVCSuccessExpect = `{"data":null}` + assert.Contains(s.T(), createISBSVCBody, createISBSVCSuccessExpect) + + listISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").GET(fmt.Sprintf("/api/v1/namespaces/%s/isb-services", Namespace)). + Expect(). + Status(200).Body().Raw() + assert.Contains(s.T(), listISBSVCBody, testISBSVCReplica1Name) + + getISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").GET(fmt.Sprintf("/api/v1/namespaces/%s/isb-services/%s", Namespace, testISBSVCReplica1Name)). + Expect(). + Status(200).Body().Raw() + assert.Contains(s.T(), getISBSVCBody, fmt.Sprintf(`"name":"%s"`, testISBSVCReplica1Name)) + assert.Contains(s.T(), getISBSVCBody, `"status":"healthy"`) + + deleteISBSVC := HTTPExpect(s.T(), "https://localhost:8443").DELETE(fmt.Sprintf("/api/v1/namespaces/%s/isb-services/%s", Namespace, testISBSVCReplica1Name)). + Expect(). + Status(200).Body().Raw() + var deleteISBSVCSuccessExpect = `{"data":null}` + assert.Contains(s.T(), deleteISBSVC, deleteISBSVCSuccessExpect) + + stopPortForward() +} + func (s *APISuite) TestPipeline0() { var err error numaflowServerPodName := s.GetNumaflowServerPodName() diff --git a/test/api-e2e/testdata.go b/test/api-e2e/testdata.go index d3f1c1b0ab..653e596d0b 100644 --- a/test/api-e2e/testdata.go +++ b/test/api-e2e/testdata.go @@ -119,5 +119,24 @@ var ( } } } +`) + testISBSVCReplica1Name = "test-isbsvc-replica-1" + testISBSVCReplica1Spec = []byte(` +{ + "apiVersion": "numaflow.numaproj.io/v1alpha1", + "kind": "InterStepBufferService", + "metadata": { + "name": "test-isbsvc" + }, + "spec": { + "jetstream": { + "persistence": { + "volumeSize": "3Gi" + }, + "replicas": 1, + "version": "latest" + } + } +} `) )