Skip to content

Commit

Permalink
Release v1.27.1
Browse files Browse the repository at this point in the history
  • Loading branch information
aryanmehrotra authored Nov 17, 2024
2 parents 9990977 + 9a6e8b8 commit 9a97ec5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
91 changes: 54 additions & 37 deletions pkg/gofr/datasource/pubsub/google/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"errors"
"strings"
"sync"
"time"

gcPubSub "cloud.google.com/go/pubsub"
Expand All @@ -30,7 +31,9 @@ type googleClient struct {
client Client
logger pubsub.Logger
metrics Metrics
recieveChan chan *pubsub.Message
receiveChan map[string]chan *pubsub.Message
subStarted map[string]struct{}
mu sync.RWMutex
}

//nolint:revive // We do not want anyone using the client without initialization steps.
Expand Down Expand Up @@ -58,7 +61,9 @@ func New(conf Config, logger pubsub.Logger, metrics Metrics) *googleClient {
client: client,
logger: logger,
metrics: metrics,
recieveChan: make(chan *pubsub.Message),
receiveChan: make(map[string]chan *pubsub.Message),
subStarted: make(map[string]struct{}),
mu: sync.RWMutex{},
}
}

Expand Down Expand Up @@ -117,63 +122,71 @@ func (g *googleClient) Publish(ctx context.Context, topic string, message []byte
}

func (g *googleClient) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
receiveChan := make(chan *pubsub.Message)
defer close(receiveChan)
var end time.Duration

ctx, span := otel.GetTracerProvider().Tracer("gofr").Start(ctx, "gcp-subscribe")
spanCtx, span := otel.GetTracerProvider().Tracer("gofr").Start(ctx, "gcp-subscribe")
defer span.End()

g.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", g.Config.SubscriptionName)
g.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", g.Config.SubscriptionName)

t, err := g.getTopic(ctx, topic)
if err != nil {
return nil, err
}
if _, ok := g.subStarted[topic]; !ok {
t, err := g.getTopic(spanCtx, topic)
if err != nil {
return nil, err
}

subscription, err := g.getSubscription(ctx, t)
if err != nil {
return nil, err
}
subscription, err := g.getSubscription(spanCtx, t)
if err != nil {
return nil, err
}

start := time.Now()
cctx, cancel := context.WithCancel(ctx)
start := time.Now()

go func() {
processMessage := func(ctx context.Context, msg *gcPubSub.Message) {
m := pubsub.NewMessage(ctx)
end := time.Since(start)
end = time.Since(start)

m.Topic = topic
m.Value = msg.Data
m.MetaData = msg.Attributes
m.Committer = newGoogleMessage(msg)

g.logger.Debug(&pubsub.Log{
Mode: "SUB",
CorrelationID: span.SpanContext().TraceID().String(),
MessageValue: string(m.Value),
Topic: topic,
Host: g.Config.ProjectID,
PubSubBackend: "GCP",
Time: end.Microseconds(),
})

receiveChan <- m
g.mu.Lock()
defer g.mu.Unlock()

cancel()
g.receiveChan[topic] <- m
}

err = subscription.Receive(cctx, processMessage)
if err != nil {
g.logger.Errorf("error getting a message from google: %s", err.Error())
}
}()
// initialize the channel before we can start receiving on it
g.mu.Lock()
g.receiveChan[topic] = make(chan *pubsub.Message)
g.mu.Unlock()

go func() {
err = subscription.Receive(ctx, processMessage)
if err != nil {
g.logger.Errorf("error getting a message from google: %s", err.Error())
}
}()

g.subStarted[topic] = struct{}{}
}

select {
case m := <-receiveChan:
g.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name",
case m := <-g.receiveChan[topic]:
g.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name",
g.Config.SubscriptionName)

g.logger.Debug(&pubsub.Log{
Mode: "SUB",
CorrelationID: span.SpanContext().TraceID().String(),
MessageValue: string(m.Value),
Topic: topic,
Host: g.Config.ProjectID,
PubSubBackend: "GCP",
Time: end.Microseconds(),
})

return m, nil
case <-ctx.Done():
return nil, nil
Expand Down Expand Up @@ -244,5 +257,9 @@ func (g *googleClient) Close() error {
return g.client.Close()
}

for _, c := range g.receiveChan {
close(c)
}

return nil
}
7 changes: 4 additions & 3 deletions pkg/gofr/datasource/pubsub/google/google_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"gofr.dev/pkg/gofr/datasource/pubsub"
"gofr.dev/pkg/gofr/logging"
"gofr.dev/pkg/gofr/testutil"
)
Expand Down Expand Up @@ -174,17 +175,17 @@ func Test_validateConfigs(t *testing.T) {
}

func TestGoogleClient_CloseReturnsError(t *testing.T) {
// client already present
g := &googleClient{
client: getGoogleClient(t),
client: getGoogleClient(t),
receiveChan: make(map[string]chan *pubsub.Message),
}

err := g.Close()

require.NoError(t, err)

// client empty
g = &googleClient{}
g = &googleClient{receiveChan: make(map[string]chan *pubsub.Message)}

err = g.Close()

Expand Down
2 changes: 1 addition & 1 deletion pkg/gofr/version/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package version

const Framework = "v1.27.0"
const Framework = "v1.27.1"

0 comments on commit 9a97ec5

Please sign in to comment.