From cc2917281296e2664d96289b8682c245bfe8a117 Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Thu, 6 Jun 2024 09:08:28 -0400 Subject: [PATCH] [checkout] fix kafka restart (#1590) * fix kafka acks * Update src/checkoutservice/main.go Co-authored-by: Juliano Costa * align attribute names with semantic conventions * checkout: recover from kafka restart --------- Co-authored-by: Juliano Costa --- CHANGELOG.md | 2 + src/checkoutservice/kafka/producer.go | 10 +++ src/checkoutservice/main.go | 107 +++++++++++++++++--------- 3 files changed, 83 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4a58674dc..b7c65a739f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ the release. ([#1592](https://github.com/open-telemetry/opentelemetry-demo/pull/1592)) * chore: Add service version to OTEL_RESOURCE_ATTRIBUTES ([#1594](https://github.com/open-telemetry/opentelemetry-demo/pull/1594)) +* [checkout] increase Kafka resiliency and observability + ([#1590](https://github.com/open-telemetry/opentelemetry-demo/pull/1590)) ## 1.9.0 diff --git a/src/checkoutservice/kafka/producer.go b/src/checkoutservice/kafka/producer.go index 6ee773d8c2..1304f1b976 100644 --- a/src/checkoutservice/kafka/producer.go +++ b/src/checkoutservice/kafka/producer.go @@ -13,8 +13,18 @@ var ( ) func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProducer, error) { + sarama.Logger = log + saramaConfig := sarama.NewConfig() + saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.Return.Errors = true + + // Sarama has an issue in a single broker kafka if the kafka broker is restarted. + // This setting is to prevent that issue from manifesting itself, but may swallow failed messages. + saramaConfig.Producer.RequiredAcks = sarama.NoResponse + saramaConfig.Version = ProtocolVersion + // So we can know the partition and offset of messages. saramaConfig.Producer.Return.Successes = true diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index b2acb998c6..7d258b3c48 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -20,11 +20,15 @@ import ( "github.com/IBM/sarama" "github.com/google/uuid" + otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg" + flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" + "github.com/open-feature/go-sdk/openfeature" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/contrib/instrumentation/runtime" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/propagation" @@ -34,9 +38,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg" - "github.com/open-feature/go-sdk/openfeature" - otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -440,13 +441,13 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, } func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) { - paymentService := cs.paymentSvcClient + paymentService := cs.paymentSvcClient if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") { - badAddress := "badAddress:50051" - c := mustCreateClient(context.Background(), badAddress) + badAddress := "badAddress:50051" + c := mustCreateClient(context.Background(), badAddress) paymentService = pb.NewPaymentServiceClient(c) - } - + } + paymentResp, err := paymentService.Charge(ctx, &pb.ChargeRequest{ Amount: amount, CreditCard: paymentInfo}) @@ -504,18 +505,52 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O span := createProducerSpan(ctx, &msg) defer span.End() - cs.KafkaProducerClient.Input() <- &msg - successMsg := <-cs.KafkaProducerClient.Successes() - log.Infof("Successful to write message. offset: %v", successMsg.Offset) + // Send message and handle response + startTime := time.Now() + select { + case cs.KafkaProducerClient.Input() <- &msg: + log.Infof("Message sent to Kafka: %v", msg) + select { + case successMsg := <-cs.KafkaProducerClient.Successes(): + span.SetAttributes( + attribute.Bool("messaging.kafka.producer.success", true), + attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())), + attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))), + ) + log.Infof("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime)) + case errMsg := <-cs.KafkaProducerClient.Errors(): + span.SetAttributes( + attribute.Bool("messaging.kafka.producer.success", false), + attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())), + ) + span.SetStatus(otelcodes.Error, errMsg.Err.Error()) + log.Errorf("Failed to write message: %v", errMsg.Err) + case <-ctx.Done(): + span.SetAttributes( + attribute.Bool("messaging.kafka.producer.success", false), + attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())), + ) + span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error()) + log.Warnf("Context canceled before success message received: %v", ctx.Err()) + } + case <-ctx.Done(): + span.SetAttributes( + attribute.Bool("messaging.kafka.producer.success", false), + attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())), + ) + span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error()) + log.Errorf("Failed to send message to Kafka within context deadline: %v", ctx.Err()) + return + } ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems") if ffValue > 0 { log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.") for i := 0; i < ffValue; i++ { - go func(i int) { - cs.KafkaProducerClient.Input() <- &msg + go func(i int) { + cs.KafkaProducerClient.Input() <- &msg _ = <-cs.KafkaProducerClient.Successes() - }(i) + }(i) } log.Infof("Done with #%d messages for overload simulation.", ffValue) } @@ -548,29 +583,29 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. } func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool { - client := openfeature.NewClient("checkout") - - // Default value is set to false, but you could also make this a parameter. - featureEnabled, _ := client.BooleanValue( - ctx, - featureFlagName, - false, - openfeature.EvaluationContext{}, - ) - - return featureEnabled + client := openfeature.NewClient("checkout") + + // Default value is set to false, but you could also make this a parameter. + featureEnabled, _ := client.BooleanValue( + ctx, + featureFlagName, + false, + openfeature.EvaluationContext{}, + ) + + return featureEnabled } func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int { - client := openfeature.NewClient("checkout") - - // Default value is set to 0, but you could also make this a parameter. - featureFlagValue, _ := client.IntValue( - ctx, - featureFlagName, - 0, - openfeature.EvaluationContext{}, - ) - - return int(featureFlagValue) + client := openfeature.NewClient("checkout") + + // Default value is set to 0, but you could also make this a parameter. + featureFlagValue, _ := client.IntValue( + ctx, + featureFlagName, + 0, + openfeature.EvaluationContext{}, + ) + + return int(featureFlagValue) }