[issue] In effectively_once mode, when a single pod fails, the entire function will fail due to failure to create the producer. #711
Comments
It looks like the server should close the producer when the related client crashes, but it doesn't close the producer. Which version of Pulsar are you using? @graysonzeng
Thanks for your reply, the Pulsar version is 3.1.1. I agree with you. It seems that some mechanism is needed to properly shut down the producer before creating it.
I will check it.
I can't reproduce the error. When I manually delete one function pod, its producer for the output topic is closed by the server too, so the next active consumer can create the producer. Could you share the YAML you used? @graysonzeng
Of course @jiangpengcheng
apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
name: functionmesh-001
spec:
functions:
- name: functions-dedup-v1
className: com.tencent.functionNoTag
image: mirrors.tencent.com/g_k_cdp/pulsar-functions-test:v1.0.1
replicas: 10
processingGuarantee: "effectively_once" #effectively_once manual
pod:
terminationGracePeriodSeconds: 30
input:
topics:
- persistent://pulsar/default2/input_test
typeClassName: "[B"
output:
topic: persistent://pulsar/default2/alltables3
typeClassName: "[B"
pulsar:
pulsarConfig: "pulsar-dedup-gtmz-167-sink-test02-config-v1"
authSecret: "sink-dedup-test02-config-auth"
java:
extraDependenciesDir: ""
jar: /pulsar/connectors//DynamicTopic-1.0.nar # the NAR location in image.
jarLocation: "" # leave empty since we will not download package from Pulsar Packages
clusterName: pulsar-gtmz-167
forwardSourceMessageProperty: true
resources:
requests:
cpu: "2"
memory: 2G
limits:
cpu: "2"
memory: 2G
clusterName: test-pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pulsar-dedup-gtmz-167-sink-test02-config-v1
data:
webServiceURL: http://xx.xx.47.12:8080
brokerServiceURL: pulsar://xx.xx.47.12:6650
authPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
authParams: "eyJhxxxx"
---
apiVersion: v1
kind: Secret
metadata:
name: sink-dedup-test02-config-auth
stringData:
clientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
clientAuthenticationParameters: "token:eyxxxx"
I see. The error is caused as follows: when pod-1 crashes, another pod (like pod-2) will take over pod-1's subscription to the partition pod-1 had subscribed to (like partition-5). If a message comes to partition-5, pod-2 will try to create a new producer with the same fixed name that pod-1's producer used, and because the broker still considers pod-1's producer connected, the creation fails. That is what causes this issue.
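For illustration, here is a minimal standalone sketch of that conflict using the plain Pulsar Java client (the service URL is reused from the ConfigMap above; the fixed producer name is hypothetical, since the exact naming scheme used by the Functions runtime is not shown in this thread):

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class FixedProducerNameConflictSketch {
    public static void main(String[] args) throws Exception {
        // Two separate clients stand in for the two function pods
        PulsarClient pod1Client = PulsarClient.builder()
                .serviceUrl("pulsar://xx.xx.47.12:6650").build();
        PulsarClient pod2Client = PulsarClient.builder()
                .serviceUrl("pulsar://xx.xx.47.12:6650").build();

        String outputTopic = "persistent://pulsar/default2/alltables3";
        // effectively_once relies on a fixed producer name so the broker can
        // de-duplicate by producer name + sequence id (name here is hypothetical)
        String fixedName = "functions-dedup-v1-input_test-partition-5";

        // pod-1's producer: after a crash the broker may still consider it connected
        Producer<byte[]> pod1Producer = pod1Client.newProducer()
                .topic(outputTopic)
                .producerName(fixedName)
                .create();

        try {
            // pod-2 takes over partition-5 and must reuse the same producer name,
            // but the broker rejects it while pod-1's producer is still registered
            pod2Client.newProducer()
                    .topic(outputTopic)
                    .producerName(fixedName)
                    .create();
        } catch (PulsarClientException.ProducerBusyException e) {
            // "Producer with name '...' is already connected to topic"
            // -> the function instance fails and the pod keeps restarting
        }

        pod1Producer.close();
        pod1Client.close();
        pod2Client.close();
    }
}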
Thanks!
I created a PR here: apache/pulsar#21912, and built a jar based on it. Could you add it to your runner image to check whether the issue is resolved? @graysonzeng
This issue is hard to fix since we cannot use a different producer name without breaking the de-duplication. Below is a workaround for when the error happens:
cc @graysonzeng
Thanks for the suggestion. I have an idea: how about using independent subscription names for each pod? For example, I have 10 partitions on the consumer side and two pods for consumption. Then my two pods set subscription names A and B respectively. After this, even if a pod fails, the subscription will not be switched to other surviving pods due to failover, and it can continue to consume when the pod is brought back up by Kubernetes. @jiangpengcheng
Using different subscription names will make podA and podB both consume all ten partitions and lead to duplication.
A and B each consume a disjoint subset of the partitions. For example, A consumes partitions 0-4 and B consumes partitions 5-9. This will not lead to duplication.
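For illustration, a minimal Pulsar-client-level sketch of that manual split (the subscription names, the exact partition assignment, and the use of raw consumers instead of the Function Mesh YAML are assumptions made for the example):

import java.util.Arrays;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class ManualPartitionSplitSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://xx.xx.47.12:6650").build();

        // "Pod A": its own subscription name, pinned to partitions 0-4
        Consumer<byte[]> consumerA = client.newConsumer()
                .topics(Arrays.asList(
                        "persistent://pulsar/default2/input_test-partition-0",
                        "persistent://pulsar/default2/input_test-partition-1",
                        "persistent://pulsar/default2/input_test-partition-2",
                        "persistent://pulsar/default2/input_test-partition-3",
                        "persistent://pulsar/default2/input_test-partition-4"))
                .subscriptionName("dedup-sub-A")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // "Pod B": a different subscription name, pinned to partitions 5-9
        Consumer<byte[]> consumerB = client.newConsumer()
                .topics(Arrays.asList(
                        "persistent://pulsar/default2/input_test-partition-5",
                        "persistent://pulsar/default2/input_test-partition-6",
                        "persistent://pulsar/default2/input_test-partition-7",
                        "persistent://pulsar/default2/input_test-partition-8",
                        "persistent://pulsar/default2/input_test-partition-9"))
                .subscriptionName("dedup-sub-B")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // The partition sets are disjoint, so A and B never see the same message;
        // and if the pod behind dedup-sub-A dies, no other subscription takes over
        // its partitions, so its fixed-name producer is not re-created elsewhere.
        consumerA.close();
        consumerB.close();
        client.close();
    }
}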
This needs to be specified manually, which is just like what you do now:
Thanks. I originally thought that the subscription assignment would not need to be specified manually; that way, a user could create a sink with multiple replicas without repeating the configuration in the YAML. But it seems this is the only way it can be done for now. @jiangpengcheng
After this, the function restarts due to an exception, and because of failover the subscription is transferred once again and fails due to the failure to create the producer. This causes all the function pods to constantly restart.
Therefore, when I need to enable effectively_once, I have to deploy multiple functions to consume the partitions of the partitioned topic separately. But this is not an easy way to maintain.
If you have any optimization suggestions, I hope you can provide them. Thanks very much.