Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Streams: support sarama client with IAM token #5822

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ module github.com/IBM-Cloud/terraform-provider-ibm

go 1.22.4

toolchain go1.22.5

require (
github.com/IBM-Cloud/bluemix-go v0.0.0-20241117121028-a3be206688b3
github.com/IBM-Cloud/container-services-go-sdk v0.0.0-20240725064144-454a2ae23113
Expand Down Expand Up @@ -34,7 +32,7 @@ require (
github.com/IBM/platform-services-go-sdk v0.70.0
github.com/IBM/project-go-sdk v0.3.5
github.com/IBM/push-notifications-go-sdk v0.0.0-20210310100607-5790b96c47f5
github.com/IBM/sarama v1.41.2
github.com/IBM/sarama v1.43.3
github.com/IBM/scc-go-sdk/v5 v5.4.1
github.com/IBM/schematics-go-sdk v0.3.0
github.com/IBM/secrets-manager-go-sdk/v2 v2.0.7
Expand Down Expand Up @@ -95,7 +93,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.0 // indirect
Expand Down Expand Up @@ -166,7 +164,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kube-object-storage/lib-bucket-provisioner v0.0.0-20221122204822-d1a8c34382f1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/libopenstorage/secrets v0.0.0-20220823020833-2ecadaf59d8a // indirect
Expand All @@ -190,7 +188,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ github.com/IBM/project-go-sdk v0.3.5 h1:L+YClFUa14foS0B/hOOY9n7sIdsT5/XQicnXOyJS
github.com/IBM/project-go-sdk v0.3.5/go.mod h1:FOJM9ihQV3EEAY6YigcWiTNfVCThtdY8bLC/nhQHFvo=
github.com/IBM/push-notifications-go-sdk v0.0.0-20210310100607-5790b96c47f5 h1:NPUhkoOCRuv3OFWt19PmwjXGGTKlvmbuPg9fUrBUNe4=
github.com/IBM/push-notifications-go-sdk v0.0.0-20210310100607-5790b96c47f5/go.mod h1:b07XHUVh0XYnQE9s2mqgjYST1h9buaQNqN4EcKhOsX0=
github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c=
github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
github.com/IBM/scc-go-sdk/v5 v5.4.1 h1:RXIuxOo9/hxkWyHCI69ae+KIJgSbXcAkJwTEl+fO3LQ=
github.com/IBM/scc-go-sdk/v5 v5.4.1/go.mod h1:2xQTDgNXG5QMEfQxBDKB067z+5ha6OgcaKCTcdGDAo8=
github.com/IBM/schematics-go-sdk v0.3.0 h1:Vwxw85SONflakiBsNHAfViKLyp9zJiH5/hh6SewOP5Q=
Expand Down Expand Up @@ -426,8 +426,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
Expand Down Expand Up @@ -1161,8 +1161,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -1435,8 +1435,8 @@ github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pires/go-proxyproto v0.6.1 h1:EBupykFmo22SDjv4fQVQd2J9NOoLPmyZA/15ldOGkPw=
github.com/pires/go-proxyproto v0.6.1/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
Expand Down
77 changes: 53 additions & 24 deletions ibm/service/eventstreams/resource_ibm_event_streams_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ package eventstreams

import (
"context"
"errors"
"fmt"
"log"
"slices"
"strings"
"time"

"github.com/IBM-Cloud/bluemix-go/session"
"github.com/IBM-Cloud/terraform-provider-ibm/ibm/conns"
"github.com/IBM-Cloud/terraform-provider-ibm/ibm/flex"
"github.com/IBM-Cloud/terraform-provider-ibm/version"
"github.com/IBM/sarama"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand All @@ -27,7 +30,6 @@ const (
)

var (
brokerVersion = sarama.V3_3_0_0
adminClientTimeout = 30 * time.Second
allowedTopicConfigs = []string{
"cleanup.policy",
Expand All @@ -38,12 +40,6 @@ var (
"segment.index.bytes",
"message.audit.enable", // enterprise only
}
defaultConfigs = map[string]interface{}{
"cleanup.policy": defaultCleanupPolicy,
"retention.ms": defaultRetentionMs,
"retention.bytes": defaultRetentionBytes,
"segment.bytes": defaultSegmentBytes,
}
)

func ResourceIBMEventStreamsTopic() *schema.Resource {
Expand Down Expand Up @@ -272,15 +268,6 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
log.Printf("[DEBUG] createSaramaAdminClient BluemixSession err %s", err)
return nil, "", err
}
apiKey := bxSession.Config.BluemixAPIKey
if len(apiKey) == 0 {
log.Printf("[DEBUG] createSaramaAdminClient BluemixAPIKey is empty")
return nil, "", fmt.Errorf("failed to get IBM cloud API key")
}
if err != nil {
log.Printf("[DEBUG] createSaramaAdminClient ResourceControllerAPI err %s", err)
return nil, "", err
}
instanceCRN := d.Get("resource_instance_id").(string)
if len(instanceCRN) == 0 {
topicID := d.Id()
Expand All @@ -290,6 +277,12 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
}
instanceCRN = getInstanceCRN(topicID)
}
var adminClient sarama.ClusterAdmin
var ok bool
if adminClient, ok = clientPool[instanceCRN]; ok {
log.Printf("[DEBUG] createSaramaAdminClient got client from pool for instance %s", instanceCRN)
return adminClient, instanceCRN, nil
}
instance, err := getInstanceDetails(instanceCRN, meta)
if err != nil {
return nil, "", err
Expand All @@ -301,20 +294,30 @@ func createSaramaAdminClient(d *schema.ResourceData, meta interface{}) (sarama.C
slices.Sort(brokerAddress)
d.Set("kafka_brokers_sasl", brokerAddress)
log.Printf("[INFO] createSaramaAdminClient kafka_brokers_sasl is set to %s", brokerAddress)
tenantID := strings.TrimPrefix(strings.Split(adminURL, ".")[0], "https://")

config := sarama.NewConfig()
config.ClientID = "terraform-provider-ibm"
config.ClientID = fmt.Sprintf("terraform-provider-ibm/%s", version.Version)
config.Net.SASL.Enable = true
config.Net.TLS.Enable = true
config.Version = sarama.MaxVersion
tenantID := strings.TrimPrefix(strings.Split(adminURL, ".")[0], "https://")
if tenantID != "" && tenantID != "admin" {
config.Net.SASL.AuthIdentity = tenantID
} else {
config.Net.SASL.AuthIdentity = instanceCRN
}
config.Net.SASL.User = "token"
config.Net.SASL.Password = apiKey
config.Net.TLS.Enable = true
config.Version = brokerVersion
config.Admin.Timeout = adminClientTimeout
adminClient, err := sarama.NewClusterAdmin(brokerAddress, config)
_, err = validateToken(bxSession.Config.IAMAccessToken)
if err == nil {
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = accessTokenProvider{clientSession: bxSession}
log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=OAUTHBEARER")
} else {
config.Net.SASL.User = "token"
config.Net.SASL.Password = bxSession.Config.BluemixAPIKey
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
log.Printf("[DEBUG] createSaramaAdminClient configured SASL mechanism=PLAIN")
}
adminClient, err = sarama.NewClusterAdmin(brokerAddress, config)
if err != nil {
log.Printf("[DEBUG] createSaramaAdminClient NewClusterAdmin err %s", err)
return nil, "", err
Expand Down Expand Up @@ -362,3 +365,29 @@ func getInstanceCRN(topicID string) string {
crnSegments[9] = ""
return strings.Join(crnSegments, ":")
}

type accessTokenProvider struct {
clientSession *session.Session
}

// Token() implements sarama.AccessTokenProvider interface for sasl.mechanism=OAUTHBEARER
func (tp accessTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := validateToken(tp.clientSession.Config.IAMAccessToken)
if err != nil {
log.Printf("[DEBUG] accessTokenProvider.Token() error:%s", err)
return nil, err
}
return &sarama.AccessToken{Token: token}, nil
}

func validateToken(token string) (string, error) {
if len(token) == 0 {
return "", errors.New("IAMAccessToken is required")
}
token = strings.TrimPrefix(token, "Bearer")
token = strings.Trim(token, " ")
if len(strings.Split(token, ".")) != 3 {
return "", errors.New("IAMAccessToken is malformed")
}
return token, nil
}
Loading