Skip to content

Commit

Permalink
[checkout] fix kafka restart (#1590)
Browse files Browse the repository at this point in the history
* fix kafka acks

* Update src/checkoutservice/main.go

Co-authored-by: Juliano Costa <[email protected]>

* align attribute names with semantic conventions

* checkout: recover from kafka restart

---------

Co-authored-by: Juliano Costa <[email protected]>
  • Loading branch information
puckpuck and julianocosta89 committed Jun 6, 2024
1 parent d75c9d8 commit cc29172
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ the release.
([#1592](https://github.com/open-telemetry/opentelemetry-demo/pull/1592))
* chore: Add service version to OTEL_RESOURCE_ATTRIBUTES
([#1594](https://github.com/open-telemetry/opentelemetry-demo/pull/1594))
* [checkout] increase Kafka resiliency and observability
([#1590](https://github.com/open-telemetry/opentelemetry-demo/pull/1590))

## 1.9.0

Expand Down
10 changes: 10 additions & 0 deletions src/checkoutservice/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@ var (
)

func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProducer, error) {
sarama.Logger = log

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Return.Errors = true

// Sarama has an issue in a single broker kafka if the kafka broker is restarted.
// This setting is to prevent that issue from manifesting itself, but may swallow failed messages.
saramaConfig.Producer.RequiredAcks = sarama.NoResponse

saramaConfig.Version = ProtocolVersion

// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true

Expand Down
107 changes: 71 additions & 36 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (

"github.com/IBM/sarama"
"github.com/google/uuid"
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
"github.com/open-feature/go-sdk/openfeature"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -34,9 +38,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
"github.com/open-feature/go-sdk/openfeature"
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -440,13 +441,13 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
}

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
paymentService := cs.paymentSvcClient
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
}
}

paymentResp, err := paymentService.Charge(ctx, &pb.ChargeRequest{
Amount: amount,
CreditCard: paymentInfo})
Expand Down Expand Up @@ -504,18 +505,52 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
span := createProducerSpan(ctx, &msg)
defer span.End()

cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
// Send message and handle response
startTime := time.Now()
select {
case cs.KafkaProducerClient.Input() <- &msg:
log.Infof("Message sent to Kafka: %v", msg)
select {
case successMsg := <-cs.KafkaProducerClient.Successes():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", true),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))),
)
log.Infof("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime))
case errMsg := <-cs.KafkaProducerClient.Errors():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, errMsg.Err.Error())
log.Errorf("Failed to write message: %v", errMsg.Err)
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error())
log.Warnf("Context canceled before success message received: %v", ctx.Err())
}
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error())
log.Errorf("Failed to send message to Kafka within context deadline: %v", ctx.Err())
return
}

ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
Expand Down Expand Up @@ -548,29 +583,29 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
}

func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
client := openfeature.NewClient("checkout")
// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)
return featureEnabled
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}

func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
client := openfeature.NewClient("checkout")
// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)
return int(featureFlagValue)
client := openfeature.NewClient("checkout")

// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)

return int(featureFlagValue)
}

0 comments on commit cc29172

Please sign in to comment.