From 14216ad97f7c5c539bfc769b27c0fa77fc110484 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Wed, 15 Dec 2021 15:55:19 -0500 Subject: [PATCH 01/10] Restructure Token Issuer Change token issuer function to use goroutines and a waitgroup to manage asynchronous processing instead of immediate commits. --- kafka/main.go | 29 +-- kafka/signed_blinded_token_issuer_handler.go | 226 +++++++++++-------- 2 files changed, 139 insertions(+), 116 deletions(-) diff --git a/kafka/main.go b/kafka/main.go index 96a0f38f..a49d48ad 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -96,24 +96,19 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats())) for _, topicMapping := range topicMappings { if msg.Topic == topicMapping.Topic { - go func( - msg kafka.Message, - topicMapping TopicMapping, - providedServer *server.Server, - logger *zerolog.Logger, - ) { - err := topicMapping.Processor( - msg.Value, - topicMapping.ResultProducer, - providedServer, - logger, - ) - if err != nil { - logger.Error().Err(err).Msg("Processing failed.") + err := topicMapping.Processor( + msg.Value, + topicMapping.ResultProducer, + providedServer, + logger, + ) + if err != nil { + logger.Error().Err(err).Msg("Processing failed.") + } else { + logger.Info().Msg(fmt.Sprintf("Processing completed. Committing")) + if err := consumer.CommitMessages(ctx, msg); err != nil { + logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err)) } - }(msg, topicMapping, providedServer, logger) - if err := consumer.CommitMessages(ctx, msg); err != nil { - logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err)) } } } diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 5da8d302..3b043475 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "sync" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" @@ -25,134 +26,161 @@ func SignedBlindedTokenIssuerHandler( server *cbpServer.Server, logger *zerolog.Logger, ) error { - const ( - OK = 0 - INVALID_ISSUER = 1 - ERROR = 2 - ) blindedTokenRequestSet, err := avroSchema.DeserializeSigningRequestSet(bytes.NewReader(data)) if err != nil { return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", blindedTokenRequestSet.Request_id, err)) } - var blindedTokenResults []avroSchema.SigningResult if len(blindedTokenRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. return errors.New(fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id)) } + var wg sync.WaitGroup + blindedTokenResultsChannel := make(chan avroSchema.SigningResult) + errorSet := make(chan error) + for _, request := range blindedTokenRequestSet.Data { - if request.Blinded_tokens == nil { - logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", blindedTokenRequestSet.Request_id)) - continue - } + wg.Add(1) + handleBlindedTokenRequest( + wg, + request, + blindedTokenRequestSet.Request_id, + blindedTokenResultsChannel, + errorSet, + server, + logger, + ) + } + wg.Wait() - if request.Issuer_cohort != 0 && request.Issuer_cohort != 1 { - logger.Error().Msg( - fmt.Sprintf( - "Request %s: Provided cohort is not supported: %d", - blindedTokenRequestSet.Request_id, - request.Issuer_cohort, - ), - ) - continue - } + var blindedTokenResults []avroSchema.SigningResult + for blindedTokenResult := range blindedTokenResultsChannel { + blindedTokenResults = append(blindedTokenResults, blindedTokenResult) + } - issuer, appErr := server.GetLatestIssuer(request.Issuer_type, int(request.Issuer_cohort)) - if appErr != nil { - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: nil, - Issuer_public_key: "", - Status: INVALID_ISSUER, - Associated_data: request.Associated_data, - }) - continue - } + resultSet := avroSchema.SigningResultSet{ + Request_id: blindedTokenRequestSet.Request_id, + Data: blindedTokenResults, + } + var resultSetBuffer bytes.Buffer + err = resultSet.Serialize(&resultSetBuffer) + if err != nil { + return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %s", blindedTokenRequestSet.Request_id, resultSet)) + } + err = Emit(producer, resultSetBuffer.Bytes(), logger) + if err != nil { + return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", blindedTokenRequestSet.Request_id, producer.Topic, err)) + } + return nil +} - var blindedTokens []*crypto.BlindedToken - // Iterate over the provided tokens and create data structure from them, - // grouping into a slice for approval - for _, stringBlindedToken := range request.Blinded_tokens { - blindedToken := crypto.BlindedToken{} - err := blindedToken.UnmarshalText([]byte(stringBlindedToken)) - if err != nil { - logger.Error().Msg(fmt.Sprintf("Request %s: failed to unmarshal blinded tokens: %e", blindedTokenRequestSet.Request_id, err)) - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: nil, - Issuer_public_key: "", - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue - } - blindedTokens = append(blindedTokens, &blindedToken) +func handleBlindedTokenRequest( + wg sync.WaitGroup, + request avroSchema.SigningRequest, + requestId string, + blindedTokenResults chan avroSchema.SigningResult, + errorSet chan error, + server *cbpServer.Server, + logger *zerolog.Logger, +) { + defer wg.Done() + const ( + OK = 0 + INVALID_ISSUER = 1 + ERROR = 2 + ) + if request.Blinded_tokens == nil { + errorSet <- errors.New(fmt.Sprintf("Request %s: Empty request", requestId)) + } + + if request.Issuer_cohort != 0 && request.Issuer_cohort != 1 { + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Provided cohort is not supported: %d", + requestId, + request.Issuer_cohort, + ), + ) + } + + issuer, appErr := server.GetLatestIssuer(request.Issuer_type, int(request.Issuer_cohort)) + if appErr != nil { + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: nil, + Issuer_public_key: "", + Status: INVALID_ISSUER, + Associated_data: request.Associated_data, } - // @TODO: If one token fails they will all fail. Assess this behavior - signedTokens, dleqProof, err := btd.ApproveTokens(blindedTokens, issuer.SigningKey) + } + + var blindedTokens []*crypto.BlindedToken + // Iterate over the provided tokens and create data structure from them, + // grouping into a slice for approval + for _, stringBlindedToken := range request.Blinded_tokens { + blindedToken := crypto.BlindedToken{} + err := blindedToken.UnmarshalText([]byte(stringBlindedToken)) if err != nil { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not approve new tokens: %e", blindedTokenRequestSet.Request_id, err)) - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ + logger.Error().Msg(fmt.Sprintf("Request %s: failed to unmarshal blinded tokens: %e", requestId, err)) + blindedTokenResults <- avroSchema.SigningResult{ Signed_tokens: nil, Issuer_public_key: "", Status: ERROR, Associated_data: request.Associated_data, - }) - continue - } - marshaledDLEQProof, err := dleqProof.MarshalText() - if err != nil { - return errors.New( - fmt.Sprintf( - "Request %s: Could not marshal DLEQ proof: %e", - blindedTokenRequestSet.Request_id, - err, - ), - ) - } - var marshaledTokens []string - for _, token := range signedTokens { - marshaledToken, err := token.MarshalText() - if err != nil { - return errors.New( - fmt.Sprintf( - "Request %s: Could not marshal new tokens to bytes: %e", - blindedTokenRequestSet.Request_id, - err, - ), - ) } - marshaledTokens = append(marshaledTokens, string(marshaledToken[:])) } - publicKey := issuer.SigningKey.PublicKey() - marshaledPublicKey, err := publicKey.MarshalText() + blindedTokens = append(blindedTokens, &blindedToken) + } + // @TODO: If one token fails they will all fail. Assess this behavior + signedTokens, dleqProof, err := btd.ApproveTokens(blindedTokens, issuer.SigningKey) + if err != nil { + logger.Error().Msg(fmt.Sprintf("Request %s: Could not approve new tokens: %e", requestId, err)) + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: nil, + Issuer_public_key: "", + Status: ERROR, + Associated_data: request.Associated_data, + } + } + marshaledDLEQProof, err := dleqProof.MarshalText() + if err != nil { + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Could not marshal DLEQ proof: %e", + requestId, + err, + ), + ) + } + var marshaledTokens []string + for _, token := range signedTokens { + marshaledToken, err := token.MarshalText() if err != nil { - return errors.New( + errorSet <- errors.New( fmt.Sprintf( - "Request %s: Could not marshal signing key: %e", - blindedTokenRequestSet.Request_id, + "Request %s: Could not marshal new tokens to bytes: %e", + requestId, err, ), ) } - blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResult{ - Signed_tokens: marshaledTokens, - Proof: string(marshaledDLEQProof), - Issuer_public_key: string(marshaledPublicKey), - Status: OK, - Associated_data: request.Associated_data, - }) - } - resultSet := avroSchema.SigningResultSet{ - Request_id: blindedTokenRequestSet.Request_id, - Data: blindedTokenResults, + marshaledTokens = append(marshaledTokens, string(marshaledToken[:])) } - var resultSetBuffer bytes.Buffer - err = resultSet.Serialize(&resultSetBuffer) + publicKey := issuer.SigningKey.PublicKey() + marshaledPublicKey, err := publicKey.MarshalText() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %s", blindedTokenRequestSet.Request_id, resultSet)) + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Could not marshal signing key: %e", + requestId, + err, + ), + ) } - err = Emit(producer, resultSetBuffer.Bytes(), logger) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", blindedTokenRequestSet.Request_id, producer.Topic, err)) + blindedTokenResults <- avroSchema.SigningResult{ + Signed_tokens: marshaledTokens, + Proof: string(marshaledDLEQProof), + Issuer_public_key: string(marshaledPublicKey), + Status: OK, + Associated_data: request.Associated_data, } - return nil } From d837e22311cd53fc193420bf714e88d3aee6bdb2 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 16:04:19 -0500 Subject: [PATCH 02/10] Change redemption to async with waitgroup --- kafka/signed_token_redeem_handler.go | 263 ++++++++++++++++----------- 1 file changed, 154 insertions(+), 109 deletions(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index a3e32049..1e01e7e6 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" @@ -25,12 +26,6 @@ func SignedTokenRedeemHandler( server *cbpServer.Server, logger *zerolog.Logger, ) error { - const ( - OK = 0 - DUPLICATE_REDEMPTION = 1 - UNVERIFIED = 2 - ERROR = 3 - ) tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", tokenRedeemRequestSet.Request_id, err)) @@ -40,7 +35,6 @@ func SignedTokenRedeemHandler( err = errors.New(fmt.Sprintf("Request %s: Redeem attempt panicked", tokenRedeemRequestSet.Request_id)) } }() - var redeemedTokenResults []avroSchema.RedeemResult if len(tokenRedeemRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. @@ -50,126 +44,177 @@ func SignedTokenRedeemHandler( if err != nil { return errors.New(fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)) } + var wg sync.WaitGroup + tokenRedeemResultsChannel := make(chan avroSchema.RedeemResult) + errorSet := make(chan error) for _, request := range tokenRedeemRequestSet.Data { - var ( - verified = false - verifiedIssuer = &cbpServer.Issuer{} - verifiedCohort int32 = 0 + wg.Add(1) + handleTokenRedeemRequest( + wg, + request, + tokenRedeemRequestSet.Request_id, + issuers, + tokenRedeemResultsChannel, + errorSet, + server, + logger, ) - if request.Public_key == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue + } + wg.Wait() + + var tokenRedeemResults []avroSchema.RedeemResult + for tokenRedeemResult := range tokenRedeemResultsChannel { + tokenRedeemResults = append(tokenRedeemResults, tokenRedeemResult) + } + + resultSet := avroSchema.RedeemResultSet{ + Request_id: tokenRedeemRequestSet.Request_id, + Data: tokenRedeemResults, + } + var resultSetBuffer bytes.Buffer + err = resultSet.Serialize(&resultSetBuffer) + if err != nil { + return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %e", tokenRedeemRequestSet.Request_id, err)) + } + + err = Emit(producer, resultSetBuffer.Bytes(), logger) + if err != nil { + return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", tokenRedeemRequestSet.Request_id, producer.Topic, err)) + } + return nil +} + +func handleTokenRedeemRequest( + wg sync.WaitGroup, + request avroSchema.RedeemRequest, + requestId string, + issuers *[]cbpServer.Issuer, + tokenRedeemResults chan avroSchema.RedeemResult, + errorSet chan error, + server *cbpServer.Server, + logger *zerolog.Logger, +) { + defer wg.Done() + const ( + OK = 0 + DUPLICATE_REDEMPTION = 1 + UNVERIFIED = 2 + ERROR = 3 + ) + var ( + verified = false + verifiedIssuer = &cbpServer.Issuer{} + verifiedCohort int32 = 0 + ) + if request.Public_key == "" { + logger.Error().Msg(fmt.Sprintf("Request %s: Missing public key", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, + Associated_data: request.Associated_data, } + } - if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" { - logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: ERROR, - Associated_data: request.Associated_data, - }) - continue + if request.Token_preimage == "" || request.Signature == "" || request.Binding == "" { + logger.Error().Msg(fmt.Sprintf("Request %s: Empty request", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, + Associated_data: request.Associated_data, } + } - tokenPreimage := crypto.TokenPreimage{} - err = tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into preimage: %e", tokenRedeemRequestSet.Request_id, err)) + tokenPreimage := crypto.TokenPreimage{} + err := tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) + if err != nil { + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Could not unmarshal text into preimage: %e", + requestId, err, + ), + ) + } + verificationSignature := crypto.VerificationSignature{} + err = verificationSignature.UnmarshalText([]byte(request.Signature)) + if err != nil { + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Could not unmarshal text into verification signature: %e", + requestId, err, + ), + ) + } + for _, issuer := range *issuers { + if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { + return } - verificationSignature := crypto.VerificationSignature{} - err = verificationSignature.UnmarshalText([]byte(request.Signature)) + // Only attempt token verification with the issuer that was provided. + issuerPublicKey := issuer.SigningKey.PublicKey() + marshaledPublicKey, err := issuerPublicKey.MarshalText() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal text into verification signature: %e", tokenRedeemRequestSet.Request_id, err)) + errorSet <- errors.New( + fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text: %e", + requestId, err, + ), + ) } - for _, issuer := range *issuers { - if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { - continue - } - // Only attempt token verification with the issuer that was provided. - issuerPublicKey := issuer.SigningKey.PublicKey() - marshaledPublicKey, err := issuerPublicKey.MarshalText() - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text: %e", tokenRedeemRequestSet.Request_id, err)) - } - logger.Trace().Msg(fmt.Sprintf("Request %s: Issuer: %s, Request: %s", tokenRedeemRequestSet.Request_id, string(marshaledPublicKey), request.Public_key)) - if string(marshaledPublicKey) == request.Public_key { - if err := btd.VerifyTokenRedemption( - &tokenPreimage, - &verificationSignature, - string(request.Binding), - []*crypto.SigningKey{issuer.SigningKey}, - ); err != nil { - verified = false - } else { - verified = true - verifiedIssuer = &issuer - verifiedCohort = int32(issuer.IssuerCohort) - break - } + logger.Trace().Msg(fmt.Sprintf("Request %s: Issuer: %s, Request: %s", requestId, string(marshaledPublicKey), request.Public_key)) + if string(marshaledPublicKey) == request.Public_key { + if err := btd.VerifyTokenRedemption( + &tokenPreimage, + &verificationSignature, + string(request.Binding), + []*crypto.SigningKey{issuer.SigningKey}, + ); err != nil { + verified = false + } else { + verified = true + verifiedIssuer = &issuer + verifiedCohort = int32(issuer.IssuerCohort) + break } } + } - if !verified { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: UNVERIFIED, - Associated_data: request.Associated_data, - }) - continue - } else { - logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", tokenRedeemRequestSet.Request_id)) + if !verified { + logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: UNVERIFIED, + Associated_data: request.Associated_data, } - if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { - logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed: %e", tokenRedeemRequestSet.Request_id, err)) - if strings.Contains(err.Error(), "Duplicate") { - logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", tokenRedeemRequestSet.Request_id, err)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: "", - Issuer_cohort: 0, - Status: DUPLICATE_REDEMPTION, - Associated_data: request.Associated_data, - }) - } - logger.Error().Msg(fmt.Sprintf("Request %s: Could not mark token redemption", tokenRedeemRequestSet.Request_id)) - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ + } else { + logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", requestId)) + } + if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { + logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed: %e", requestId, err)) + if strings.Contains(err.Error(), "Duplicate") { + logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", requestId, err)) + tokenRedeemResults <- avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, - Status: ERROR, + Status: DUPLICATE_REDEMPTION, Associated_data: request.Associated_data, - }) - continue + } } - logger.Trace().Msg(fmt.Sprintf("Request %s: Redeemed", tokenRedeemRequestSet.Request_id)) - issuerName := verifiedIssuer.IssuerType - redeemedTokenResults = append(redeemedTokenResults, avroSchema.RedeemResult{ - Issuer_name: issuerName, - Issuer_cohort: verifiedCohort, - Status: OK, + logger.Error().Msg(fmt.Sprintf("Request %s: Could not mark token redemption", requestId)) + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: "", + Issuer_cohort: 0, + Status: ERROR, Associated_data: request.Associated_data, - }) - } - resultSet := avroSchema.RedeemResultSet{ - Request_id: tokenRedeemRequestSet.Request_id, - Data: redeemedTokenResults, - } - var resultSetBuffer bytes.Buffer - err = resultSet.Serialize(&resultSetBuffer) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %e", tokenRedeemRequestSet.Request_id, err)) + } } - - err = Emit(producer, resultSetBuffer.Bytes(), logger) - if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", tokenRedeemRequestSet.Request_id, producer.Topic, err)) + logger.Trace().Msg(fmt.Sprintf("Request %s: Redeemed", requestId)) + issuerName := verifiedIssuer.IssuerType + tokenRedeemResults <- avroSchema.RedeemResult{ + Issuer_name: issuerName, + Issuer_cohort: verifiedCohort, + Status: OK, + Associated_data: request.Associated_data, } - return nil } From be38029f57da6045f3152b6000e77b229ac98bb7 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 16:09:39 -0500 Subject: [PATCH 03/10] Reformat errors and logs --- kafka/signed_token_redeem_handler.go | 87 +++++++++++++++++++--------- 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index 1e01e7e6..e7edb34b 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -28,17 +28,26 @@ func SignedTokenRedeemHandler( ) error { tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", tokenRedeemRequestSet.Request_id, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed Avro deserialization: %e", + tokenRedeemRequestSet.Request_id, err, + )) } defer func() { if recover() != nil { - err = errors.New(fmt.Sprintf("Request %s: Redeem attempt panicked", tokenRedeemRequestSet.Request_id)) + err = errors.New(fmt.Sprintf( + "Request %s: Redeem attempt panicked", + tokenRedeemRequestSet.Request_id, + )) } }() if len(tokenRedeemRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. - return errors.New(fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", tokenRedeemRequestSet.Request_id)) + return errors.New(fmt.Sprintf( + "Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", + tokenRedeemRequestSet.Request_id, + )) } issuers, err := server.FetchAllIssuers() if err != nil { @@ -74,12 +83,21 @@ func SignedTokenRedeemHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %e", tokenRedeemRequestSet.Request_id, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed to serialize ResultSet: %e", + tokenRedeemRequestSet.Request_id, + err, + )) } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", tokenRedeemRequestSet.Request_id, producer.Topic, err)) + return errors.New(fmt.Sprintf( + "Request %s: Failed to emit results to topic %s: %e", + tokenRedeemRequestSet.Request_id, + producer.Topic, + err, + )) } return nil } @@ -129,22 +147,18 @@ func handleTokenRedeemRequest( tokenPreimage := crypto.TokenPreimage{} err := tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) if err != nil { - errorSet <- errors.New( - fmt.Sprintf( - "Request %s: Could not unmarshal text into preimage: %e", - requestId, err, - ), - ) + errorSet <- errors.New(fmt.Sprintf( + "Request %s: Could not unmarshal text into preimage: %e", + requestId, err, + )) } verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) if err != nil { - errorSet <- errors.New( - fmt.Sprintf( - "Request %s: Could not unmarshal text into verification signature: %e", - requestId, err, - ), - ) + errorSet <- errors.New(fmt.Sprintf( + "Request %s: Could not unmarshal text into verification signature: %e", + requestId, err, + )) } for _, issuer := range *issuers { if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { @@ -154,14 +168,17 @@ func handleTokenRedeemRequest( issuerPublicKey := issuer.SigningKey.PublicKey() marshaledPublicKey, err := issuerPublicKey.MarshalText() if err != nil { - errorSet <- errors.New( - fmt.Sprintf( - "Request %s: Could not unmarshal issuer public key into text: %e", - requestId, err, - ), - ) + errorSet <- errors.New(fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text: %e", + requestId, err, + )) } - logger.Trace().Msg(fmt.Sprintf("Request %s: Issuer: %s, Request: %s", requestId, string(marshaledPublicKey), request.Public_key)) + logger.Trace().Msg(fmt.Sprintf( + "Request %s: Issuer: %s, Request: %s", + requestId, + string(marshaledPublicKey), + request.Public_key, + )) if string(marshaledPublicKey) == request.Public_key { if err := btd.VerifyTokenRedemption( &tokenPreimage, @@ -180,7 +197,10 @@ func handleTokenRedeemRequest( } if !verified { - logger.Error().Msg(fmt.Sprintf("Request %s: Could not verify that the token redemption is valid", requestId)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not verify that the token redemption is valid", + requestId, + )) tokenRedeemResults <- avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, @@ -191,9 +211,17 @@ func handleTokenRedeemRequest( logger.Trace().Msg(fmt.Sprintf("Request %s: Validated", requestId)) } if err := server.RedeemToken(verifiedIssuer, &tokenPreimage, string(request.Binding)); err != nil { - logger.Error().Err(err).Msg(fmt.Sprintf("Request %s: Token redemption failed: %e", requestId, err)) + logger.Error().Err(err).Msg(fmt.Sprintf( + "Request %s: Token redemption failed: %e", + requestId, + err, + )) if strings.Contains(err.Error(), "Duplicate") { - logger.Error().Msg(fmt.Sprintf("Request %s: Duplicate redemption: %e", requestId, err)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Duplicate redemption: %e", + requestId, + err, + )) tokenRedeemResults <- avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, @@ -201,7 +229,10 @@ func handleTokenRedeemRequest( Associated_data: request.Associated_data, } } - logger.Error().Msg(fmt.Sprintf("Request %s: Could not mark token redemption", requestId)) + logger.Error().Msg(fmt.Sprintf( + "Request %s: Could not mark token redemption", + requestId, + )) tokenRedeemResults <- avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, From c82e622cc86fe9cdd3e100c675ac03cd86bc333c Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 16:15:34 -0500 Subject: [PATCH 04/10] Reformat errors and logs --- kafka/main.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/main.go b/kafka/main.go index a49d48ad..34bcd57f 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -103,11 +103,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error logger, ) if err != nil { - logger.Error().Err(err).Msg("Processing failed.") + logger.Error().Err(err).Msg("Processing failed. Not committing.") } else { - logger.Info().Msg(fmt.Sprintf("Processing completed. Committing")) + logger.Info().Msg(fmt.Sprintf("Processing completed. Committing.")) if err := consumer.CommitMessages(ctx, msg); err != nil { logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err)) + } else { + logger.Info().Msg(fmt.Sprintf("Committed offset %d for topic %s.", msg.Offset, msg.Topic)) } } } From c2242bfd8d68ce8f4d6fae9b641836b7d4edd0e1 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 16:16:17 -0500 Subject: [PATCH 05/10] Reformat errors and logs --- kafka/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/main.go b/kafka/main.go index 34bcd57f..51940a80 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -74,6 +74,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error for i := 1; i <= consumerCount; i++ { go func(topicMappings []TopicMapping) { consumer := newConsumer(topics, adsConsumerGroupV1, logger) + logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats())) var ( failureCount = 0 failureLimit = 10 @@ -93,7 +94,6 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error continue } logger.Info().Msg(fmt.Sprintf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)) - logger.Info().Msg(fmt.Sprintf("Reader Stats: %#v", consumer.Stats())) for _, topicMapping := range topicMappings { if msg.Topic == topicMapping.Topic { err := topicMapping.Processor( From cc811883015aa43269af3c9b8c449f4c47314e66 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 17:02:03 -0500 Subject: [PATCH 06/10] Switch to use bat-go AppError --- kafka/main.go | 8 +- kafka/signed_blinded_token_issuer_handler.go | 86 ++++++++++---- kafka/signed_token_redeem_handler.go | 112 +++++++++++-------- 3 files changed, 136 insertions(+), 70 deletions(-) diff --git a/kafka/main.go b/kafka/main.go index 51940a80..935829a1 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -8,6 +8,7 @@ import ( "strings" "time" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" batgo_kafka "github.com/brave-intl/bat-go/utils/kafka" "github.com/brave-intl/challenge-bypass-server/server" uuid "github.com/google/uuid" @@ -18,7 +19,12 @@ import ( var brokers []string -type Processor func([]byte, *kafka.Writer, *server.Server, *zerolog.Logger) error +type Processor func( + []byte, + *kafka.Writer, + *server.Server, + *zerolog.Logger, +) *batgo_handlers.AppError type TopicMapping struct { Topic string diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 3b043475..fa3c4b05 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -25,19 +26,31 @@ func SignedBlindedTokenIssuerHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) error { +) *batgo_handlers.AppError { blindedTokenRequestSet, err := avroSchema.DeserializeSigningRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed Avro deserialization: %e", blindedTokenRequestSet.Request_id, err)) + return batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed Avro deserialization", + blindedTokenRequestSet.Request_id, + ), + PERMANENT, + ) } if len(blindedTokenRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. - return errors.New(fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id)) + message := fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id) + return batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + ) } var wg sync.WaitGroup blindedTokenResultsChannel := make(chan avroSchema.SigningResult) - errorSet := make(chan error) + errorSet := make(chan *batgo_handlers.AppError) for _, request := range blindedTokenRequestSet.Data { wg.Add(1) @@ -65,11 +78,27 @@ func SignedBlindedTokenIssuerHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to serialize ResultSet: %s", blindedTokenRequestSet.Request_id, resultSet)) + return batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to serialize ResultSet: %s", + blindedTokenRequestSet.Request_id, + resultSet, + ), + PERMANENT, + ) } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to emit results to topic %s: %e", blindedTokenRequestSet.Request_id, producer.Topic, err)) + return batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to emit results to topic %s", + blindedTokenRequestSet.Request_id, + producer.Topic, + ), + TEMPORARY, + ) } return nil } @@ -79,7 +108,7 @@ func handleBlindedTokenRequest( request avroSchema.SigningRequest, requestId string, blindedTokenResults chan avroSchema.SigningResult, - errorSet chan error, + errorSet chan *batgo_handlers.AppError, server *cbpServer.Server, logger *zerolog.Logger, ) { @@ -90,16 +119,24 @@ func handleBlindedTokenRequest( ERROR = 2 ) if request.Blinded_tokens == nil { - errorSet <- errors.New(fmt.Sprintf("Request %s: Empty request", requestId)) + message := fmt.Sprintf("Request %s: Empty request", requestId) + errorSet <- batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + ) } if request.Issuer_cohort != 0 && request.Issuer_cohort != 1 { - errorSet <- errors.New( - fmt.Sprintf( - "Request %s: Provided cohort is not supported: %d", - requestId, - request.Issuer_cohort, - ), + message := fmt.Sprintf( + "Request %s: Provided cohort is not supported: %d", + requestId, + request.Issuer_cohort, + ) + errorSet <- batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, ) } @@ -143,24 +180,26 @@ func handleBlindedTokenRequest( } marshaledDLEQProof, err := dleqProof.MarshalText() if err != nil { - errorSet <- errors.New( + errorSet <- batgo_handlers.WrapError( + err, fmt.Sprintf( - "Request %s: Could not marshal DLEQ proof: %e", + "Request %s: Could not marshal DLEQ proof", requestId, - err, ), + PERMANENT, ) } var marshaledTokens []string for _, token := range signedTokens { marshaledToken, err := token.MarshalText() if err != nil { - errorSet <- errors.New( + errorSet <- batgo_handlers.WrapError( + err, fmt.Sprintf( - "Request %s: Could not marshal new tokens to bytes: %e", + "Request %s: Could not marshal new tokens to bytes", requestId, - err, ), + PERMANENT, ) } marshaledTokens = append(marshaledTokens, string(marshaledToken[:])) @@ -168,12 +207,13 @@ func handleBlindedTokenRequest( publicKey := issuer.SigningKey.PublicKey() marshaledPublicKey, err := publicKey.MarshalText() if err != nil { - errorSet <- errors.New( + errorSet <- batgo_handlers.WrapError( + err, fmt.Sprintf( - "Request %s: Could not marshal signing key: %e", + "Request %s: Could not marshal signing key", requestId, - err, ), + PERMANENT, ) } blindedTokenResults <- avroSchema.SigningResult{ diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index e7edb34b..17295a4e 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -8,6 +8,7 @@ import ( "sync" "time" + batgo_handlers "github.com/brave-intl/bat-go/utils/handlers" crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -25,37 +26,44 @@ func SignedTokenRedeemHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) error { +) *batgo_handlers.AppError { tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - return errors.New(fmt.Sprintf( - "Request %s: Failed Avro deserialization: %e", - tokenRedeemRequestSet.Request_id, err, - )) + return batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed Avro deserialization: %e", + tokenRedeemRequestSet.Request_id, err, + ), + PERMANENT, + ) } - defer func() { - if recover() != nil { - err = errors.New(fmt.Sprintf( - "Request %s: Redeem attempt panicked", - tokenRedeemRequestSet.Request_id, - )) - } - }() if len(tokenRedeemRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. - return errors.New(fmt.Sprintf( + message := fmt.Sprintf( "Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", - tokenRedeemRequestSet.Request_id, - )) + tokenRedeemRequestSet.Request_id) + return batgo_handlers.WrapError( + errors.New(message), + message, + PERMANENT, + ) } issuers, err := server.FetchAllIssuers() if err != nil { - return errors.New(fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)) + return batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Failed to fetch all issuers", + tokenRedeemRequestSet.Request_id, + ), + TEMPORARY, + ) } var wg sync.WaitGroup tokenRedeemResultsChannel := make(chan avroSchema.RedeemResult) - errorSet := make(chan error) + errorSet := make(chan *batgo_handlers.AppError) for _, request := range tokenRedeemRequestSet.Data { wg.Add(1) handleTokenRedeemRequest( @@ -83,21 +91,27 @@ func SignedTokenRedeemHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return errors.New(fmt.Sprintf( - "Request %s: Failed to serialize ResultSet: %e", - tokenRedeemRequestSet.Request_id, + return batgo_handlers.WrapError( err, - )) + fmt.Sprintf( + "Request %s: Failed to serialize ResultSet.", + tokenRedeemRequestSet.Request_id, + ), + PERMANENT, + ) } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return errors.New(fmt.Sprintf( - "Request %s: Failed to emit results to topic %s: %e", - tokenRedeemRequestSet.Request_id, - producer.Topic, + return batgo_handlers.WrapError( err, - )) + fmt.Sprintf( + "Request %s: Failed to emit results to topic %s.", + tokenRedeemRequestSet.Request_id, + producer.Topic, + ), + TEMPORARY, + ) } return nil } @@ -108,17 +122,11 @@ func handleTokenRedeemRequest( requestId string, issuers *[]cbpServer.Issuer, tokenRedeemResults chan avroSchema.RedeemResult, - errorSet chan error, + errorSet chan *batgo_handlers.AppError, server *cbpServer.Server, logger *zerolog.Logger, ) { defer wg.Done() - const ( - OK = 0 - DUPLICATE_REDEMPTION = 1 - UNVERIFIED = 2 - ERROR = 3 - ) var ( verified = false verifiedIssuer = &cbpServer.Issuer{} @@ -147,18 +155,26 @@ func handleTokenRedeemRequest( tokenPreimage := crypto.TokenPreimage{} err := tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) if err != nil { - errorSet <- errors.New(fmt.Sprintf( - "Request %s: Could not unmarshal text into preimage: %e", - requestId, err, - )) + errorSet <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal text into preimage", + requestId, + ), + PERMANENT, + ) } verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) if err != nil { - errorSet <- errors.New(fmt.Sprintf( - "Request %s: Could not unmarshal text into verification signature: %e", - requestId, err, - )) + errorSet <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal text into verification signature", + requestId, + ), + PERMANENT, + ) } for _, issuer := range *issuers { if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) { @@ -168,10 +184,14 @@ func handleTokenRedeemRequest( issuerPublicKey := issuer.SigningKey.PublicKey() marshaledPublicKey, err := issuerPublicKey.MarshalText() if err != nil { - errorSet <- errors.New(fmt.Sprintf( - "Request %s: Could not unmarshal issuer public key into text: %e", - requestId, err, - )) + errorSet <- batgo_handlers.WrapError( + err, + fmt.Sprintf( + "Request %s: Could not unmarshal issuer public key into text", + requestId, + ), + PERMANENT, + ) } logger.Trace().Msg(fmt.Sprintf( "Request %s: Issuer: %s, Request: %s", From 6f1a8ef576239e479c34d9af28517d191bc8746e Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 20 Dec 2021 22:44:38 -0500 Subject: [PATCH 07/10] Add error codes file --- kafka/error_codes.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 kafka/error_codes.go diff --git a/kafka/error_codes.go b/kafka/error_codes.go new file mode 100644 index 00000000..d2410c09 --- /dev/null +++ b/kafka/error_codes.go @@ -0,0 +1,13 @@ +package kafka + +const ( + // Error types that determine commit or retry for a given offset + PERMANENT = 0 + TEMPORARY = 1 + // Error types that determine which error response is includeded in the emission to + // Kafka for a given request failure. + OK = 0 + DUPLICATE_REDEMPTION = 1 + UNVERIFIED = 2 + ERROR = 3 +) From 326a3d718de27292b4ab570428e92ff0a0d22523 Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Tue, 21 Dec 2021 10:51:53 -0500 Subject: [PATCH 08/10] Only commit offset if all errors are permanent --- kafka/main.go | 21 ++++++---- kafka/signed_blinded_token_issuer_handler.go | 44 ++++++++++++-------- kafka/signed_token_redeem_handler.go | 40 ++++++++++-------- 3 files changed, 62 insertions(+), 43 deletions(-) diff --git a/kafka/main.go b/kafka/main.go index 935829a1..f5ffdafc 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -24,7 +24,7 @@ type Processor func( *kafka.Writer, *server.Server, *zerolog.Logger, -) *batgo_handlers.AppError +) []*batgo_handlers.AppError type TopicMapping struct { Topic string @@ -102,18 +102,25 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error logger.Info().Msg(fmt.Sprintf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)) for _, topicMapping := range topicMappings { if msg.Topic == topicMapping.Topic { - err := topicMapping.Processor( + appErrors := topicMapping.Processor( msg.Value, topicMapping.ResultProducer, providedServer, logger, ) - if err != nil { - logger.Error().Err(err).Msg("Processing failed. Not committing.") - } else { - logger.Info().Msg(fmt.Sprintf("Processing completed. Committing.")) + shallCommit := true + if appErrors != nil { + for _, appErr := range appErrors { + if appErr.Code == TEMPORARY { + logger.Error().Err(appErr.Cause).Msg("Processing failed with temporary error. Not committing. Will retry.") + shallCommit = false + } + } + } + if shallCommit { + logger.Info().Msg(fmt.Sprintf("Processing completed without error. Committing.")) if err := consumer.CommitMessages(ctx, msg); err != nil { - logger.Error().Msg(fmt.Sprintf("Failed to commit: %s", err)) + logger.Error().Err(err).Msg("Failed to commit") } else { logger.Info().Msg(fmt.Sprintf("Committed offset %d for topic %s.", msg.Offset, msg.Topic)) } diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index fa3c4b05..b0f89157 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -26,31 +26,34 @@ func SignedBlindedTokenIssuerHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) *batgo_handlers.AppError { +) []*batgo_handlers.AppError { + var errorSet []*batgo_handlers.AppError blindedTokenRequestSet, err := avroSchema.DeserializeSigningRequestSet(bytes.NewReader(data)) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed Avro deserialization", blindedTokenRequestSet.Request_id, ), PERMANENT, - ) + )) + return errorSet } if len(blindedTokenRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. message := fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", blindedTokenRequestSet.Request_id) - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( errors.New(message), message, PERMANENT, - ) + )) + return errorSet } var wg sync.WaitGroup blindedTokenResultsChannel := make(chan avroSchema.SigningResult) - errorSet := make(chan *batgo_handlers.AppError) + errorChannel := make(chan *batgo_handlers.AppError) for _, request := range blindedTokenRequestSet.Data { wg.Add(1) @@ -59,7 +62,7 @@ func SignedBlindedTokenIssuerHandler( request, blindedTokenRequestSet.Request_id, blindedTokenResultsChannel, - errorSet, + errorChannel, server, logger, ) @@ -70,6 +73,9 @@ func SignedBlindedTokenIssuerHandler( for blindedTokenResult := range blindedTokenResultsChannel { blindedTokenResults = append(blindedTokenResults, blindedTokenResult) } + for errorChannelResult := range errorChannel { + errorSet = append(errorSet, errorChannelResult) + } resultSet := avroSchema.SigningResultSet{ Request_id: blindedTokenRequestSet.Request_id, @@ -78,7 +84,7 @@ func SignedBlindedTokenIssuerHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed to serialize ResultSet: %s", @@ -86,11 +92,12 @@ func SignedBlindedTokenIssuerHandler( resultSet, ), PERMANENT, - ) + )) + return errorSet } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed to emit results to topic %s", @@ -98,9 +105,10 @@ func SignedBlindedTokenIssuerHandler( producer.Topic, ), TEMPORARY, - ) + )) + return errorSet } - return nil + return errorSet } func handleBlindedTokenRequest( @@ -108,7 +116,7 @@ func handleBlindedTokenRequest( request avroSchema.SigningRequest, requestId string, blindedTokenResults chan avroSchema.SigningResult, - errorSet chan *batgo_handlers.AppError, + errorChannel chan *batgo_handlers.AppError, server *cbpServer.Server, logger *zerolog.Logger, ) { @@ -120,7 +128,7 @@ func handleBlindedTokenRequest( ) if request.Blinded_tokens == nil { message := fmt.Sprintf("Request %s: Empty request", requestId) - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( errors.New(message), message, PERMANENT, @@ -133,7 +141,7 @@ func handleBlindedTokenRequest( requestId, request.Issuer_cohort, ) - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( errors.New(message), message, PERMANENT, @@ -180,7 +188,7 @@ func handleBlindedTokenRequest( } marshaledDLEQProof, err := dleqProof.MarshalText() if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not marshal DLEQ proof", @@ -193,7 +201,7 @@ func handleBlindedTokenRequest( for _, token := range signedTokens { marshaledToken, err := token.MarshalText() if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not marshal new tokens to bytes", @@ -207,7 +215,7 @@ func handleBlindedTokenRequest( publicKey := issuer.SigningKey.PublicKey() marshaledPublicKey, err := publicKey.MarshalText() if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not marshal signing key", diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index 17295a4e..4c7fed98 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -26,17 +26,18 @@ func SignedTokenRedeemHandler( producer *kafka.Writer, server *cbpServer.Server, logger *zerolog.Logger, -) *batgo_handlers.AppError { +) []*batgo_handlers.AppError { + var errorSet []*batgo_handlers.AppError tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed Avro deserialization: %e", tokenRedeemRequestSet.Request_id, err, ), PERMANENT, - ) + )) } if len(tokenRedeemRequestSet.Data) > 1 { // NOTE: When we start supporting multiple requests we will need to review @@ -44,26 +45,26 @@ func SignedTokenRedeemHandler( message := fmt.Sprintf( "Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", tokenRedeemRequestSet.Request_id) - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( errors.New(message), message, PERMANENT, - ) + )) } issuers, err := server.FetchAllIssuers() if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id, ), TEMPORARY, - ) + )) } var wg sync.WaitGroup tokenRedeemResultsChannel := make(chan avroSchema.RedeemResult) - errorSet := make(chan *batgo_handlers.AppError) + errorChannel := make(chan *batgo_handlers.AppError) for _, request := range tokenRedeemRequestSet.Data { wg.Add(1) handleTokenRedeemRequest( @@ -72,7 +73,7 @@ func SignedTokenRedeemHandler( tokenRedeemRequestSet.Request_id, issuers, tokenRedeemResultsChannel, - errorSet, + errorChannel, server, logger, ) @@ -83,6 +84,9 @@ func SignedTokenRedeemHandler( for tokenRedeemResult := range tokenRedeemResultsChannel { tokenRedeemResults = append(tokenRedeemResults, tokenRedeemResult) } + for errorChannelResult := range errorChannel { + errorSet = append(errorSet, errorChannelResult) + } resultSet := avroSchema.RedeemResultSet{ Request_id: tokenRedeemRequestSet.Request_id, @@ -91,19 +95,19 @@ func SignedTokenRedeemHandler( var resultSetBuffer bytes.Buffer err = resultSet.Serialize(&resultSetBuffer) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed to serialize ResultSet.", tokenRedeemRequestSet.Request_id, ), PERMANENT, - ) + )) } err = Emit(producer, resultSetBuffer.Bytes(), logger) if err != nil { - return batgo_handlers.WrapError( + errorSet = append(errorSet, batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Failed to emit results to topic %s.", @@ -111,9 +115,9 @@ func SignedTokenRedeemHandler( producer.Topic, ), TEMPORARY, - ) + )) } - return nil + return errorSet } func handleTokenRedeemRequest( @@ -122,7 +126,7 @@ func handleTokenRedeemRequest( requestId string, issuers *[]cbpServer.Issuer, tokenRedeemResults chan avroSchema.RedeemResult, - errorSet chan *batgo_handlers.AppError, + errorChannel chan *batgo_handlers.AppError, server *cbpServer.Server, logger *zerolog.Logger, ) { @@ -155,7 +159,7 @@ func handleTokenRedeemRequest( tokenPreimage := crypto.TokenPreimage{} err := tokenPreimage.UnmarshalText([]byte(request.Token_preimage)) if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not unmarshal text into preimage", @@ -167,7 +171,7 @@ func handleTokenRedeemRequest( verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not unmarshal text into verification signature", @@ -184,7 +188,7 @@ func handleTokenRedeemRequest( issuerPublicKey := issuer.SigningKey.PublicKey() marshaledPublicKey, err := issuerPublicKey.MarshalText() if err != nil { - errorSet <- batgo_handlers.WrapError( + errorChannel <- batgo_handlers.WrapError( err, fmt.Sprintf( "Request %s: Could not unmarshal issuer public key into text", From 03eedf2df252ac2e0b7b2a5831f0c94913e079fe Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 3 Jan 2022 10:01:12 -0500 Subject: [PATCH 09/10] WIP --- kafka/main.go | 2 ++ kafka/signed_blinded_token_issuer_handler.go | 4 ++-- server/dynamo.go | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka/main.go b/kafka/main.go index f5ffdafc..a177a976 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -103,6 +103,8 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error for _, topicMapping := range topicMappings { if msg.Topic == topicMapping.Topic { appErrors := topicMapping.Processor( + // Readbatch instead of using msg.Value + // Batch has getMessage (readMessage?) which is a msg that we can run value on. Iterate on this top level valueh msg.Value, topicMapping.ResultProducer, providedServer, diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index b0f89157..991088a9 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -57,7 +57,7 @@ func SignedBlindedTokenIssuerHandler( for _, request := range blindedTokenRequestSet.Data { wg.Add(1) - handleBlindedTokenRequest( + go handleBlindedTokenRequest( wg, request, blindedTokenRequestSet.Request_id, @@ -65,7 +65,7 @@ func SignedBlindedTokenIssuerHandler( errorChannel, server, logger, - ) + )(wg) } wg.Wait() diff --git a/server/dynamo.go b/server/dynamo.go index 4ed261f8..8a2f37d9 100644 --- a/server/dynamo.go +++ b/server/dynamo.go @@ -107,6 +107,8 @@ func (c *Server) redeemTokenV2(issuer *Issuer, preimage *crypto.TokenPreimage, p } input := &dynamodb.PutItemInput{ + // @ME emit different error messages depending on whether the ID is the same and the data is different or the ID is the same AND the data is the same. + // Batch persistence if possible Item: av, ConditionExpression: aws.String("attribute_not_exists(id)"), TableName: aws.String("redemptions"), From e6a06c793d1da5ed9f47ba91cb7d38e32313308c Mon Sep 17 00:00:00 2001 From: Jackson Egan Date: Mon, 3 Jan 2022 13:27:59 -0500 Subject: [PATCH 10/10] Building --- kafka/signed_blinded_token_issuer_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 991088a9..868b0b48 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -65,7 +65,7 @@ func SignedBlindedTokenIssuerHandler( errorChannel, server, logger, - )(wg) + ) } wg.Wait()