Skip to content

Commit

Permalink
removed redundant internal message.go
Browse files Browse the repository at this point in the history
  • Loading branch information
astelmashenko committed Oct 21, 2024
1 parent 56921ee commit 7e35dba
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 192 deletions.
87 changes: 0 additions & 87 deletions pkg/channel/jetstream/dispatcher/internal/message.go

This file was deleted.

80 changes: 0 additions & 80 deletions pkg/channel/jetstream/dispatcher/internal/nats_message.go

This file was deleted.

24 changes: 16 additions & 8 deletions pkg/channel/jetstream/dispatcher/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"net/http"
"time"

"knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/internal"
"github.com/nats-io/nats.go"

"knative.dev/pkg/apis"

Expand Down Expand Up @@ -110,7 +110,7 @@ type senderConfig struct {
oidcServiceAccount *types.NamespacedName
}

func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg internal.NatsMessageWrapper, options ...SendOption) (*kncloudevents.DispatchInfo, error) {
func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, options ...SendOption) (*kncloudevents.DispatchInfo, error) {
config := &senderConfig{
additionalHeaders: make(http.Header),
}
Expand All @@ -125,7 +125,7 @@ func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, mess
return send(dispatcher, ctx, message, destination, ackWait, msg, config)
}

func send(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg internal.NatsMessageWrapper, config *senderConfig) (*kncloudevents.DispatchInfo, error) {
func send(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, config *senderConfig) (*kncloudevents.DispatchInfo, error) {
logger := logging.FromContext(ctx)
dispatchExecutionInfo := &kncloudevents.DispatchInfo{}

Expand All @@ -150,7 +150,7 @@ func send(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message bin
var noRetires = kncloudevents.NoRetries()
var lastTry bool

retryNumber, err := msg.NumDelivered()
retryNumber, err := numDelivered(msg)
if err != nil {
retryNumber = 1

Check warning on line 155 in pkg/channel/jetstream/dispatcher/message_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/message_dispatcher.go#L155

Added line #L155 was not covered by tests
logger.Errorw("failed to get nats message metadata, assuming it is 1", zap.Error(err))
Expand Down Expand Up @@ -243,7 +243,15 @@ func send(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message bin
return dispatchExecutionInfo, nil
}

func processDispatchResult(ctx context.Context, msg internal.NatsMessageWrapper, retryConfig *kncloudevents.RetryConfig, retryNumber int, dispatchExecutionInfo *kncloudevents.DispatchInfo, err error) {
func numDelivered(msg *nats.Msg) (int, error) {
meta, err := msg.Metadata()
if err != nil {
return 0, err
}

Check warning on line 250 in pkg/channel/jetstream/dispatcher/message_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/message_dispatcher.go#L249-L250

Added lines #L249 - L250 were not covered by tests
return int(meta.NumDelivered), nil
}

func processDispatchResult(ctx context.Context, msg *nats.Msg, retryConfig *kncloudevents.RetryConfig, retryNumber int, dispatchExecutionInfo *kncloudevents.DispatchInfo, err error) {
logger := logging.FromContext(ctx)
result := protocol.ResultACK

Expand All @@ -263,15 +271,15 @@ func processDispatchResult(ctx context.Context, msg internal.NatsMessageWrapper,

switch {
case protocol.IsACK(result):
if err := msg.Ack(ctx); err != nil {
if err := msg.Ack(nats.Context(ctx)); err != nil {
logger.Error("failed to Ack message after successful delivery to subscriber", zap.Error(err))
}
case protocol.IsNACK(result):
if err := msg.NakWithDelay(jsutils.CalculateNakDelayForRetryNumber(retryNumber, retryConfig), ctx); err != nil {
if err := msg.NakWithDelay(jsutils.CalculateNakDelayForRetryNumber(retryNumber, retryConfig), nats.Context(ctx)); err != nil {
logger.Error("failed to Nack message after failed delivery to subscriber", zap.Error(err))
}
default:
if err := msg.Term(ctx); err != nil {
if err := msg.Term(nats.Context(ctx)); err != nil {
logger.Error("failed to Term message after failed delivery to subscriber", zap.Error(err))
}
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/channel/jetstream/dispatcher/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"testing"
"time"

"knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/internal"

"github.com/nats-io/nats.go"

"github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -568,7 +566,7 @@ func TestDispatchMessage(t *testing.T) {
message,
destination,
ackWait,
internal.NewNatsMessageWrapper(msg),
msg,
WithReply(&replyDestination),
WithDeadLetterSink(&deadLetterSinkDestination),
WithRetryConfig(&retryConfig),
Expand Down
26 changes: 15 additions & 11 deletions pkg/channel/jetstream/dispatcher/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"sync"
"time"

cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"go.opencensus.io/trace"
"knative.dev/eventing-natss/pkg/tracing"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
"knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/internal"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
Expand Down Expand Up @@ -166,9 +167,9 @@ func (c *PullConsumer) consumeMessages(ctx context.Context, batch nats.MessageBa
go func() {
defer wg.Done()
ctx := logging.WithLogger(ctx, c.logger.With(zap.String("msg_id", natsMsg.Header.Get(nats.MsgIdHdr))))
msg := internal.NewMessage(ctx, natsMsg, c.natsConsumerInfo.Config.AckWait)
//msg := internal.NewMessage(ctx, natsMsg, c.natsConsumerInfo.Config.AckWait)

if err := c.handleMessage(msg); err != nil {
if err := c.handleMessage(ctx, natsMsg); err != nil {
// handleMessage only errors if the message cannot be finished, any other error
// is consumed by msg.Finish(err)
logging.FromContext(ctx).Errorw("failed to finish message", zap.Error(err))
Expand Down Expand Up @@ -205,12 +206,13 @@ func updatePullSubscriptionConfig(config *ChannelConfig, sub *Subscription) {
}

Check warning on line 206 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L203-L206

Added lines #L203 - L206 were not covered by tests
}

func (c *PullConsumer) handleMessage(msg internal.Message) (err error) {
func (c *PullConsumer) handleMessage(ctx context.Context, msg *nats.Msg) (err error) {
// ensure that c.sub is not modified while we are handling a message
c.subMu.RLock()
defer c.subMu.RUnlock()

ctx := msg.Context()
ctx, finish := context.WithTimeout(ctx, c.natsConsumerInfo.Config.AckWait)
defer finish()

logger := logging.FromContext(ctx)

Expand All @@ -223,11 +225,13 @@ func (c *PullConsumer) handleMessage(msg internal.Message) (err error) {
logger.Debugw("received message from JetStream consumer", debugKVs...)

Check warning on line 225 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L225

Added line #L225 was not covered by tests
}

if msg.ReadEncoding() == binding.EncodingUnknown {
return errors.New("received a message with unknown encoding")
message := cejs.NewMessage(msg)
if message.ReadEncoding() == binding.EncodingUnknown {
logger.Errorw("received a message with unknown encoding")
return
}

Check warning on line 232 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L228-L232

Added lines #L228 - L232 were not covered by tests

event := tracing.ConvertNatsMsgToEvent(c.logger.Desugar(), msg.NatsMessage())
event := tracing.ConvertNatsMsgToEvent(c.logger.Desugar(), msg)
additionalHeaders := tracing.ConvertEventToHttpHeader(event)

sc, ok := tracing.ParseSpanContext(event)
Expand All @@ -246,10 +250,10 @@ func (c *PullConsumer) handleMessage(msg internal.Message) (err error) {
dispatchExecutionInfo, err := SendMessage(
c.dispatcher,
ctx,
msg,
message,
c.sub.Subscriber,
c.natsConsumerInfo.Config.AckWait,
internal.NewNatsMessageWrapper(msg.NatsMessage()),
msg,
WithReply(c.sub.Reply),
WithDeadLetterSink(c.sub.DeadLetter),
WithRetryConfig(c.sub.RetryConfig),
Expand Down
4 changes: 1 addition & 3 deletions pkg/channel/jetstream/dispatcher/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"errors"
"sync"

"knative.dev/eventing-natss/pkg/channel/jetstream/dispatcher/internal"

"knative.dev/eventing/pkg/kncloudevents"

cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
Expand Down Expand Up @@ -137,7 +135,7 @@ func (c *PushConsumer) doHandle(ctx context.Context, msg *nats.Msg) {
message,
c.sub.Subscriber,
c.natsConsumerInfo.Config.AckWait,
internal.NewNatsMessageWrapper(msg),
msg,
WithReply(c.sub.Reply),
WithDeadLetterSink(c.sub.DeadLetter),
WithRetryConfig(c.sub.RetryConfig),
Expand Down

0 comments on commit 7e35dba

Please sign in to comment.