diff --git a/kafka/error_codes.go b/kafka/error_codes.go new file mode 100644 index 00000000..d2410c09 --- /dev/null +++ b/kafka/error_codes.go @@ -0,0 +1,13 @@ +package kafka + +const ( + // Error types that determine commit or retry for a given offset + PERMANENT = 0 + TEMPORARY = 1 + // Error types that determine which error response is includeded in the emission to + // Kafka for a given request failure. + OK = 0 + DUPLICATE_REDEMPTION = 1 + UNVERIFIED = 2 + ERROR = 3 +) diff --git a/kafka/main.go b/kafka/main.go index 96a0f38f..a177a976 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -8,6 +8,7 @@ import ( "strings" "time" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" batgo_kafka "github.com/brave-intl/bat-go/utils/kafka" "github.com/brave-intl/challenge-bypass-server/server" uuid "github.com/google/uuid" @@ -18,7 +19,12 @@ import ( var brokers []string -type Processor func([]byte, *kafka.Writer, *server.Server, *zerolog.Logger) error +type Processor func( + []byte, + *kafka.Writer, + *server.Server, + *zerolog.Logger, +) []*batgo_handlers.AppError type TopicMapping struct { Topic string @@ -74,6 +80,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error for i := 1; i <= consumerCount; i++ { go func(topicMappings []TopicMapping) { consumer := newConsumer(topics, adsConsumerGroupV1, logger) + logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats())) var ( failureCount = 0 failureLimit = 10 @@ -93,27 +100,32 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error continue } logger.Info().Msg(fmt.Sprintf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)) - logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats())) for _, topicMapping := range topicMappings { if msg.Topic == topicMapping.Topic { - go func( - msg kafka.Message, - topicMapping TopicMapping, - providedServer *server.Server, - logger *zerolog.Logger, - ) { - err := topicMapping.Processor( - msg.Value, - topicMapping.ResultProducer, - providedServer, - logger, - ) - if err != nil { - logger.Error().Err(err).Msg("Processing failed.") + appErrors := topicMapping.Processor( + // Readbatch instead of using msg.Value + // Batch has getMessage (readMessage?) which is a msg that we can run value on. Iterate on this top level valueh + msg.Value, + topicMapping.ResultProducer, + providedServer, + logger, + ) + shallCommit := true + if appErrors != nil { + for _, appErr := range appErrors { + if appErr.Code == TEMPORARY { + logger.Error().Err(appErr.Cause).Msg("Processing failed with temporary error. Not committing. Will retry.") + shallCommit = false + } + } + } + if shallCommit { + logger.Info().Msg(fmt.Sprintf("Processing completed without error. Committing.")) + if err := consumer.CommitMessages(ctx, msg); err != nil { + logger.Error().Err(err).Msg("Failed to commit") + } else { + logger.Info().Msg(fmt.Sprintf("Committed offset %d for topic %s.", msg.Offset, msg.Topic)) } - }(msg, topicMapping, providedServer, logger) - if err := consumer.CommitMessages(ctx, msg); err != nil { - logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err)) } } } diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 5da8d302..868b0b48 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -4,7 +4,9 @@ import ( "bytes" "errors" "fmt" + "sync" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -24,135 +26,209 @@ func SignedBlindedTokenIssuerHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) error { - const ( - OK = 0 - INVALID_ISSUER = 1 - ERROR = 2 - ) +) []*batgo_handlers.AppError { + var errorSet []*batgo_handlers.AppError blindedTokenRequestSet, err := avroSchema.DeserializeSigningRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", blindedTokenRequestSet.Request_id, err)) + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed Avro deserialization", + blindedTokenRequestSet.Request_id, + ), + PERMANENT, + )) + return errorSet } - var blindedTokenResults []avroSchema.SigningResult if len(blindedTokenRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. - return errors.New(fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id)) + message := fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id) + errorSet = append(errorSet, batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + )) + return errorSet } + var wg sync.WaitGroup + blindedTokenResultsChannel := make(chan avroSchema.SigningResult) + errorChannel := make(chan *batgo_handlers.AppError) + for _, request := range blindedTokenRequestSet.Data { - if request.Blinded_tokens == nil { - logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", blindedTokenRequestSet.Request_id)) - continue - } + wg.Add(1) + go handleBlindedTokenRequest( + wg, + request, + blindedTokenRequestSet.Request_id, + blindedTokenResultsChannel, + errorChannel, + server, + logger, + ) + } + wg.Wait() - if request.Issuer_cohort != 0 && request.Issuer_cohort != 1 { - logger.Error().Msg( - fmt.Sprintf( - "Request %s: Provided cohort is not supported: %d", - blindedTokenRequestSet.Request_id, - request.Issuer_cohort, - ), - ) - continue - } + var blindedTokenResults []avroSchema.SigningResult + for blindedTokenResult := range blindedTokenResultsChannel { + blindedTokenResults = append(blindedTokenResults, blindedTokenResult) + } + for errorChannelResult := range errorChannel { + errorSet = append(errorSet, errorChannelResult) + } - issuer, appErr := server.GetLatestIssuer(request.Issuer_type, int(request.Issuer_cohort)) - if appErr != nil { - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: nil, - Issuer_public_key: "", - Status: INVALID_ISSUER, - Associated_data: request.Associated_data, - }) - continue - } + resultSet := avroSchema.SigningResultSet{ + Request_id: blindedTokenRequestSet.Request_id, + Data: blindedTokenResults, + } + var resultSetBuffer bytes.Buffer + err = resultSet.Serialize(&resultSetBuffer) + if err != nil { + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to serialize ResultSet: %s", + blindedTokenRequestSet.Request_id, + resultSet, + ), + PERMANENT, + )) + return errorSet + } + err = Emit(producer, resultSetBuffer.Bytes(), logger) + if err != nil { + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to emit results to topic %s", + blindedTokenRequestSet.Request_id, + producer.Topic, + ), + TEMPORARY, + )) + return errorSet + } + return errorSet +} - var blindedTokens []*crypto.BlindedToken - // Iterate over the provided tokens and create data structure from them, - // grouping into a slice for approval - for _, stringBlindedToken := range request.Blinded_tokens { - blindedToken := crypto.BlindedToken{} - err := blindedToken.UnmarshalText([]byte(stringBlindedToken)) - if err != nil { - logger.Error().Msg(fmt.Sprintf("Request %s: failed to unmarshal blinded tokens: %e", blindedTokenRequestSet.Request_id, err)) - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: nil, - Issuer_public_key: "", - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue - } - blindedTokens = append(blindedTokens, &blindedToken) +func handleBlindedTokenRequest( + wg sync.WaitGroup, + request avroSchema.SigningRequest, + requestId string, + blindedTokenResults chan avroSchema.SigningResult, + errorChannel chan *batgo_handlers.AppError, + server *cbpServer.Server, + logger *zerolog.Logger, +) { + defer wg.Done() + const ( + OK = 0 + INVALID_ISSUER = 1 + ERROR = 2 + ) + if request.Blinded_tokens == nil { + message := fmt.Sprintf("Request %s: Empty request", requestId) + errorChannel <- batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + ) + } + + if request.Issuer_cohort != 0 && request.Issuer_cohort != 1 { + message := fmt.Sprintf( + "Request %s: Provided cohort is not supported: %d", + requestId, + request.Issuer_cohort, + ) + errorChannel <- batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + ) + } + + issuer, appErr := server.GetLatestIssuer(request.Issuer_type, int(request.Issuer_cohort)) + if appErr != nil { + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: nil, + Issuer_public_key: "", + Status: INVALID_ISSUER, + Associated_data: request.Associated_data, } - // @TODO: If one token fails they will all fail. Assess this behavior - signedTokens, dleqProof, err := btd.ApproveTokens(blindedTokens, issuer.SigningKey) + } + + var blindedTokens []*crypto.BlindedToken + // Iterate over the provided tokens and create data structure from them, + // grouping into a slice for approval + for _, stringBlindedToken := range request.Blinded_tokens { + blindedToken := crypto.BlindedToken{} + err := blindedToken.UnmarshalText([]byte(stringBlindedToken)) if err != nil { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not approve new tokens: %e", blindedTokenRequestSet.Request_id, err)) - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ + logger.Error().Msg(fmt.Sprintf("Request %s: failed to unmarshal blinded tokens: %e", requestId, err)) + blindedTokenResults <- avroSchema.SigningResult{ Signed_tokens: nil, Issuer_public_key: "", Status: ERROR, Associated_data: request.Associated_data, - }) - continue - } - marshaledDLEQProof, err := dleqProof.MarshalText() - if err != nil { - return errors.New( - fmt.Sprintf( - "Request %s: Could not marshal DLEQ proof: %e", - blindedTokenRequestSet.Request_id, - err, - ), - ) - } - var marshaledTokens []string - for _, token := range signedTokens { - marshaledToken, err := token.MarshalText() - if err != nil { - return errors.New( - fmt.Sprintf( - "Request %s: Could not marshal new tokens to bytes: %e", - blindedTokenRequestSet.Request_id, - err, - ), - ) } - marshaledTokens = append(marshaledTokens, string(marshaledToken[:])) } - publicKey := issuer.SigningKey.PublicKey() - marshaledPublicKey, err := publicKey.MarshalText() + blindedTokens = append(blindedTokens, &blindedToken) + } + // @TODO: If one token fails they will all fail. Assess this behavior + signedTokens, dleqProof, err := btd.ApproveTokens(blindedTokens, issuer.SigningKey) + if err != nil { + logger.Error().Msg(fmt.Sprintf("Request %s: Could not approve new tokens: %e", requestId, err)) + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: nil, + Issuer_public_key: "", + Status: ERROR, + Associated_data: request.Associated_data, + } + } + marshaledDLEQProof, err := dleqProof.MarshalText() + if err != nil { + errorChannel <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not marshal DLEQ proof", + requestId, + ), + PERMANENT, + ) + } + var marshaledTokens []string + for _, token := range signedTokens { + marshaledToken, err := token.MarshalText() if err != nil { - return errors.New( + errorChannel <- batgo_handlers.WrapError( + err, fmt.Sprintf( - "Request %s: Could not marshal signing key: %e", - blindedTokenRequestSet.Request_id, - err, + "Request %s: Could not marshal new tokens to bytes", + requestId, ), + PERMANENT, ) } - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: marshaledTokens, - Proof: string(marshaledDLEQProof), - Issuer_public_key: string(marshaledPublicKey), - Status: OK, - Associated_data: request.Associated_data, - }) + marshaledTokens = append(marshaledTokens, string(marshaledToken[:])) } - resultSet := avroSchema.SigningResultSet{ - Request_id: blindedTokenRequestSet.Request_id, - Data: blindedTokenResults, - } - var resultSetBuffer bytes.Buffer - err = resultSet.Serialize(&resultSetBuffer) + publicKey := issuer.SigningKey.PublicKey() + marshaledPublicKey, err := publicKey.MarshalText() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %s", blindedTokenRequestSet.Request_id, resultSet)) + errorChannel <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not marshal signing key", + requestId, + ), + PERMANENT, + ) } - err = Emit(producer, resultSetBuffer.Bytes(), logger) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", blindedTokenRequestSet.Request_id, producer.Topic, err)) + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: marshaledTokens, + Proof: string(marshaledDLEQProof), + Issuer_public_key: string(marshaledPublicKey), + Status: OK, + Associated_data: request.Associated_data, } - return nil } diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index a3e32049..4c7fed98 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "strings" + "sync" "time" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -24,152 +26,250 @@ func SignedTokenRedeemHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) error { - const ( - OK = 0 - DUPLICATE_REDEMPTION = 1 - UNVERIFIED = 2 - ERROR = 3 - ) +) []*batgo_handlers.AppError { + var errorSet []*batgo_handlers.AppError tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", tokenRedeemRequestSet.Request_id, err)) + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed Avro deserialization: %e", + tokenRedeemRequestSet.Request_id, err, + ), + PERMANENT, + )) } - defer func() { - if recover() != nil { - err = errors.New(fmt.Sprintf("Request %s: Redeem attempt panicked", tokenRedeemRequestSet.Request_id)) - } - }() - var redeemedTokenResults []avroSchema.RedeemResult if len(tokenRedeemRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. - return errors.New(fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", tokenRedeemRequestSet.Request_id)) + message := fmt.Sprintf( + "Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", + tokenRedeemRequestSet.Request_id) + errorSet = append(errorSet, batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + )) } issuers, err := server.FetchAllIssuers() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)) + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to fetch all issuers", + tokenRedeemRequestSet.Request_id, + ), + TEMPORARY, + )) } + var wg sync.WaitGroup + tokenRedeemResultsChannel := make(chan avroSchema.RedeemResult) + errorChannel := make(chan *batgo_handlers.AppError) for _, request := range tokenRedeemRequestSet.Data { - var ( - verified = false - verifiedIssuer = &cbpServer.Issuer{} - verifiedCohort int32 = 0 + wg.Add(1) + handleTokenRedeemRequest( + wg, + request, + tokenRedeemRequestSet.Request_id, + issuers, + tokenRedeemResultsChannel, + errorChannel, + server, + logger, ) - if request.Public_key == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue + } + wg.Wait() + + var tokenRedeemResults []avroSchema.RedeemResult + for tokenRedeemResult := range tokenRedeemResultsChannel { + tokenRedeemResults = append(tokenRedeemResults, tokenRedeemResult) + } + for errorChannelResult := range errorChannel { + errorSet = append(errorSet, errorChannelResult) + } + + resultSet := avroSchema.RedeemResultSet{ + Request_id: tokenRedeemRequestSet.Request_id, + Data: tokenRedeemResults, + } + var resultSetBuffer bytes.Buffer + err = resultSet.Serialize(&resultSetBuffer) + if err != nil { + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to serialize ResultSet.", + tokenRedeemRequestSet.Request_id, + ), + PERMANENT, + )) + } + + err = Emit(producer, resultSetBuffer.Bytes(), logger) + if err != nil { + errorSet = append(errorSet, batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to emit results to topic %s.", + tokenRedeemRequestSet.Request_id, + producer.Topic, + ), + TEMPORARY, + )) + } + return errorSet +} + +func handleTokenRedeemRequest( + wg sync.WaitGroup, + request avroSchema.RedeemRequest, + requestId string, + issuers *[]cbpServer.Issuer, + tokenRedeemResults chan avroSchema.RedeemResult, + errorChannel chan *batgo_handlers.AppError, + server *cbpServer.Server, + logger *zerolog.Logger, +) { + defer wg.Done() + var ( + verified = false + verifiedIssuer = &cbpServer.Issuer{} + verifiedCohort int32 = 0 + ) + if request.Public_key == "" { + logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, + Associated_data: request.Associated_data, } + } - if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue + if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" { + logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, + Associated_data: request.Associated_data, } + } - tokenPreimage := crypto.TokenPreimage{} - err = tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into preimage: %e", tokenRedeemRequestSet.Request_id, err)) + tokenPreimage := crypto.TokenPreimage{} + err := tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) + if err != nil { + errorChannel <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal text into preimage", + requestId, + ), + PERMANENT, + ) + } + verificationSignature := crypto.VerificationSignature{} + err = verificationSignature.UnmarshalText([]byte(request.Signature)) + if err != nil { + errorChannel <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal text into verification signature", + requestId, + ), + PERMANENT, + ) + } + for _, issuer := range *issuers { + if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { + return } - verificationSignature := crypto.VerificationSignature{} - err = verificationSignature.UnmarshalText([]byte(request.Signature)) + // Only attempt token verification with the issuer that was provided. + issuerPublicKey := issuer.SigningKey.PublicKey() + marshaledPublicKey, err := issuerPublicKey.MarshalText() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into verification signature: %e", tokenRedeemRequestSet.Request_id, err)) + errorChannel <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text", + requestId, + ), + PERMANENT, + ) } - for _, issuer := range *issuers { - if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { - continue - } - // Only attempt token verification with the issuer that was provided. - issuerPublicKey := issuer.SigningKey.PublicKey() - marshaledPublicKey, err := issuerPublicKey.MarshalText() - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text: %e", tokenRedeemRequestSet.Request_id, err)) - } - logger.Trace().Msg(fmt.Sprintf("Request %s: Issuer: %s, Request: %s", tokenRedeemRequestSet.Request_id, string(marshaledPublicKey), request.Public_key)) - if string(marshaledPublicKey) == request.Public_key { - if err := btd.VerifyTokenRedemption( - &tokenPreimage, - &verificationSignature, - string(request.Binding), - []*crypto.SigningKey{issuer.SigningKey}, - ); err != nil { - verified = false - } else { - verified = true - verifiedIssuer = &issuer - verifiedCohort = int32(issuer.IssuerCohort) - break - } + logger.Trace().Msg(fmt.Sprintf( + "Request %s: Issuer: %s, Request: %s", + requestId, + string(marshaledPublicKey), + request.Public_key, + )) + if string(marshaledPublicKey) == request.Public_key { + if err := btd.VerifyTokenRedemption( + &tokenPreimage, + &verificationSignature, + string(request.Binding), + []*crypto.SigningKey{issuer.SigningKey}, + ); err != nil { + verified = false + } else { + verified = true + verifiedIssuer = &issuer + verifiedCohort = int32(issuer.IssuerCohort) + break } } + } - if !verified { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: UNVERIFIED, - Associated_data: request.Associated_data, - }) - continue - } else { - logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", tokenRedeemRequestSet.Request_id)) + if !verified { + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not verify that the token redemption is valid", + requestId, + )) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: UNVERIFIED, + Associated_data: request.Associated_data, } - if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { - logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed: %e", tokenRedeemRequestSet.Request_id, err)) - if strings.Contains(err.Error(), "Duplicate") { - logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", tokenRedeemRequestSet.Request_id, err)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: DUPLICATE_REDEMPTION, - Associated_data: request.Associated_data, - }) - } - logger.Error().Msg(fmt.Sprintf("Request %s: Could not mark token redemption", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ + } else { + logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", requestId)) + } + if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { + logger.Error().Err(err).Msg(fmt.Sprintf( + "Request %s: Token redemption failed: %e", + requestId, + err, + )) + if strings.Contains(err.Error(), "Duplicate") { + logger.Error().Msg(fmt.Sprintf( + "Request %s: Duplicate redemption: %e", + requestId, + err, + )) + tokenRedeemResults <- avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: ERROR, + Status: DUPLICATE_REDEMPTION, Associated_data: request.Associated_data, - }) - continue + } } - logger.Trace().Msg(fmt.Sprintf("Request %s: Redeemed", tokenRedeemRequestSet.Request_id)) - issuerName := verifiedIssuer.IssuerType - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: issuerName, - Issuer_cohort: verifiedCohort, - Status: OK, + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not mark token redemption", + requestId, + )) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, Associated_data: request.Associated_data, - }) - } - resultSet := avroSchema.RedeemResultSet{ - Request_id: tokenRedeemRequestSet.Request_id, - Data: redeemedTokenResults, - } - var resultSetBuffer bytes.Buffer - err = resultSet.Serialize(&resultSetBuffer) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %e", tokenRedeemRequestSet.Request_id, err)) + } } - - err = Emit(producer, resultSetBuffer.Bytes(), logger) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", tokenRedeemRequestSet.Request_id, producer.Topic, err)) + logger.Trace().Msg(fmt.Sprintf("Request %s: Redeemed", requestId)) + issuerName := verifiedIssuer.IssuerType + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: issuerName, + Issuer_cohort: verifiedCohort, + Status: OK, + Associated_data: request.Associated_data, } - return nil } diff --git a/server/dynamo.go b/server/dynamo.go index 4ed261f8..8a2f37d9 100644 --- a/server/dynamo.go +++ b/server/dynamo.go @@ -107,6 +107,8 @@ func (c *Server) redeemTokenV2(issuer *Issuer, preimage *crypto.TokenPreimage, p } input := &dynamodb.PutItemInput{ + // @ME emit different error messages depending on whether the ID is the same and the data is different or the ID is the same AND the data is the same. + // Batch persistence if possible Item: av, ConditionExpression: aws.String("attribute_not_exists(id)"), TableName: aws.String("redemptions"),