diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 2f8958020..3b041c703 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -32,6 +32,10 @@ import ( "github.com/streamnative/function-mesh/controllers/proto" ) +const ( + DefaultParallelism = 1 +) + func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails { fd := &proto.FunctionDetails{ Tenant: function.Spec.Tenant, @@ -43,7 +47,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails UserConfig: getUserConfig(function.Spec.FuncConfig), Runtime: proto.FunctionDetails_JAVA, AutoAck: getBoolFromPtrOrDefault(function.Spec.AutoAck, true), - Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1), + Parallelism: DefaultParallelism, Source: generateFunctionInputSpec(function), Sink: generateFunctionOutputSpec(function), Resources: generateResource(function.Spec.Resources.Requests), @@ -80,7 +84,7 @@ func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf { //SecretsMap: marshalSecretsMap(function.Spec.SecretsMap), Runtime: int32(proto.FunctionDetails_GO), AutoACK: getBoolFromPtrOrDefault(function.Spec.AutoAck, true), - Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1), + Parallelism: DefaultParallelism, TimeoutMs: uint64(function.Spec.Timeout), SubscriptionName: function.Spec.SubscriptionName, CleanupSubscription: function.Spec.CleanupSubscription, @@ -217,7 +221,7 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails { UserConfig: getUserConfig(source.Spec.SourceConfig), Runtime: proto.FunctionDetails_JAVA, AutoAck: true, - Parallelism: getInt32FromPtrOrDefault(source.Spec.Replicas, 1), + Parallelism: DefaultParallelism, Source: generateSourceInputSpec(source), Sink: generateSourceOutputSpec(source), Resources: generateResource(source.Spec.Resources.Requests), @@ -279,7 +283,7 @@ func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails { ProcessingGuarantees: convertProcessingGuarantee(sink.Spec.ProcessingGuarantee), Runtime: proto.FunctionDetails_JAVA, AutoAck: getBoolFromPtrOrDefault(sink.Spec.AutoAck, true), - Parallelism: getInt32FromPtrOrDefault(sink.Spec.Replicas, 1), + Parallelism: DefaultParallelism, Source: generateSinkInputSpec(sink), Sink: generateSinkOutputSpec(sink), Resources: generateResource(sink.Spec.Resources.Requests),