Skip to content

Commit

Permalink
feat: Jetstream support for replica of 1 Fixes #944 (#1177)
Browse files Browse the repository at this point in the history
Signed-off-by: jmillage <[email protected]>
Co-authored-by: jmillage <[email protected]>
  • Loading branch information
joelcomp1 and jmillage authored Oct 10, 2023
1 parent 3ce9889 commit ef62c5c
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/core-concepts/inter-step-buffer-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ The version `latest` in the ConfigMap should only be used for testing purpose. I

### Replicas

An optional property `spec.jetstream.replicas` (defaults to 3) can be specified, which gives the total number of nodes. An odd number 3 or 5 is suggested. If the given number < 3, 3 will be used.
An optional property `spec.jetstream.replicas` (defaults to 3) can be specified, which gives the total number of nodes.

### Persistence

Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (j JetStreamBufferService) GetReplicas() int {
if j.Replicas == nil {
return 3
}
if *j.Replicas < 3 {
if *j.Replicas == 2 {
return 3
}
return int(*j.Replicas)
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func Test_JSBufferGetReplicas(t *testing.T) {
five := int32(5)
s.Replicas = &five
assert.Equal(t, 5, s.GetReplicas())
one := int32(1)
s.Replicas = &one
assert.Equal(t, 1, s.GetReplicas())
two := int32(2)
s.Replicas = &two
assert.Equal(t, 3, s.GetReplicas())
Expand Down
2 changes: 1 addition & 1 deletion pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
TTL: time.Hour * 24 * 30, // 30 days
MaxBytes: 0,
Storage: nats.FileStorage,
Replicas: 3,
Replicas: v.GetInt("stream.replicas"),
}); err != nil {
return fmt.Errorf("failed to create side inputs KV %q, %w", kvName, err)
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/reconciler/isbsvc/installer/assets/jetstream/nats-cluster.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
port: {{.ClientPort}}
pid_file: "/var/run/nats/nats.pid"

max_payload: {{.MaxPayload}}

###############
# #
# Monitoring #
# #
###############
http: {{.MonitorPort}}
server_name: $POD_NAME
###################################
# #
# NATS JetStream #
# #
###################################
jetstream {
{{.EncryptionSettings}}
store_dir: "/data/jetstream/store"
max_memory_store: {{.MaxMemoryStore}}
max_file_store: {{.MaxFileStore}}
}

###################################
# #
# NATS Cluster #
# #
###################################
cluster {
port: {{.ClusterPort}}
name: {{.ClusterName}}
routes: [{{.Routes}}]
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 120
{{.TLSConfig}}
}
lame_duck_duration: 120s
##################
# #
# Authorization #
# #
##################
include ./auth.conf
13 changes: 0 additions & 13 deletions pkg/reconciler/isbsvc/installer/assets/jetstream/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,6 @@ jetstream {
max_file_store: {{.MaxFileStore}}
}

###################################
# #
# NATS Cluster #
# #
###################################
cluster {
port: {{.ClusterPort}}
name: {{.ClusterName}}
routes: [{{.Routes}}]
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 120
{{.TLSConfig}}
}
lame_duck_duration: 120s
##################
# #
Expand Down
11 changes: 7 additions & 4 deletions pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
svcName := generateJetStreamServiceName(r.isbs)
ssName := generateJetStreamStatefulSetName(r.isbs)
replicas := r.isbs.Spec.JetStream.GetReplicas()
if replicas < 3 {
replicas = 3
}
routes := []string{}
for j := 0; j < replicas; j++ {
routes = append(routes, fmt.Sprintf("nats://%s-%s.%s.%s.svc:%s", ssName, strconv.Itoa(j), svcName, r.isbs.Namespace, strconv.Itoa(int(clusterPort))))
Expand Down Expand Up @@ -422,7 +419,13 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
return fmt.Errorf("failed to merge customized jetstream settings, %w", err)
}
}
confTpl := template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf"))
var confTpl *template.Template
if replicas > 2 {
confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats-cluster.conf"))
} else {
confTpl = template.Must(template.ParseFS(jetStremAssets, "assets/jetstream/nats.conf"))
}

var confTplOutput bytes.Buffer
if err := confTpl.Execute(&confTplOutput, struct {
ClusterName string
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/isbsvc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func ValidateInterStepBufferService(isbs *dfv1.InterStepBufferService) error {
if x.Version == "" {
return fmt.Errorf(`invalid spec: "spec.jetstream.version" is not defined`)
}
if x.Replicas != nil && *(x.Replicas) < 3 {
return fmt.Errorf(`invalid spec: min value for "spec.jetstream.replicas" is 3`)
if x.Replicas != nil && (*x.Replicas == 2 || *x.Replicas <= 0) {
return fmt.Errorf(`invalid spec: min value for "spec.jetstream.replicas" is 1 and can't be 2`)
}
}
return nil
Expand Down
37 changes: 37 additions & 0 deletions test/api-e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,43 @@ func (s *APISuite) TestISBSVC() {
stopPortForward()
}

func (s *APISuite) TestISBSVCReplica1() {
var err error
numaflowServerPodName := s.GetNumaflowServerPodName()
if numaflowServerPodName == "" {
panic("failed to find the nuamflow-server pod")
}
stopPortForward := s.StartPortForward(numaflowServerPodName, 8443)

var testISBSVC v1alpha1.InterStepBufferService
err = json.Unmarshal(testISBSVCReplica1Spec, &testISBSVC)
assert.NoError(s.T(), err)
createISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").POST(fmt.Sprintf("/api/v1/namespaces/%s/isb-services", Namespace)).WithJSON(testISBSVC).
Expect().
Status(200).Body().Raw()
var createISBSVCSuccessExpect = `{"data":null}`
assert.Contains(s.T(), createISBSVCBody, createISBSVCSuccessExpect)

listISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").GET(fmt.Sprintf("/api/v1/namespaces/%s/isb-services", Namespace)).
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), listISBSVCBody, testISBSVCReplica1Name)

getISBSVCBody := HTTPExpect(s.T(), "https://localhost:8443").GET(fmt.Sprintf("/api/v1/namespaces/%s/isb-services/%s", Namespace, testISBSVCReplica1Name)).
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), getISBSVCBody, fmt.Sprintf(`"name":"%s"`, testISBSVCReplica1Name))
assert.Contains(s.T(), getISBSVCBody, `"status":"healthy"`)

deleteISBSVC := HTTPExpect(s.T(), "https://localhost:8443").DELETE(fmt.Sprintf("/api/v1/namespaces/%s/isb-services/%s", Namespace, testISBSVCReplica1Name)).
Expect().
Status(200).Body().Raw()
var deleteISBSVCSuccessExpect = `{"data":null}`
assert.Contains(s.T(), deleteISBSVC, deleteISBSVCSuccessExpect)

stopPortForward()
}

func (s *APISuite) TestPipeline0() {
var err error
numaflowServerPodName := s.GetNumaflowServerPodName()
Expand Down
19 changes: 19 additions & 0 deletions test/api-e2e/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,24 @@ var (
}
}
}
`)
testISBSVCReplica1Name = "test-isbsvc-replica-1"
testISBSVCReplica1Spec = []byte(`
{
"apiVersion": "numaflow.numaproj.io/v1alpha1",
"kind": "InterStepBufferService",
"metadata": {
"name": "test-isbsvc"
},
"spec": {
"jetstream": {
"persistence": {
"volumeSize": "3Gi"
},
"replicas": 1,
"version": "latest"
}
}
}
`)
)

0 comments on commit ef62c5c

Please sign in to comment.