diff --git a/.build-tools/builtin-authentication-profiles.yaml b/.build-tools/builtin-authentication-profiles.yaml
index cccb195a44..56f0621017 100644
--- a/.build-tools/builtin-authentication-profiles.yaml
+++ b/.build-tools/builtin-authentication-profiles.yaml
@@ -3,7 +3,7 @@ aws:
     description: |
       Authenticate using an Access Key ID and Secret Access Key included in the metadata
     metadata:
-      - name: awsRegion
+      - name: region
         type: string
         required: true
         description: |
@@ -25,8 +25,29 @@ aws:
         description: |
           AWS session token to use. A session token is only required if you are using temporary security credentials.
         example: '"TOKEN"'
+  - title: "AWS: Assume IAM Role"
+    description: |
+      Assume a specific IAM role. Note: This is only supported for Kafka and PostgreSQL.
+    metadata:
+      - name: region
+        type: string
+        required: true
+        description: |
+          The AWS Region where the AWS resource is deployed to.
+        example: '"us-east-1"'
+      - name: assumeRoleArn
+        type: string
+        required: false
+        description: |
+          IAM role that has access to the AWS resource.
+          This is another option to authenticate with MSK and RDS Aurora aside from the AWS Credentials.
+        example: '"arn:aws:iam::123456789:role/mskRole"'
+      - name: sessionName
+        type: string
+        description: |
+          The session name for assuming a role.
+        example: '"MyAppSession"'
+        default: '"DaprDefaultSession"'
   - title: "AWS: Credentials from Environment Variables"
     description: Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment
   - title: "AWS: IAM Roles Anywhere"
diff --git a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go
index 28d6659188..eb925cd917 100644
--- a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go
+++ b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go
@@ -32,14 +32,46 @@ func ParseBuiltinAuthenticationProfile(bi BuiltinAuthenticationProfile, componentTitle string) ([]AuthenticationProfile, error) {
 	for i, profile := range profiles {
 		res[i] = profile
-		res[i].Metadata = mergedMetadata(bi.Metadata, res[i].Metadata...)
+		// convert slice to a slice of pointers to update in place for required -> non-required fields
+		metadataPtr := make([]*Metadata, len(profile.Metadata))
+		for j := range profile.Metadata {
+			metadataPtr[j] = &profile.Metadata[j]
+		}
+
+		if componentTitle == "Apache Kafka" {
+			removeRequiredOnSomeAWSFields(&metadataPtr)
+		}
+
+		// convert back to value slices for merging
+		updatedMetadata := make([]Metadata, 0, len(metadataPtr))
+		for _, ptr := range metadataPtr {
+			if ptr != nil {
+				updatedMetadata = append(updatedMetadata, *ptr)
+			}
+		}
+
+		merged := mergedMetadata(bi.Metadata, updatedMetadata...)
+
+		// Note: We must apply the removal of deprecated fields after the merge!
-		// If component is PostgreSQL, filter out duplicated aws profile fields
-		if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" {
-			res[i].Metadata = filterOutDuplicateFields(res[i].Metadata)
+		// Here, we remove some deprecated fields as we support the transition to a new auth profile
+		if profile.Title == "AWS: Assume IAM Role" && componentTitle == "Apache Kafka" {
+			merged = removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(merged)
 		}
+
+		// Here, there are no metadata fields that need deprecating
+		if profile.Title == "AWS: Credentials from Environment Variables" && componentTitle == "Apache Kafka" {
+			merged = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(merged)
+		}
+
+		// Here, this is a new auth profile, so remove all deprecated fields as unrelated.
+		if profile.Title == "AWS: IAM Roles Anywhere" && componentTitle == "Apache Kafka" {
+			merged = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(merged)
+		}
+
+		res[i].Metadata = merged
 	}
+
 	return res, nil
 }
@@ -54,26 +86,49 @@ func mergedMetadata(base []Metadata, add ...Metadata) []Metadata {
 	return res
 }
-// filterOutDuplicateFields removes specific duplicated fields from the metadata
-func filterOutDuplicateFields(metadata []Metadata) []Metadata {
-	duplicateFields := map[string]int{
-		"awsRegion": 0,
-		"accessKey": 0,
-		"secretKey": 0,
+// removeRequiredOnSomeAWSFields needs to be removed in Dapr 1.17, once the duplicated AWS IAM fields are removed
+// and we standardize on these fields.
+// Currently, the duplicated fields are: awsAccessKey/accessKey, awsSecretKey/secretKey, and awsRegion/region.
+// We normally have the accessKey, secretKey, and region fields marked required, as they are part of the builtin AWS auth profile fields.
+// However, as we remove the aws-prefixed ones, we need to mark the normally required ones as not required, but only for PostgreSQL and Kafka.
+// This way we do not break existing users, and we transition them to the standardized fields.
+func removeRequiredOnSomeAWSFields(metadata *[]*Metadata) {
+	if metadata == nil {
+		return
 	}
-	filteredMetadata := []Metadata{}
+	for _, field := range *metadata {
+		if field == nil {
+			continue
+		}
+
+		if field.Name == "accessKey" || field.Name == "secretKey" || field.Name == "region" {
+			field.Required = false
+		}
+	}
+}
+
+func removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata {
+	filteredMetadata := []Metadata{}
 	for _, field := range metadata {
-		if _, exists := duplicateFields[field.Name]; !exists {
+		if strings.HasPrefix(field.Name, "aws") {
+			continue
+		} else {
 			filteredMetadata = append(filteredMetadata, field)
+		}
+	}
+
+	return filteredMetadata
+}
+
+func removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata {
+	filteredMetadata := []Metadata{}
+
+	for _, field := range metadata {
+		if field.Name == "awsAccessKey" || field.Name == "awsSecretKey" || field.Name == "awsSessionToken" {
+			continue
 		} else {
-			if field.Name == "awsRegion" && duplicateFields["awsRegion"] == 0 {
-				filteredMetadata = append(filteredMetadata, field)
-				duplicateFields["awsRegion"]++
-			} else if field.Name != "awsRegion" {
-				continue
-			}
+			filteredMetadata = append(filteredMetadata, field)
 		}
 	}
diff --git a/bindings/aws/sns/sns.go b/bindings/aws/sns/sns.go
index 5464f1f044..370cabdf25 100644
--- a/bindings/aws/sns/sns.go
+++ b/bindings/aws/sns/sns.go
@@ -43,6 +43,7 @@ type snsMetadata struct {
 	SessionToken string `json:"sessionToken" mapstructure:"sessionToken" mdignore:"true"`
 	TopicArn string `json:"topicArn"`
+	// TODO: in Dapr 1.17 rm the alias on region as we remove the aws prefix on these fields
 	Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"`
 	Endpoint string `json:"endpoint"`
 }
diff --git a/bindings/kafka/metadata.yaml b/bindings/kafka/metadata.yaml
index ab24f3e8fe..435debf90e 100644
--- a/bindings/kafka/metadata.yaml
+++ b/bindings/kafka/metadata.yaml
@@ -14,6 +14,75 @@ binding:
   operations:
     - name: create
       description: "Publish a new message in the topic."
+# This auth profile has duplicate fields intentionally as we maintain backwards compatibility,
+# but also move Kafka to utilize the normalized AWS fields in the builtin auth profiles.
+# TODO: rm the duplicate aws prefixed fields in Dapr 1.17.
+builtinAuthenticationProfiles:
+  - name: "aws"
+    metadata:
+      - name: authType
+        type: string
+        required: true
+        description: |
+          Authentication type.
+          This must be set to "awsiam" for this authentication profile.
+        example: '"awsiam"'
+        allowedValues:
+          - "awsiam"
+      - name: awsRegion
+        type: string
+        required: false
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'region' instead.
+          The AWS Region where the MSK Kafka broker is deployed to.
+        example: '"us-east-1"'
+      - name: awsAccessKey
+        type: string
+        required: false
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'accessKey' instead.
+          If both fields are set, then 'accessKey' value will be used.
+          AWS access key associated with an IAM account.
+        example: '"AKIAIOSFODNN7EXAMPLE"'
+      - name: awsSecretKey
+        type: string
+        required: false
+        sensitive: true
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'secretKey' instead.
+          If both fields are set, then 'secretKey' value will be used.
+ The secret key associated with the access key. + example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' + - name: awsSessionToken + type: string + sensitive: true + description: | + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. + If both fields are set, then 'sessionToken' value will be used. + AWS session token to use. A session token is only required if you are using temporary security credentials. + example: '"TOKEN"' + - name: awsIamRoleArn + type: string + required: false + description: | + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. + If both fields are set, then 'assumeRoleArn' value will be used. + IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. + example: '"arn:aws:iam::123456789:role/mskRole"' + - name: awsStsSessionName + type: string + description: | + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. + If both fields are set, then 'sessionName' value will be used. + Represents the session name for assuming a role. + example: '"MyAppSession"' + default: '"MSKSASLDefaultSession"' authenticationProfiles: - title: "OIDC Authentication" description: | @@ -139,55 +208,6 @@ authenticationProfiles: example: '"none"' allowedValues: - "none" - - title: "AWS IAM" - description: "Authenticate using AWS IAM credentials or role for AWS MSK" - metadata: - - name: authType - type: string - required: true - description: | - Authentication type. - This must be set to "awsiam" for this authentication profile. - example: '"awsiam"' - allowedValues: - - "awsiam" - - name: awsRegion - type: string - required: true - description: | - The AWS Region where the MSK Kafka broker is deployed to. - example: '"us-east-1"' - - name: awsAccessKey - type: string - required: true - description: | - AWS access key associated with an IAM account. - example: '"AKIAIOSFODNN7EXAMPLE"' - - name: awsSecretKey - type: string - required: true - sensitive: true - description: | - The secret key associated with the access key. - example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' - - name: awsSessionToken - type: string - sensitive: true - description: | - AWS session token to use. A session token is only required if you are using temporary security credentials. - example: '"TOKEN"' - - name: awsIamRoleArn - type: string - required: true - description: | - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. - example: '"arn:aws:iam::123456789:role/mskRole"' - - name: awsStsSessionName - type: string - description: | - Represents the session name for assuming a role. - example: '"MyAppSession"' - default: '"MSKSASLDefaultSession"' metadata: - name: topics type: string diff --git a/common/authentication/aws/aws.go b/common/authentication/aws/aws.go index a45eb48277..0c03731509 100644 --- a/common/authentication/aws/aws.go +++ b/common/authentication/aws/aws.go @@ -44,6 +44,17 @@ type AWSIAM struct { AWSRegion string `json:"awsRegion" mapstructure:"awsRegion"` } +// TODO: Delete in Dapr 1.17 so we can move all IAM fields to use the defaults of: +// accessKey and secretKey and region as noted in the docs, and Options struct above. 
+type DeprecatedKafkaIAM struct {
+	Region         string `json:"awsRegion" mapstructure:"awsRegion"`
+	AccessKey      string `json:"awsAccessKey" mapstructure:"awsAccessKey"`
+	SecretKey      string `json:"awsSecretKey" mapstructure:"awsSecretKey"`
+	SessionToken   string `json:"awsSessionToken" mapstructure:"awsSessionToken"`
+	IamRoleArn     string `json:"awsIamRoleArn" mapstructure:"awsIamRoleArn"`
+	StsSessionName string `json:"awsStsSessionName" mapstructure:"awsStsSessionName"`
+}
+
 type AWSIAMAuthOptions struct {
 	PoolConfig       *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"`
 	ConnectionString string          `json:"connectionString" mapstructure:"connectionString"`
@@ -59,9 +70,13 @@ type Options struct {
 	PoolConfig       *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"`
 	ConnectionString string          `json:"connectionString" mapstructure:"connectionString"`
-	Region    string `json:"region" mapstructure:"region"`
-	AccessKey string `json:"accessKey" mapstructure:"accessKey"`
-	SecretKey string `json:"secretKey" mapstructure:"secretKey"`
+	// TODO: in Dapr 1.17 rm the alias on region as we rm the aws prefixed one.
+	// Docs have it just as region, but most metadata fields show the aws prefix...
+	Region        string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion"`
+	AccessKey     string `json:"accessKey" mapstructure:"accessKey"`
+	SecretKey     string `json:"secretKey" mapstructure:"secretKey"`
+	SessionName   string `mapstructure:"sessionName"`
+	AssumeRoleARN string `mapstructure:"assumeRoleArn"`
 	Endpoint     string
 	SessionToken string
@@ -80,6 +95,7 @@ func GetConfig(opts Options) *aws.Config {
 	return cfg
 }
+//nolint:interfacebloat
 type Provider interface {
 	S3() *S3Clients
 	DynamoDB() *DynamoDBClients
@@ -91,6 +107,8 @@ type Provider interface {
 	Kinesis() *KinesisClients
 	Ses() *SesClients
+	Kafka(KafkaOptions) (*KafkaClients, error)
+
 	Close() error
 }
@@ -179,3 +197,14 @@ func (opts *Options) InitiateAWSIAMAuth() error {
 	return nil
 }
+
+// Coalesce is a helper function to return the first non-empty string from the inputs.
+// This helps us to migrate away from the deprecated duplicate aws auth profile metadata fields in Dapr 1.17.
+func Coalesce(values ...string) string {
+	for _, v := range values {
+		if v != "" {
+			return v
+		}
+	}
+	return ""
+}
diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go
index 8d0e9de20b..11b26e4988 100644
--- a/common/authentication/aws/client.go
+++ b/common/authentication/aws/client.go
@@ -16,8 +16,13 @@ package aws
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sync"
+	"time"
+	"github.com/IBM/sarama"
+	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
+	aws2 "github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -51,13 +56,14 @@ type Clients struct {
 	ParameterStore *ParameterStoreClients
 	kinesis        *KinesisClients
 	ses            *SesClients
+	kafka          *KafkaClients
 }
 func newClients() *Clients {
 	return new(Clients)
 }
-func (c *Clients) refresh(session *session.Session) {
+func (c *Clients) refresh(session *session.Session) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	switch {
@@ -79,7 +85,16 @@ func (c *Clients) refresh(session *session.Session) {
 		c.kinesis.New(session)
 	case c.ses != nil:
 		c.ses.New(session)
+	case c.kafka != nil:
+		// Note: we pass in nil for the token provider
+		// as there are no special fields for x509 auth for it.
+		// Only static auth passes it in.
+ err := c.kafka.New(session, nil) + if err != nil { + return fmt.Errorf("failed to refresh Kafka AWS IAM Config: %w", err) + } } + return nil } type S3Clients struct { @@ -124,6 +139,16 @@ type SesClients struct { Ses *ses.SES } +type KafkaClients struct { + config *sarama.Config + consumerGroup *string + brokers *[]string + maxMessageBytes *int + + ConsumerGroup sarama.ConsumerGroup + Producer sarama.SyncProducer +} + func (c *S3Clients) New(session *session.Session) { refreshedS3 := s3.New(session, session.Config) c.S3 = refreshedS3 @@ -207,3 +232,120 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s func (c *SesClients) New(session *session.Session) { c.Ses = ses.New(session, session.Config) } + +type KafkaOptions struct { + Config *sarama.Config + ConsumerGroup string + Brokers []string + MaxMessageBytes int +} + +func initKafkaClients(opts KafkaOptions) *KafkaClients { + return &KafkaClients{ + config: opts.Config, + consumerGroup: &opts.ConsumerGroup, + brokers: &opts.Brokers, + maxMessageBytes: &opts.MaxMessageBytes, + } +} + +func (c *KafkaClients) New(session *session.Session, tokenProvider *mskTokenProvider) error { + const timeout = 10 * time.Second + creds, err := session.Config.Credentials.Get() + if err != nil { + return fmt.Errorf("failed to get credentials from session: %w", err) + } + + // fill in token provider common fields across x509 and static auth + if tokenProvider == nil { + tokenProvider = &mskTokenProvider{} + } + tokenProvider.generateTokenTimeout = timeout + tokenProvider.region = *session.Config.Region + tokenProvider.accessKey = creds.AccessKeyID + tokenProvider.secretKey = creds.SecretAccessKey + tokenProvider.sessionToken = creds.SessionToken + + c.config.Net.SASL.Enable = true + c.config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + c.config.Net.SASL.TokenProvider = tokenProvider + + _, err = c.config.Net.SASL.TokenProvider.Token() + if err != nil { + return fmt.Errorf("error validating iam credentials %v", err) + } + + consumerGroup, err := sarama.NewConsumerGroup(*c.brokers, *c.consumerGroup, c.config) + if err != nil { + return err + } + c.ConsumerGroup = consumerGroup + + producer, err := c.getSyncProducer() + if err != nil { + return err + } + c.Producer = producer + + return nil +} + +// Kafka specific +type mskTokenProvider struct { + generateTokenTimeout time.Duration + accessKey string + secretKey string + sessionToken string + awsIamRoleArn string + awsStsSessionName string + region string +} + +func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) { + // this function can't use the context passed on Init because that context would be cancelled right after Init + ctx, cancel := context.WithTimeout(context.Background(), m.generateTokenTimeout) + defer cancel() + + switch { + // we must first check if we are using the assume role auth profile + case m.awsIamRoleArn != "" && m.awsStsSessionName != "": + token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName) + return &sarama.AccessToken{Token: token}, err + case m.accessKey != "" && m.secretKey != "": + token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) { + return aws2.Credentials{ + AccessKeyID: m.accessKey, + SecretAccessKey: m.secretKey, + SessionToken: m.sessionToken, + }, nil + })) + return &sarama.AccessToken{Token: token}, err + + default: // load default aws creds + token, _, err := 
signer.GenerateAuthToken(ctx, m.region)
+		return &sarama.AccessToken{Token: token}, err
+	}
+}
+
+func (c *KafkaClients) getSyncProducer() (sarama.SyncProducer, error) {
+	// Add SyncProducer specific properties to the shared base config
+	c.config.Producer.RequiredAcks = sarama.WaitForAll
+	c.config.Producer.Retry.Max = 5
+	c.config.Producer.Return.Successes = true
+
+	if *c.maxMessageBytes > 0 {
+		c.config.Producer.MaxMessageBytes = *c.maxMessageBytes
+	}
+
+	saramaClient, err := sarama.NewClient(*c.brokers, c.config)
+	if err != nil {
+		return nil, err
+	}
+
+	producer, err := sarama.NewSyncProducerFromClient(saramaClient)
+	if err != nil {
+		return nil, err
+	}
+
+	return producer, nil
+}
diff --git a/common/authentication/aws/static.go b/common/authentication/aws/static.go
index a66ef86e1e..1972089fdf 100644
--- a/common/authentication/aws/static.go
+++ b/common/authentication/aws/static.go
@@ -15,6 +15,7 @@ package aws
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
@@ -39,6 +40,9 @@ type StaticAuth struct {
 	secretKey    *string
 	sessionToken *string
+	assumeRoleARN *string
+	sessionName   *string
+
 	session *session.Session
 	cfg     *aws.Config
 	clients *Clients
 }
 func newStaticIAM(_ context.Context, opts Options, cfg *aws.Config) (*StaticAuth, error) {
 	auth := &StaticAuth{
-		logger:       opts.Logger,
-		region:       &opts.Region,
-		endpoint:     &opts.Endpoint,
-		accessKey:    &opts.AccessKey,
-		secretKey:    &opts.SecretKey,
-		sessionToken: &opts.SessionToken,
+		logger:        opts.Logger,
+		region:        &opts.Region,
+		endpoint:      &opts.Endpoint,
+		accessKey:     &opts.AccessKey,
+		secretKey:     &opts.SecretKey,
+		sessionToken:  &opts.SessionToken,
+		assumeRoleARN: &opts.AssumeRoleARN,
+		sessionName:   &opts.SessionName,
+
 		cfg: func() *aws.Config {
 			// if nil is passed or it's just a default cfg,
 			// then we use the options to build the aws cfg.
@@ -206,6 +213,36 @@ func (a *StaticAuth) Ses() *SesClients {
 	return a.clients.ses
 }
+func (a *StaticAuth) Kafka(opts KafkaOptions) (*KafkaClients, error) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	// This means we've already set the config in our New function
+	// to use the SASL token provider.
+	if a.clients.kafka != nil {
+		return a.clients.kafka, nil
+	}
+
+	a.clients.kafka = initKafkaClients(opts)
+	// static auth has additional fields we need added,
+	// so we add those static auth specific fields here,
+	// and the rest of the token provider fields are added in New()
+	tokenProvider := mskTokenProvider{}
+	if a.assumeRoleARN != nil {
+		tokenProvider.awsIamRoleArn = *a.assumeRoleARN
+	}
+	if a.sessionName != nil {
+		tokenProvider.awsStsSessionName = *a.sessionName
+	}
+
+	err := a.clients.kafka.New(a.session, &tokenProvider)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create AWS IAM Kafka config: %w", err)
+	}
+
+	return a.clients.kafka, nil
+}
+
 func (a *StaticAuth) getTokenClient() (*session.Session, error) {
 	var awsConfig *aws.Config
 	if a.cfg == nil {
@@ -245,7 +282,21 @@ func (a *StaticAuth) getTokenClient() (*session.Session, error) {
 }
 func (a *StaticAuth) Close() error {
-	return nil
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	errs := make([]error, 2)
+	if a.clients.kafka != nil {
+		if a.clients.kafka.Producer != nil {
+			errs[0] = a.clients.kafka.Producer.Close()
+			a.clients.kafka.Producer = nil
+		}
+		if a.clients.kafka.ConsumerGroup != nil {
+			errs[1] = a.clients.kafka.ConsumerGroup.Close()
+			a.clients.kafka.ConsumerGroup = nil
+		}
+	}
+	return errors.Join(errs...)
}
 func GetConfigV2(accessKey string, secretKey string, sessionToken string, region string, endpoint string) (awsv2.Config, error) {
diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go
index 52af56d271..1c6d6dcf0d 100644
--- a/common/authentication/aws/x509.go
+++ b/common/authentication/aws/x509.go
@@ -119,9 +119,23 @@ func newX509(ctx context.Context, opts Options, cfg *aws.Config) (*x509, error)
 }
 func (a *x509) Close() error {
+	a.mu.Lock()
+	defer a.mu.Unlock()
 	close(a.closeCh)
 	a.wg.Wait()
-	return nil
+
+	errs := make([]error, 2)
+	if a.clients.kafka != nil {
+		if a.clients.kafka.Producer != nil {
+			errs[0] = a.clients.kafka.Producer.Close()
+			a.clients.kafka.Producer = nil
+		}
+		if a.clients.kafka.ConsumerGroup != nil {
+			errs[1] = a.clients.kafka.ConsumerGroup.Close()
+			a.clients.kafka.ConsumerGroup = nil
+		}
+	}
+	return errors.Join(errs...)
 }
 func (a *x509) getCertPEM(ctx context.Context) error {
@@ -275,6 +289,26 @@ func (a *x509) Ses() *SesClients {
 	return a.clients.ses
 }
+func (a *x509) Kafka(opts KafkaOptions) (*KafkaClients, error) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	// This means we've already set the config in our New function
+	// to use the SASL token provider.
+	if a.clients.kafka != nil {
+		return a.clients.kafka, nil
+	}
+
+	a.clients.kafka = initKafkaClients(opts)
+	// Note: we pass in nil for token provider,
+	// as there are no special fields for x509 auth for it.
+	err := a.clients.kafka.New(a.session, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create AWS IAM Kafka config: %w", err)
+	}
+	return a.clients.kafka, nil
+}
+
 func (a *x509) initializeTrustAnchors() error {
 	var (
 		trustAnchor arn.ARN
@@ -436,7 +470,10 @@ func (a *x509) refreshClient() {
 	for {
 		newSession, err := a.createOrRefreshSession(context.Background())
 		if err == nil {
-			a.clients.refresh(newSession)
+			err = a.clients.refresh(newSession)
+			if err != nil {
+				a.logger.Errorf("Failed to refresh client, retrying in 5 seconds: %v", err)
+			}
 			a.logger.Debugf("AWS IAM Roles Anywhere session credentials refreshed successfully")
 			return
 		}
diff --git a/common/component/kafka/auth.go b/common/component/kafka/auth.go
index bd61690c05..ea8cc43fac 100644
--- a/common/component/kafka/auth.go
+++ b/common/component/kafka/auth.go
@@ -14,16 +14,12 @@ limitations under the License.
package kafka import ( - "context" "crypto/tls" "crypto/x509" "errors" "fmt" - "time" "github.com/IBM/sarama" - "github.com/aws/aws-msk-iam-sasl-signer-go/signer" - aws2 "github.com/aws/aws-sdk-go-v2/aws" ) func updatePasswordAuthInfo(config *sarama.Config, metadata *KafkaMetadata, saslUsername, saslPassword string) { @@ -92,58 +88,3 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error { return nil } - -func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *KafkaMetadata) error { - config.Net.SASL.Enable = true - config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - config.Net.SASL.TokenProvider = &mskAccessTokenProvider{ - ctx: ctx, - generateTokenTimeout: 10 * time.Second, - region: metadata.AWSRegion, - accessKey: metadata.AWSAccessKey, - secretKey: metadata.AWSSecretKey, - sessionToken: metadata.AWSSessionToken, - awsIamRoleArn: metadata.AWSIamRoleArn, - awsStsSessionName: metadata.AWSStsSessionName, - } - - _, err := config.Net.SASL.TokenProvider.Token() - if err != nil { - return fmt.Errorf("error validating iam credentials %v", err) - } - return nil -} - -type mskAccessTokenProvider struct { - ctx context.Context - generateTokenTimeout time.Duration - accessKey string - secretKey string - sessionToken string - awsIamRoleArn string - awsStsSessionName string - region string -} - -func (m *mskAccessTokenProvider) Token() (*sarama.AccessToken, error) { - // this function can't use the context passed on Init because that context would be cancelled right after Init - ctx, cancel := context.WithTimeout(m.ctx, m.generateTokenTimeout) - defer cancel() - - if m.accessKey != "" && m.secretKey != "" { - token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) { - return aws2.Credentials{ - AccessKeyID: m.accessKey, - SecretAccessKey: m.secretKey, - SessionToken: m.sessionToken, - }, nil - })) - return &sarama.AccessToken{Token: token}, err - } else if m.awsIamRoleArn != "" { - token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName) - return &sarama.AccessToken{Token: token}, err - } - - token, _, err := signer.GenerateAuthToken(ctx, m.region) - return &sarama.AccessToken{Token: token}, err -} diff --git a/common/component/kafka/clients.go b/common/component/kafka/clients.go new file mode 100644 index 0000000000..8e8111b7b2 --- /dev/null +++ b/common/component/kafka/clients.go @@ -0,0 +1,64 @@ +package kafka + +import ( + "fmt" + + "github.com/IBM/sarama" + + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" +) + +type clients struct { + consumerGroup sarama.ConsumerGroup + producer sarama.SyncProducer +} + +func (k *Kafka) latestClients() (*clients, error) { + switch { + // case 0: use mock clients for testing + case k.mockProducer != nil || k.mockConsumerGroup != nil: + return &clients{ + consumerGroup: k.mockConsumerGroup, + producer: k.mockProducer, + }, nil + + // case 1: use aws clients with refreshable tokens in the cfg + case k.awsAuthProvider != nil: + awsKafkaOpts := awsAuth.KafkaOptions{ + Config: k.config, + ConsumerGroup: k.consumerGroup, + Brokers: k.brokers, + MaxMessageBytes: k.maxMessageBytes, + } + awsKafkaClients, err := k.awsAuthProvider.Kafka(awsKafkaOpts) + if err != nil { + return nil, fmt.Errorf("failed to get AWS IAM Kafka clients: %w", err) + } + return &clients{ + consumerGroup: awsKafkaClients.ConsumerGroup, + producer: awsKafkaClients.Producer, + 
}, nil + + // case 2: normal static auth profile clients + default: + if k.clients != nil { + return k.clients, nil + } + cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config) + if err != nil { + return nil, err + } + + p, err := GetSyncProducer(*k.config, k.brokers, k.maxMessageBytes) + if err != nil { + return nil, err + } + + newStaticClients := clients{ + consumerGroup: cg, + producer: p, + } + k.clients = &newStaticClients + return k.clients, nil + } +} diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index c4d9989fb2..3e930ec4b2 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -28,6 +28,7 @@ import ( "github.com/linkedin/goavro/v2" "github.com/riferrei/srclient" + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" kitmd "github.com/dapr/kit/metadata" @@ -36,18 +37,23 @@ import ( // Kafka allows reading/writing to a Kafka consumer group. type Kafka struct { - producer sarama.SyncProducer - consumerGroup string - brokers []string - logger logger.Logger - authType string - saslUsername string - saslPassword string - initialOffset int64 - config *sarama.Config - escapeHeaders bool - - cg sarama.ConsumerGroup + // These are used to inject mocked clients for tests + mockConsumerGroup sarama.ConsumerGroup + mockProducer sarama.SyncProducer + clients *clients + + maxMessageBytes int + consumerGroup string + brokers []string + logger logger.Logger + authType string + saslUsername string + saslPassword string + initialOffset int64 + config *sarama.Config + escapeHeaders bool + awsAuthProvider awsAuth.Provider + subscribeTopics TopicHandlerConfig subscribeLock sync.Mutex consumerCancel context.CancelFunc @@ -182,19 +188,32 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { // already handled in updateTLSConfig case awsIAMAuthType: k.logger.Info("Configuring AWS IAM authentication") - err = updateAWSIAMAuthInfo(k.internalContext, config, meta) + kafkaIAM, validateErr := k.ValidateAWS(metadata) + if validateErr != nil { + return fmt.Errorf("failed to validate AWS IAM authentication fields: %w", validateErr) + } + opts := awsAuth.Options{ + Logger: k.logger, + Properties: metadata, + Region: kafkaIAM.Region, + Endpoint: "", + AccessKey: kafkaIAM.AccessKey, + SecretKey: kafkaIAM.SecretKey, + SessionToken: kafkaIAM.SessionToken, + AssumeRoleARN: kafkaIAM.IamRoleArn, + SessionName: kafkaIAM.StsSessionName, + } + var provider awsAuth.Provider + provider, err = awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts)) if err != nil { return err } + k.awsAuthProvider = provider } k.config = config sarama.Logger = SaramaLogBridge{daprLogger: k.logger} - - k.producer, err = getSyncProducer(*k.config, k.brokers, meta.MaxMessageBytes) - if err != nil { - return err - } + k.maxMessageBytes = meta.MaxMessageBytes // Default retry configuration is used if no // backOff properties are set. 
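For context on the hunk that follows: the precedence that ValidateAWS applies between the new fields and the deprecated aws-prefixed ones comes down to the Coalesce helper added in common/authentication/aws/aws.go. Below is a minimal, self-contained Go sketch of that behavior; the helper body is copied from the diff, while the metadata values are invented purely for illustration.

package main

import "fmt"

// Coalesce mirrors the helper added in common/authentication/aws/aws.go:
// it returns the first non-empty string, so the new field names take
// precedence over the deprecated aws-prefixed ones when both are set.
func Coalesce(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

func main() {
	metadata := map[string]string{
		"awsRegion":    "us-west-2", // only the deprecated spelling is set
		"accessKey":    "NEWKEY",    // both spellings set: the new one wins
		"awsAccessKey": "OLDKEY",
	}
	fmt.Println(Coalesce(metadata["region"], metadata["awsRegion"]))       // "us-west-2"
	fmt.Println(Coalesce(metadata["accessKey"], metadata["awsAccessKey"])) // "NEWKEY"
}

Because Coalesce scans left to right, callers list the new field name first, which is what gives it priority during the 1.17 transition.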
@@ -222,29 +241,56 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL } } - k.logger.Debug("Kafka message bus initialization complete") - k.cg, err = sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config) - if err != nil { - return err + clients, err := k.latestClients() + if err != nil || clients == nil { + return fmt.Errorf("failed to get latest Kafka clients for initialization: %w", err) + } + if clients.producer == nil { + return errors.New("component is closed") } + if clients.consumerGroup == nil { + return errors.New("component is closed") + } + + k.logger.Debug("Kafka message bus initialization complete") return nil } +func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) { + const defaultSessionName = "DaprDefaultSession" + // This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead. + region := awsAuth.Coalesce(metadata["region"], metadata["awsRegion"]) + accessKey := awsAuth.Coalesce(metadata["accessKey"], metadata["awsAccessKey"]) + secretKey := awsAuth.Coalesce(metadata["secretKey"], metadata["awsSecretKey"]) + role := awsAuth.Coalesce(metadata["assumeRoleArn"], metadata["awsIamRoleArn"]) + session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"], defaultSessionName) // set default if no value is provided + token := awsAuth.Coalesce(metadata["sessionToken"], metadata["awsSessionToken"]) + + if region == "" { + return nil, errors.New("metadata property AWSRegion is missing") + } + + return &awsAuth.DeprecatedKafkaIAM{ + Region: region, + AccessKey: accessKey, + SecretKey: secretKey, + IamRoleArn: role, + StsSessionName: session, + SessionToken: token, + }, nil +} + func (k *Kafka) Close() error { defer k.wg.Wait() defer k.consumerWG.Wait() - errs := make([]error, 2) + errs := make([]error, 3) if k.closed.CompareAndSwap(false, true) { - close(k.closeCh) - - if k.producer != nil { - errs[0] = k.producer.Close() - k.producer = nil + if k.closeCh != nil { + close(k.closeCh) } - if k.internalContext != nil { k.internalContextCancel() } @@ -255,8 +301,19 @@ func (k *Kafka) Close() error { } k.subscribeLock.Unlock() - if k.cg != nil { - errs[1] = k.cg.Close() + if k.clients != nil { + if k.clients.producer != nil { + errs[0] = k.clients.producer.Close() + k.clients.producer = nil + } + if k.clients.consumerGroup != nil { + errs[1] = k.clients.consumerGroup.Close() + k.clients.consumerGroup = nil + } + } + if k.awsAuthProvider != nil { + errs[2] = k.awsAuthProvider.Close() + k.awsAuthProvider = nil } } diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index a65fb29124..cc381ed5a4 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -3,6 +3,7 @@ package kafka import ( "encoding/binary" "encoding/json" + "errors" "testing" "time" @@ -10,8 +11,10 @@ import ( gomock "github.com/golang/mock/gomock" "github.com/linkedin/goavro/v2" "github.com/riferrei/srclient" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks" "github.com/dapr/kit/logger" ) @@ -351,3 +354,81 @@ func TestLatestSchemaCaching(t *testing.T) { require.NoError(t, err) }) } + +func TestValidateAWS(t *testing.T) { + tests := []struct { + name string + metadata map[string]string 
+ expected *awsAuth.DeprecatedKafkaIAM + err error + }{ + { + name: "Valid metadata with all fields without aws prefix", + metadata: map[string]string{ + "region": "us-east-1", + "accessKey": "testAccessKey", + "secretKey": "testSecretKey", + "assumeRoleArn": "testRoleArn", + "sessionName": "testSessionName", + "sessionToken": "testSessionToken", + }, + expected: &awsAuth.DeprecatedKafkaIAM{ + Region: "us-east-1", + AccessKey: "testAccessKey", + SecretKey: "testSecretKey", + IamRoleArn: "testRoleArn", + StsSessionName: "testSessionName", + SessionToken: "testSessionToken", + }, + err: nil, + }, + { + name: "Fallback to aws-prefixed fields with aws prefix", + metadata: map[string]string{ + "awsRegion": "us-west-2", + "awsAccessKey": "awsAccessKey", + "awsSecretKey": "awsSecretKey", + "awsIamRoleArn": "awsRoleArn", + "awsStsSessionName": "awsSessionName", + "awsSessionToken": "awsSessionToken", + }, + expected: &awsAuth.DeprecatedKafkaIAM{ + Region: "us-west-2", + AccessKey: "awsAccessKey", + SecretKey: "awsSecretKey", + IamRoleArn: "awsRoleArn", + StsSessionName: "awsSessionName", + SessionToken: "awsSessionToken", + }, + err: nil, + }, + { + name: "Missing region field", + metadata: map[string]string{ + "accessKey": "key", + "secretKey": "secret", + }, + expected: nil, + err: errors.New("metadata property AWSRegion is missing"), + }, + { + name: "Empty metadata", + metadata: map[string]string{}, + expected: nil, + err: errors.New("metadata property AWSRegion is missing"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := &Kafka{} + result, err := k.ValidateAWS(tt.metadata) + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index 5f0011be9f..7122feb813 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -97,14 +97,7 @@ type KafkaMetadata struct { ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"` ClientConnectionKeepAliveInterval time.Duration `mapstructure:"clientConnectionKeepAliveInterval"` - // aws iam auth profile - AWSAccessKey string `mapstructure:"awsAccessKey"` - AWSSecretKey string `mapstructure:"awsSecretKey"` - AWSSessionToken string `mapstructure:"awsSessionToken"` - AWSIamRoleArn string `mapstructure:"awsIamRoleArn"` - AWSStsSessionName string `mapstructure:"awsStsSessionName"` - AWSRegion string `mapstructure:"awsRegion"` - channelBufferSize int `mapstructure:"-"` + channelBufferSize int `mapstructure:"-"` consumerFetchMin int32 `mapstructure:"-"` consumerFetchDefault int32 `mapstructure:"-"` @@ -267,9 +260,6 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) } k.logger.Debug("Configuring root certificate authentication.") case awsIAMAuthType: - if m.AWSRegion == "" { - return nil, errors.New("missing AWS region property 'awsRegion' for authType 'awsiam'") - } k.logger.Debug("Configuring AWS IAM authentication.") default: return nil, errors.New("kafka error: invalid value for 'authType' attribute") diff --git a/common/component/kafka/metadata_test.go b/common/component/kafka/metadata_test.go index e53b1721d8..acb6eb1208 100644 --- a/common/component/kafka/metadata_test.go +++ b/common/component/kafka/metadata_test.go @@ -376,20 +376,6 @@ func TestTls(t *testing.T) { }) } -func TestAwsIam(t *testing.T) { - k := 
getKafka() - - t.Run("missing aws region", func(t *testing.T) { - m := getBaseMetadata() - m[authType] = awsIAMAuthType - meta, err := k.getKafkaMetadata(m) - require.Error(t, err) - require.Nil(t, meta) - - require.Equal(t, "missing AWS region property 'awsRegion' for authType 'awsiam'", err.Error()) - }) -} - func TestMetadataConsumerFetchValues(t *testing.T) { k := getKafka() m := getCompleteMetadata() diff --git a/common/component/kafka/producer.go b/common/component/kafka/producer.go index 0083cb86a8..97e5a6bbed 100644 --- a/common/component/kafka/producer.go +++ b/common/component/kafka/producer.go @@ -16,6 +16,7 @@ package kafka import ( "context" "errors" + "fmt" "maps" "github.com/IBM/sarama" @@ -23,7 +24,7 @@ import ( "github.com/dapr/components-contrib/pubsub" ) -func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) { +func GetSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) { // Add SyncProducer specific properties to copy of base config config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 @@ -33,7 +34,12 @@ func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int config.Producer.MaxMessageBytes = maxMessageBytes } - producer, err := sarama.NewSyncProducer(brokers, &config) + saramaClient, err := sarama.NewClient(brokers, &config) + if err != nil { + return nil, err + } + + producer, err := sarama.NewSyncProducerFromClient(saramaClient) if err != nil { return nil, err } @@ -43,9 +49,14 @@ func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int // Publish message to Kafka cluster. func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error { - if k.producer == nil { + clients, err := k.latestClients() + if err != nil || clients == nil { + return fmt.Errorf("failed to get latest Kafka clients: %w", err) + } + if clients.producer == nil { return errors.New("component is closed") } + // k.logger.Debugf("Publishing topic %v with data: %v", topic, string(data)) k.logger.Debugf("Publishing on topic %v", topic) @@ -73,7 +84,7 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m }) } - partition, offset, err := k.producer.SendMessage(msg) + partition, offset, err := clients.producer.SendMessage(msg) k.logger.Debugf("Partition: %v, offset: %v", partition, offset) @@ -85,7 +96,12 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m } func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) { - if k.producer == nil { + clients, err := k.latestClients() + if err != nil || clients == nil { + err = fmt.Errorf("failed to get latest Kafka clients: %w", err) + return pubsub.NewBulkPublishResponse(entries, err), err + } + if clients.producer == nil { err := errors.New("component is closed") return pubsub.NewBulkPublishResponse(entries, err), err } @@ -134,7 +150,7 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu msgs = append(msgs, msg) } - if err := k.producer.SendMessages(msgs); err != nil { + if err := clients.producer.SendMessages(msgs); err != nil { // map the returned error to different entries return k.mapKafkaProducerErrors(err, entries), err } diff --git a/common/component/kafka/producer_test.go b/common/component/kafka/producer_test.go index 3dd1b75a9e..a0769767a0 
100644
--- a/common/component/kafka/producer_test.go
+++ b/common/component/kafka/producer_test.go
@@ -13,17 +13,15 @@ import (
 )
 func arrangeKafkaWithAssertions(t *testing.T, msgCheckers ...saramamocks.MessageChecker) *Kafka {
-	cfg := saramamocks.NewTestConfig()
-	mockProducer := saramamocks.NewSyncProducer(t, cfg)
+	mockP := saramamocks.NewSyncProducer(t, saramamocks.NewTestConfig())
 	for _, msgChecker := range msgCheckers {
-		mockProducer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(msgChecker)
+		mockP.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(msgChecker)
 	}
 	return &Kafka{
-		producer: mockProducer,
-		config:   cfg,
-		logger:   logger.NewLogger("kafka_test"),
+		mockProducer: mockP,
+		logger:       logger.NewLogger("kafka_test"),
 	}
 }
diff --git a/common/component/kafka/subscriber.go b/common/component/kafka/subscriber.go
index 95bdd5a232..4e1dc7ae8f 100644
--- a/common/component/kafka/subscriber.go
+++ b/common/component/kafka/subscriber.go
@@ -84,7 +84,14 @@ func (k *Kafka) reloadConsumerGroup() {
 func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer) {
 	for {
-		err := k.cg.Consume(ctx, topics, consumer)
+		clients, err := k.latestClients()
+		if err != nil || clients == nil {
+			k.logger.Errorf("failed to get latest Kafka clients: %v", err)
+			return
+		}
+		if clients.consumerGroup == nil {
+			k.logger.Errorf("component is closed")
+			return
+		}
+		err = clients.consumerGroup.Consume(ctx, topics, consumer)
 		if errors.Is(err, context.Canceled) {
 			return
 		}
diff --git a/common/component/kafka/subscriber_test.go b/common/component/kafka/subscriber_test.go
index 57b87cf4f2..dbfc696341 100644
--- a/common/component/kafka/subscriber_test.go
+++ b/common/component/kafka/subscriber_test.go
@@ -41,11 +41,11 @@ func Test_reloadConsumerGroup(t *testing.T) {
 	})
 	k := &Kafka{
-		logger: logger.NewLogger("test"),
-		cg: cg,
-		subscribeTopics: nil,
-		closeCh: make(chan struct{}),
-		consumerCancel: cancel,
+		logger: logger.NewLogger("test"),
+		mockConsumerGroup: cg,
+		subscribeTopics: nil,
+		closeCh: make(chan struct{}),
+		consumerCancel: cancel,
 	}
 	k.reloadConsumerGroup()
@@ -64,11 +64,11 @@ func Test_reloadConsumerGroup(t *testing.T) {
 		return nil
 	})
 	k := &Kafka{
-		logger: logger.NewLogger("test"),
-		cg: cg,
-		consumerCancel: cancel,
-		closeCh: make(chan struct{}),
-		subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}},
+		logger: logger.NewLogger("test"),
+		mockConsumerGroup: cg,
+		consumerCancel: cancel,
+		closeCh: make(chan struct{}),
+		subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}},
 	}
 	k.closed.Store(true)
@@ -89,11 +89,11 @@ func Test_reloadConsumerGroup(t *testing.T) {
 		return nil
 	})
 	k := &Kafka{
-		logger: logger.NewLogger("test"),
-		cg: cg,
-		consumerCancel: nil,
-		closeCh: make(chan struct{}),
-		subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}},
+		logger: logger.NewLogger("test"),
+		mockConsumerGroup: cg,
+		consumerCancel: nil,
+		closeCh: make(chan struct{}),
+		subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}},
 	}
 	k.reloadConsumerGroup()
@@ -114,7 +114,7 @@ func Test_reloadConsumerGroup(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}},
@@ -146,7 +146,7 @@ func Test_reloadConsumerGroup(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
		subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
@@ -174,7 +174,7 @@ func Test_reloadConsumerGroup(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
@@ -210,7 +210,7 @@ func Test_reloadConsumerGroup(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
@@ -248,7 +248,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		consumeRetryInterval: time.Millisecond,
@@ -273,7 +273,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		consumeRetryInterval: time.Millisecond,
@@ -302,7 +302,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		consumeRetryInterval: time.Millisecond,
@@ -340,7 +340,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		consumeRetryInterval: time.Millisecond,
@@ -391,7 +391,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: make(TopicHandlerConfig),
@@ -421,7 +421,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: make(TopicHandlerConfig),
@@ -495,7 +495,7 @@ func Test_Subscribe(t *testing.T) {
 	})
 	k := &Kafka{
 		logger: logger.NewLogger("test"),
-		cg: cg,
+		mockConsumerGroup: cg,
 		consumerCancel: nil,
 		closeCh: make(chan struct{}),
 		subscribeTopics: make(TopicHandlerConfig),
diff --git a/pubsub/aws/snssqs/metadata.go b/pubsub/aws/snssqs/metadata.go
index 5ffd575b4c..f79ed207af 100644
--- a/pubsub/aws/snssqs/metadata.go
+++ b/pubsub/aws/snssqs/metadata.go
@@ -22,6 +22,7 @@ type snsSqsMetadata struct {
 	// aws endpoint for the component to use.
 	Endpoint string `mapstructure:"endpoint"`
 	// aws region in which SNS/SQS should create resources.
+	// TODO: rm the alias on region in Dapr 1.17.
 	Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"`
 	// aws partition in which SNS/SQS should create resources.
 	internalPartition string `mapstructure:"-"`
diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml
index b6536c2b43..9fd66e9444 100644
--- a/pubsub/kafka/metadata.yaml
+++ b/pubsub/kafka/metadata.yaml
@@ -8,6 +8,75 @@ title: "Apache Kafka"
 urls:
   - title: Reference
     url: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/
+# This auth profile has duplicate fields intentionally as we maintain backwards compatibility,
+# but also move Kafka to utilize the normalized AWS fields in the builtin auth profiles.
+# TODO: rm the duplicate aws prefixed fields in Dapr 1.17.
+builtinAuthenticationProfiles:
+  - name: "aws"
+    metadata:
+      - name: authType
+        type: string
+        required: true
+        description: |
+          Authentication type.
+          This must be set to "awsiam" for this authentication profile.
+        example: '"awsiam"'
+        allowedValues:
+          - "awsiam"
+      - name: awsRegion
+        type: string
+        required: false
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'region' instead.
+          The AWS Region where the MSK Kafka broker is deployed to.
+        example: '"us-east-1"'
+      - name: awsAccessKey
+        type: string
+        required: false
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'accessKey' instead.
+          If both fields are set, then 'accessKey' value will be used.
+          AWS access key associated with an IAM account.
+        example: '"AKIAIOSFODNN7EXAMPLE"'
+      - name: awsSecretKey
+        type: string
+        required: false
+        sensitive: true
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'secretKey' instead.
+          If both fields are set, then 'secretKey' value will be used.
+          The secret key associated with the access key.
+        example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
+      - name: awsSessionToken
+        type: string
+        sensitive: true
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead.
+          If both fields are set, then 'sessionToken' value will be used.
+          AWS session token to use. A session token is only required if you are using temporary security credentials.
+        example: '"TOKEN"'
+      - name: awsIamRoleArn
+        type: string
+        required: false
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead.
+          If both fields are set, then 'assumeRoleArn' value will be used.
+          IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
+        example: '"arn:aws:iam::123456789:role/mskRole"'
+      - name: awsStsSessionName
+        type: string
+        description: |
+          This maintains backwards compatibility with existing fields.
+          It will be deprecated as of Dapr 1.17. Use 'sessionName' instead.
+          If both fields are set, then 'sessionName' value will be used.
+          Represents the session name for assuming a role.
+        example: '"MyAppSession"'
+        default: '"MSKSASLDefaultSession"'
 authenticationProfiles:
   - title: "OIDC Authentication"
     description: |
@@ -133,55 +202,6 @@ authenticationProfiles:
       example: '"none"'
       allowedValues:
         - "none"
-  - title: "AWS IAM"
-    description: "Authenticate using AWS IAM credentials or role for AWS MSK"
-    metadata:
-      - name: authType
-        type: string
-        required: true
-        description: |
-          Authentication type.
-          This must be set to "awsiam" for this authentication profile.
-        example: '"awsiam"'
-        allowedValues:
-          - "awsiam"
-      - name: awsRegion
-        type: string
-        required: true
-        description: |
-          The AWS Region where the MSK Kafka broker is deployed to.
-        example: '"us-east-1"'
-      - name: awsAccessKey
-        type: string
-        required: true
-        description: |
-          AWS access key associated with an IAM account.
-        example: '"AKIAIOSFODNN7EXAMPLE"'
-      - name: awsSecretKey
-        type: string
-        required: true
-        sensitive: true
-        description: |
-          The secret key associated with the access key.
- example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' - - name: awsSessionToken - type: string - sensitive: true - description: | - AWS session token to use. A session token is only required if you are using temporary security credentials. - example: '"TOKEN"' - - name: awsIamRoleArn - type: string - required: true - description: | - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. - example: '"arn:aws:iam::123456789:role/mskRole"' - - name: awsStsSessionName - type: string - description: | - Represents the session name for assuming a role. - example: '"MyAppSession"' - default: '"MSKSASLDefaultSession"' metadata: - name: brokers type: string diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index d3bbd39a85..1ce4fd6de9 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -53,6 +53,7 @@ type dynamoDBMetadata struct { SecretKey string `json:"secretKey" mapstructure:"secretKey" mdignore:"true"` SessionToken string `json:"sessionToken" mapstructure:"sessionToken" mdignore:"true"` + // TODO: rm the alias in Dapr 1.17 Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"` Endpoint string `json:"endpoint"` Table string `json:"table"`
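For reference, the credential precedence implemented by mskTokenProvider.Token() in common/authentication/aws/client.go above can be summarized in a standalone Go sketch: assume-role first, then static keys, then the default AWS credential chain. The creds struct and resolveMSKToken function are hypothetical stand-ins for the unexported provider; the signer calls are the same ones used in the diff.

package awsmsk

import (
	"context"
	"time"

	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
	aws2 "github.com/aws/aws-sdk-go-v2/aws"
)

// creds mirrors the fields of the unexported mskTokenProvider; the
// struct and function names here are illustrative only.
type creds struct {
	region, accessKey, secretKey, sessionToken string
	roleArn, sessionName                       string
}

// resolveMSKToken reproduces the three-way precedence used by
// mskTokenProvider.Token(): assume-role first, then static keys,
// then the default AWS credential chain.
func resolveMSKToken(c creds) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	switch {
	case c.roleArn != "" && c.sessionName != "":
		token, _, err := signer.GenerateAuthTokenFromRole(ctx, c.region, c.roleArn, c.sessionName)
		return token, err
	case c.accessKey != "" && c.secretKey != "":
		token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, c.region,
			aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) {
				return aws2.Credentials{
					AccessKeyID:     c.accessKey,
					SecretAccessKey: c.secretKey,
					SessionToken:    c.sessionToken,
				}, nil
			}))
		return token, err
	default:
		token, _, err := signer.GenerateAuthToken(ctx, c.region)
		return token, err
	}
}

The assume-role branch is checked first deliberately: as the comment in the diff notes, a user supplying assumeRoleArn and sessionName may also have ambient static credentials available, and the role must take precedence.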