Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure Token Issuer #97

Open
wants to merge 10 commits into
base: ads-stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions kafka/error_codes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kafka

const (
// Error types that determine commit or retry for a given offset
PERMANENT = 0
TEMPORARY = 1
// Error types that determine which error response is includeded in the emission to
// Kafka for a given request failure.
OK = 0
DUPLICATE_REDEMPTION = 1
UNVERIFIED = 2
ERROR = 3
)
50 changes: 31 additions & 19 deletions kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

batgo_handlers "github.com/brave-intl/bat-go/utils/handlers"
batgo_kafka "github.com/brave-intl/bat-go/utils/kafka"
"github.com/brave-intl/challenge-bypass-server/server"
uuid "github.com/google/uuid"
Expand All @@ -18,7 +19,12 @@ import (

var brokers []string

type Processor func([]byte, *kafka.Writer, *server.Server, *zerolog.Logger) error
type Processor func(
[]byte,
*kafka.Writer,
*server.Server,
*zerolog.Logger,
) []*batgo_handlers.AppError

type TopicMapping struct {
Topic string
Expand Down Expand Up @@ -74,6 +80,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
for i := 1; i <= consumerCount; i++ {
go func(topicMappings []TopicMapping) {
consumer := newConsumer(topics, adsConsumerGroupV1, logger)
logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats()))
var (
failureCount = 0
failureLimit = 10
Expand All @@ -93,27 +100,32 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
continue
}
logger.Info().Msg(fmt.Sprintf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset))
logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats()))
for _, topicMapping := range topicMappings {
if msg.Topic == topicMapping.Topic {
go func(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m curious! Is there any disadvantage on keeping this goroutine here?

msg kafka.Message,
topicMapping TopicMapping,
providedServer *server.Server,
logger *zerolog.Logger,
) {
err := topicMapping.Processor(
msg.Value,
topicMapping.ResultProducer,
providedServer,
logger,
)
if err != nil {
logger.Error().Err(err).Msg("Processing failed.")
appErrors := topicMapping.Processor(
// Readbatch instead of using msg.Value
// Batch has getMessage (readMessage?) which is a msg that we can run value on. Iterate on this top level valueh
msg.Value,
topicMapping.ResultProducer,
providedServer,
logger,
)
shallCommit := true
if appErrors != nil {
for _, appErr := range appErrors {
if appErr.Code == TEMPORARY {
logger.Error().Err(appErr.Cause).Msg("Processing failed with temporary error. Not committing. Will retry.")
shallCommit = false
}
}
}
if shallCommit {
logger.Info().Msg(fmt.Sprintf("Processing completed without error. Committing."))
if err := consumer.CommitMessages(ctx, msg); err != nil {
logger.Error().Err(err).Msg("Failed to commit")
} else {
logger.Info().Msg(fmt.Sprintf("Committed offset %d for topic %s.", msg.Offset, msg.Topic))
}
}(msg, topicMapping, providedServer, logger)
if err := consumer.CommitMessages(ctx, msg); err != nil {
logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err))
}
}
}
Expand Down
Loading