-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FunctionMesh Horizontal Scaling Collusion with Immutable Deployments #444
Comments
Quick update we deployed the function-mesh without argocd, we still observed the issue and observed the following set of log in the operator logs:
|
can you please paste the Function configuration? |
apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
name: fm-{{ .Release.Namespace }}
spec:
sinks:
- name: {{ "sink1" | lower }}
image: "registry.company.com/project/sink1:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace1"
className: "org.apache.pulsar.io.redis.company_protobuf_sink.companysink1Sink"
sinkConfig:
redisHosts: "{{ cat "redis-" .Release.Namespace "-redis-cluster-headless" | replace " " "" }}:6379"
redisPassword: $(REDIS_PASSWORD)
redisDatabase: 0
clientMode: "cluster"
autoAck: true
input:
topics:
- persistent://public/namespace1/sink1
typeClassName: "[B"
replicas: 1
maxReplicas: 5
clusterName: pulsar-{{ .Release.Namespace }}
resources: &resources
limits:
cpu: 4
memory: 2147483648
requests:
cpu: 2
memory: 1073741824
pulsar: &pulsar
pulsarConfig: "mesh-{{ .Release.Namespace }}"
tlsSecret: "mesh-tlssecret-{{ .Release.Namespace }}"
authSecret: "mesh-authsecret-{{ .Release.Namespace }}"
java: &java-redis-sink
jar: /pulsar/company-protobuf-redis-sink-2.7.0.nar
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
pod: &pod
env:
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: {{ cat "redis-" .Release.Namespace "-redis-cluster" | replace " " "" }}
key: "redis-password"
annotations:
"managed-function": "true"
autoScalingMetrics:
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 60
volumes:
- name: ca
secret:
secretName: pulsar-{{ .Release.Namespace }}-ca-tls
items:
- key: ca.crt
path: ca.crt
- name: admin
secret:
secretName: pulsar-{{ .Release.Namespace }}-client-admin
items:
- key: tls.key
path: tls.key
- key: tls.crt
path: tls.crt
volumeMounts: &volumeMounts
- name: ca
mountPath: /pulsar/certs/ca
- name: admin
mountPath: /pulsar/certs/admin
- name: {{ "sink2" | lower }}
image: "registry.company.com/project/mqtt_data_sink_io:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace1"
className: "org.apache.pulsar.io.mqtt.sink2Sink"
autoAck: true
sinkConfig:
mqttBrokerUri: "ssl://mosquitto-{{ .Release.Namespace }}-mosquitto:8883"
qos: 2
retainRecords: false
topic: "/data/"
username: "admin"
password: "mospass"
caCert: "/pulsar/certs/mqttca/ca.crt"
sslHostnameVerificationEnabled: false
input:
topics:
- persistent://public/namespace1/MQTTData
typeClassName: "[B"
replicas: 1
maxReplicas: 5
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
jar: /pulsar/pulsar-io-mqtt-data-sink-2.7.0.nar
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
pod: &pod-mqtt
annotations:
"managed-function": "true"
autoScalingMetrics:
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 60
volumes:
- name: ca
secret:
secretName: pulsar-{{ .Release.Namespace }}-ca-tls
items:
- key: ca.crt
path: ca.crt
- name: admin
secret:
secretName: pulsar-{{ .Release.Namespace }}-client-admin
items:
- key: tls.key
path: tls.key
- key: tls.crt
path: tls.crt
- name: mqtt-certs
secret:
defaultMode: 420
items:
- key: tls.crt
path: tls.crt
- key: tls.key
path: tls.key
secretName: mosquitto-vedge-dev-client-tls-mqtt
- name: mqtt-ca
secret:
defaultMode: 420
items:
- key: ca.crt
path: ca.crt
secretName: mosquitto-vedge-dev-ca-tls
volumeMounts: &volumeMounts-mqtt
- name: ca
mountPath: /pulsar/certs/ca
- name: admin
mountPath: /pulsar/certs/admin
- mountPath: /pulsar/certs/mqtt
name: mqtt-certs
readOnly: true
- mountPath: /pulsar/certs/mqttca
name: mqtt-ca
readOnly: true
- name: {{ "sink3" | lower }}
image: "registry.company.com/project/mqtt_logs_sink_io:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace1"
className: "org.apache.pulsar.io.mqtt.MQTTLogsSink"
autoAck: true
sinkConfig:
mqttBrokerUri: "ssl://mosquitto-{{ .Release.Namespace }}-mosquitto:8883"
qos: 2
retainRecords: false
topic: "/logs/"
username: "admin"
password: "mospass"
caCert: "/pulsar/certs/mqttca/ca.crt"
sslHostnameVerificationEnabled: false
input:
topics:
- persistent://public/namespace1/MQTTLogs
typeClassName: "[B"
replicas: 1
maxReplicas: 5
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
jar: /pulsar/pulsar-io-mqtt-logs-sink-2.8.2.nar
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
pod:
<<: *pod-mqtt
volumeMounts:
*volumeMounts-mqtt
functions:
- name: {{ "func1" | lower }}
image: "registry.company.com/project/func1:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace1"
className: "org.apache.pulsar.io.redis.func1"
autoAck: true
funcConfig:
redisConfig:
redisHosts: "{{ cat "redis-" .Release.Namespace "-redis-cluster-headless" | replace " " "" }}:6379"
redisPassword: $(REDIS_PASSWORD)
redisDatabase: 0
clientMode: "cluster"
OutTopics: "persistent://public/namespace1/mqtttopic"
API:
forwardEnabled: "true"
entryPoint: persistent://public/namespace1/topic1
input:
topicPattern: persistent://public/namespace1/topic2
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-redis-sink
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func2" | lower }}
image: "registry.company.com/project/func2:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace2"
className: "com.company.pulsar.func2"
autoAck: false
funcConfig:
"ConfigResponseTopic": "persistent://public/C2G/"
input:
topics:
- persistent://public/namespace2/new
typeClassName: "java.lang.String"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java: &java-edge-function
jar: "/pulsar/company_edge_functions-{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" | trimPrefix "v" }}.nar"
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func3" | lower }}
image: "registry.company.com/project/func3:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace1"
className: "com.company.pulsar.adaptertopic"
autoAck: false
input:
topics:
- persistent://public/namespace1/adaptertopic
typeClassName: "[B"
funcConfig:
"OutTopics": "persistent://public/namespace1/standardtopic"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
jar: "/pulsar/company_tr181_functions-{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" | trimPrefix "v" }}.nar"
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
pod:
<<: *pod
volumeMounts:
*volumeMounts
## This one should be reconfigured
- name: {{ "func4" | lower }}
image: "registry.company.com/project/func4:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace3"
className: "com.company.pulsar.func4"
autoAck: false
input:
topicPattern: persistent://public/namespace3/(?!Ingestion)([a-zA-Z0-9]+)
typeClassName: "[B"
replicas: 4
maxReplicas: 10
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func5" | lower }}
image: "registry.company.com/project/func5:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.func5"
autoAck: true
funcConfig:
"DebugJson": "true"
"OutTopics": persistent://public/namespace1/ThisIsMe
input:
topicPattern: persistent://public/namespace4/func5
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func6" | lower }}
image: "registry.company.com/project/func6:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.func6"
autoAck: true
funcConfig:
"OutTopics": "persistent://public/namespace1/MQTTLogs,persistent://public/namespace1/Logs"
input:
topicPattern: persistent://public/namespace4/func6
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func7" | lower }}
image: "registry.company.com/project/func7:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.namespace3func7"
autoAck: true
funcConfig:
"OutTopics": "persistent://public/namespace1/sink1,persistent://public/namespace1/func1"
input:
topicPattern: persistent://public/namespace4/func7
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func8" | lower }}
image: "registry.company.com/project/func8:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.namespace3func8"
autoAck: true
funcConfig:
"OutTopics": "persistent://public/namespace1/sink1,persistent://public/namespace1/func1"
input:
topicPattern: persistent://public/namespace4/func8
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func9" | lower }}
image: "registry.company.com/project/func9:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.namespace3func9"
autoAck: false
funcConfig:
"OutTopics": "persistent://public/namespace1/sink1,persistent://public/namespace1/func1"
input:
topicPattern: persistent://public/namespace4/func9
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
- name: {{ "func10" | lower }}
image: "registry.company.com/project/func10:{{ ( index .Values.helm.parameters 1 ).value | replace "\"" "" }}"
tenant: "public"
namespace: "namespace4"
className: "com.company.pulsar.namespace3func10"
autoAck: true
funcConfig:
"DebugJson": "true"
"OutTopics": "persistent://public/namespace4/func7"
input:
topicPattern: persistent://public/namespace4/func10
typeClassName: "[B"
replicas: 1
maxReplicas: 5
maxPendingAsyncRequests: 1000
forwardSourceMessageProperty: true
clusterName: pulsar-{{ .Release.Namespace }}
resources:
<<: *resources
pulsar:
<<: *pulsar
java:
<<: *java-edge-function
pod:
<<: *pod
volumeMounts:
*volumeMounts
---
apiVersion: v1
kind: ConfigMap
metadata:
name: mesh-{{ .Release.Namespace }}
data:
brokerServiceURL: "pulsar+ssl://pulsar-{{ .Release.Namespace }}-broker.{{ .Release.Namespace }}.svc.cluster.local:6651/"
webServiceURL: "https://pulsar-{{ .Release.Namespace }}-broker.{{ .Release.Namespace }}.svc.cluster.local:8443/"
---
apiVersion: v1
kind: Secret
metadata:
name: mesh-tlssecret-{{ .Release.Namespace }}
stringData:
tlsAllowInsecureConnection: "true"
tlsHostnameVerificationEnable: "false"
tlsTrustCertsFilePath: "/pulsar/certs/ca/ca.crt"
---
apiVersion: v1
kind: Secret
metadata:
name: mesh-authsecret-{{ .Release.Namespace }}
stringData:
clientAuthenticationParameters: "tlsCertFile:/pulsar/certs/admin/tls.crt,tlsKeyFile:/pulsar/certs/admin/tls.key"
clientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationTls"
functionAuthProviderClassName: "com.company.KubernetesFunctionTlsAuthProvider"
|
This issue only occurs in cases when Function has HPA enabled, right? If so I think
what do you think? @armangurkan and teams @streamnative/serverless |
@tpiperatgod exactly, this is exactly what I think of. |
ok, I will fix this ASAP, please let me know if there is anything missing. |
Hello there, Thank you for taking action on the issue first of all. Is there a way that we can get a road map about the issue? We are looking forward to use the autoscaling functionality. Best |
Hi @armangurkan , according to the milestone, the fix for this issue will be merged to v0.6.0 with a release date around the middle of September. Here's a simple case to show the effect of the modification: #450 (comment) Please let me know if this meets your needs, thx. |
Hello there,
I would like to thank you for your support for my previous issues first of all. You guys are great.
We are now using FM Operator 0.4.0 also use argocd to deploy and and k8s state immutability is very important for us. When we enable autoscaling from the function mesh CRD,
replicas
field is a requirement and I think it is a requirement in the code as well.The problem is when that is set, the statefulset gets a replicas value and and everytime the HPA gets triggered, the argo operator heals it back up the replicas to the value that is set by the StatefulSet Config.
So there should be a minReplicas field that is interchangable with replicas, where in that case, the replicas field in the statefulSet replicas field should not be set, and the number of pods that belong the statefulSet is managed solemnly from the HPA.
This is very important for the consistency of our environment. I would really appreciate a quick fix or a work around about the subject.
The text was updated successfully, but these errors were encountered: