
Question about channel pendingAck implementation #11

Open
1046102779 opened this issue Oct 27, 2022 · 3 comments

Comments

@1046102779

1046102779 commented Oct 27, 2022

I see that the pluggable components Go SDK turns the streaming API into a unary (request/response) mode. In that implementation, github.com/dapr-sandbox/components-go-sdk/bindings/v1/handle.go (lines 84-89) contains this snippet about `pendingAck`:

```go
select {
case resp := <-pendingAck:
	return resp.data, resp.err
case <-ctx.Done():
	return nil, ErrAckTimeout
}
```

If sending to the `pendingAck` channel times out (default: 1s), the Ack API simply returns `nil`. However, once the Send API has executed, the handler waiting on `pendingAck` stays blocked, and the `<-ctx.Done()` case here just waits for the upstream to actively end the request and then returns `ErrAckTimeout`. In fact, `ErrAckTimeout` is not a suitable error here: what really happened is that the upstream ended the request.

I think a better approach would be for `AcknowledgementManager` to also hand out an `ackTimeoutFlagFunc` (returned by `Get` alongside `pendingAck`), so that the Ack timeout is handled uniformly. Would that be more appropriate? Something like:

```go
msgID, pendingAck, cleanup, ackTimeoutFlagFunc := ackManager.Get()
// ...
select {
case resp := <-pendingAck:
	return resp.data, resp.err
case <-ackTimeoutFlagFunc(): // returns the Ack timeout flag
	return nil, ErrAckTimeout
case <-ctx.Done():
	return nil, nil
}
```

The same applies to pubsub, bindings, state, etc.

My main point is that when the Ack API hits its processing timeout (default: 1s), the `pendingAck` returned by the Get API also needs to know about that timeout, so the handler can return directly instead of blocking and relying on the upstream to end the request. Or is the requirement that no message may ever be lost?

@mcandeia
Collaborator

mcandeia commented Nov 8, 2022

Hey @1046102779, sorry for taking so long to respond.

> If sending to the `pendingAck` channel times out (default: 1s), the Ack API simply returns `nil`. However, once the Send API has executed, the handler waiting on `pendingAck` stays blocked, and the `<-ctx.Done()` case here just waits for the upstream to actively end the request and then returns `ErrAckTimeout`. In fact, `ErrAckTimeout` is not a suitable error here: what really happened is that the upstream ended the request.

Please notice that `ctx.Done()` does not refer to the entire upstream request (the input binding stream or pubsub stream). The context here is provided by the underlying component, so as a good practice it should be tied to the message consumption process, i.e. the timeout for consuming the message (Redis, for instance, uses the `processingTimeout` metadata field).

> My main point is that when the Ack API hits its processing timeout (default: 1s), the `pendingAck` returned by the Get API also needs to know about that timeout, so the handler can return directly instead of blocking and relying on the upstream to end the request. Or is the requirement that no message may ever be lost?

The default ack timeout of 1s applies when the message was actually ack'ed but no consumer is waiting for the ack, which would probably happen in one of two scenarios:

  1. The message was already ack'ed (the ack channel has a buffer size of 1), so this ack is a duplicate and can be discarded without any problem.
  2. The ack timed out (after the current 1s default) because the component could not wait long enough for the message to be ack'ed.

By using this:

```go
msgID, pendingAck, cleanup, ackTimeoutFlagFunc := ackManager.Get()
// ...
select {
case resp := <-pendingAck:
	return resp.data, resp.err
case <-ackTimeoutFlagFunc(): // returns the Ack timeout flag
	return nil, ErrAckTimeout
case <-ctx.Done():
	return nil, nil
}
```

we are also saying that the underlying component will consider the message successfully delivered once `ctx.Done()` is reached, even though there is no evidence that the message was processed. That would break at-least-once delivery, wouldn't it?
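A small simulation of this argument, assuming a hypothetical component loop that treats a `nil` result as "delivered" and redelivers on any error (up to a hypothetical `maxAttempts`). `deliverUntilAcked` and `handler` are illustrative names, not SDK or component APIs. When the ack never arrives, returning `ErrAckTimeout` on `ctx.Done()` triggers redelivery, while returning `nil` makes the unprocessed message count as delivered.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var ErrAckTimeout = errors.New("ack timeout")

// deliverUntilAcked is a hypothetical component loop: each attempt gets its own
// processing window; nil means "delivered", any error means "redeliver".
func deliverUntilAcked(maxAttempts int, attempt func(ctx context.Context) error) (int, bool) {
	for i := 1; i <= maxAttempts; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
		err := attempt(ctx)
		cancel()
		if err == nil {
			return i, true
		}
	}
	return maxAttempts, false
}

// handler waits for an ack that never comes; onCtxDone is what it returns
// when the component's processing window closes.
func handler(onCtxDone error) func(context.Context) error {
	return func(ctx context.Context) error {
		pendingAck := make(chan error) // nobody ever acks
		select {
		case err := <-pendingAck:
			return err
		case <-ctx.Done():
			return onCtxDone
		}
	}
}

func main() {
	// Returning ErrAckTimeout: the component keeps redelivering.
	fmt.Println(deliverUntilAcked(3, handler(ErrAckTimeout))) // 3 false
	// Returning nil on ctx.Done(): the unprocessed message counts as delivered.
	fmt.Println(deliverUntilAcked(3, handler(nil))) // 1 true
}
```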

Please let me know if this makes sense to you, @1046102779.

@mcandeia
Collaborator

ping @1046102779

@1046102779
Author

I'll take a closer look at the entire process. Thank you for your detailed reply.
