Skip to content

Commit

Permalink
Merge pull request #237 from signadot/kafka-consumer-improvements
Browse files Browse the repository at this point in the history
Adjust kafka config settings
  • Loading branch information
daniel-de-vera authored Mar 12, 2024
2 parents bf854f5 + 39deef3 commit 6b65d94
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 50 deletions.
6 changes: 6 additions & 0 deletions pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"fmt"
"time"

"github.com/IBM/sarama"
"github.com/dnwe/otelsarama"
Expand Down Expand Up @@ -57,5 +58,10 @@ func getConfig(clientID string) *sarama.Config {
conf.Version = sarama.V1_1_0_0
conf.Net.TLS.Enable = false
conf.Net.SASL.Enable = false
conf.Net.DialTimeout = 5 * time.Second
conf.Net.ReadTimeout = 5 * time.Second
conf.Net.WriteTimeout = 5 * time.Second
conf.Producer.Timeout = 6 * time.Second
conf.Consumer.Group.Session.Timeout = 6 * time.Second
return conf
}
19 changes: 13 additions & 6 deletions services/driver/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,19 @@ func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
session.MarkMessage(message, "")
consumer.processDispatchRequest(message)
for {
select {
case message, ok := <-claim.Messages():
if !ok {
consumer.logger.Bg().Error("message channel was closed")
return nil
}
session.MarkMessage(message, "")
consumer.processDispatchRequest(message)
case <-session.Context().Done():
return nil
}
}

return nil
}

func (consumer *Consumer) shouldProcess(routingKey string) bool {
Expand Down Expand Up @@ -146,7 +153,7 @@ func (consumer *Consumer) processDispatchRequest(msg *sarama.ConsumerMessage) {
// get the driver with the best ETA
bestDriver, err := consumer.bestETA.Get(ctx, &dispatchReq, drivers)
if err != nil {
consumer.logger.For(ctx).Error("error calculating best route")
consumer.logger.For(ctx).Error("error calculating best route", zap.Error(err))
span.SetStatus(codes.Error, err.Error())
return
}
Expand Down
47 changes: 4 additions & 43 deletions services/driver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,47 +128,8 @@ func (p *Processor) Run() error {
}
cancel()
wg.Wait()
return consumerGroup.Close()
if err := consumerGroup.Close(); err != nil {
p.logger.For(ctx).Error("could not close kafka consumer group", zap.Error(err))
}
return nil
}

// FindNearest implements gRPC driver interface
// func (s *Server) FindNearest(ctx context.Context, location *DriverLocationRequest) (*DriverLocationResponse, error) {
// return &DriverLocationResponse{}, nil
// // s.logger.For(ctx).Info("Searching for nearby drivers", zap.String("location", location.Location))
// // driverIDs := s.redis.FindDriverIDs(ctx, location.Location)

// // locations := make([]*DriverLocation, len(driverIDs))
// // for i, driverID := range driverIDs {
// // var drv Driver
// // var err error
// // for i := 0; i < 3; i++ {
// // drv, err = s.redis.GetDriver(ctx, driverID)
// // if err == nil {
// // break
// // }
// // s.logger.For(ctx).Error("Retrying GetDriver after error", zap.Int("retry_no", i+1), zap.Error(err))
// // }
// // if err != nil {
// // s.logger.For(ctx).Error("Failed to get driver after 3 attempts", zap.Error(err))
// // return nil, err
// // }
// // locations[i] = &DriverLocation{
// // DriverID: drv.DriverID,
// // Location: drv.Location,
// // }
// // }
// // s.logger.For(ctx).Info(
// // "Search successful",
// // zap.Int("driver_count", len(locations)),
// // zap.String("locations", toJSON(locations)),
// // )
// // return &DriverLocationResponse{Locations: locations}, nil
// }

// func toJSON(v any) string {
// str, err := json.Marshal(v)
// if err != nil {
// return err.Error()
// }
// return string(str)
// }
2 changes: 1 addition & 1 deletion services/route/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Server) Run() error {
return err
}

// FindNearest implements gRPC driver interface
// FindRoute implements gRPC route interface
func (s *Server) FindRoute(ctx context.Context, req *FindRouteRequest) (*FindRouteResponse, error) {
s.logger.For(ctx).Info("Finding route", zap.String("from", req.From), zap.String("to", req.To))

Expand Down

0 comments on commit 6b65d94

Please sign in to comment.