diff --git a/pubsub/azuresb/azuresb.go b/pubsub/azuresb/azuresb.go index 140bae09c8..b1d7095472 100644 --- a/pubsub/azuresb/azuresb.go +++ b/pubsub/azuresb/azuresb.go @@ -21,7 +21,7 @@ // For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers // for the scheme "azuresb". // The default URL opener will use a Service Bus Connection String based on -// the environment variable "SERVICEBUS_CONNECTION_STRING". +// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. SERVICEBUS_CONNECTION_STRING takes precedence. // To customize the URL opener, or for more details on the URL format, // see URLOpener. // See https://gocloud.dev/concepts/urls/ for background information. @@ -65,6 +65,7 @@ import ( "time" common "github.com/Azure/azure-amqp-common-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/go-amqp" "gocloud.dev/gcerrors" @@ -99,7 +100,8 @@ func init() { } // defaultURLOpener creates an URLOpener with ConnectionString initialized from -// the environment variable SERVICEBUS_CONNECTION_STRING. +// AZURE_SERVICEBUS_HOSTNAME or SERVICEBUS_CONNECTION_STRING environment variables. +// SERVICEBUS_CONNECTION_STRING takes precedence. type defaultOpener struct { init sync.Once opener *URLOpener @@ -109,11 +111,12 @@ type defaultOpener struct { func (o *defaultOpener) defaultOpener() (*URLOpener, error) { o.init.Do(func() { cs := os.Getenv("SERVICEBUS_CONNECTION_STRING") - if cs == "" { - o.err = errors.New("SERVICEBUS_CONNECTION_STRING environment variable not set") + sbHostname := os.Getenv("AZURE_SERVICEBUS_HOSTNAME") + if cs == "" && sbHostname == "" { + o.err = errors.New("SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME environment variables not set") return } - o.opener = &URLOpener{ConnectionString: cs} + o.opener = &URLOpener{ConnectionString: cs, ServiceBusHostname: sbHostname} }) return o.opener, o.err } @@ -147,10 +150,14 @@ const Scheme = "azuresb" // // No other query parameters are supported. type URLOpener struct { - // ConnectionString is the Service Bus connection string (required). + // ConnectionString is the Service Bus connection string (required if ServiceBusHostname is not defined). // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues ConnectionString string + // Azure ServiceBus hostname + // https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash + ServiceBusHostname string + // ClientOptions are options when creating the Client. ServiceBusClientOptions *servicebus.ClientOptions @@ -165,14 +172,29 @@ type URLOpener struct { } func (o *URLOpener) sbClient(kind string, u *url.URL) (*servicebus.Client, error) { - if o.ConnectionString == "" { - return nil, fmt.Errorf("open %s %v: ConnectionString is required", kind, u) + if o.ConnectionString == "" && o.ServiceBusHostname == "" { + return nil, fmt.Errorf("open %s %v: ConnectionString or ServiceBusHostname is required", kind, u) } - client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions) - if err != nil { - return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err) + + // auth using shared key (old method) + // ConnectionString approach takes presendence + if o.ConnectionString != "" { + client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions) + if err != nil { + return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err) + } + return client, nil } - return client, nil + + // auth using Azure AAD Workload Identity/AAD Pod Identities/AKS Kubelet Identity/Service Principal + if o.ServiceBusHostname != "" { + client, err := NewClientFromServiceBusHostname(o.ServiceBusHostname, o.ServiceBusClientOptions) + if err != nil { + return nil, fmt.Errorf("open %s %v: invalid service bus hostname %q: %v", kind, u, o.ServiceBusHostname, err) + } + return client, nil + } + return nil, fmt.Errorf("open %s: please set ServiceBusHostname or ConnectionString", kind) } // OpenTopicURL opens a pubsub.Topic based on u. @@ -234,12 +256,27 @@ type TopicOptions struct { BatcherOptions batcher.Options } -// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string. +// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth) // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error) { return servicebus.NewClientFromConnectionString(connectionString, opts) } +// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.(using shared key for auth) +// for example you can use workload identity autorization. +// https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-go-how-to-use-queues?tabs=bash +func NewClientFromServiceBusHostname(serviceBusHostname string, opts *servicebus.ClientOptions) (*servicebus.Client, error) { + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, err + } + client, err := servicebus.NewClient(serviceBusHostname, cred, opts) + if err != nil { + return nil, err + } + return client, nil +} + // NewSender returns a *servicebus.Sender associated with a Service Bus Client. func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error) { return sbClient.NewSender(topicName, opts) diff --git a/pubsub/azuresb/azuresb_test.go b/pubsub/azuresb/azuresb_test.go index 491fa5e8ec..65d9d3112d 100644 --- a/pubsub/azuresb/azuresb_test.go +++ b/pubsub/azuresb/azuresb_test.go @@ -26,6 +26,7 @@ import ( "gocloud.dev/pubsub/driver" "gocloud.dev/pubsub/drivertest" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" ) @@ -34,6 +35,7 @@ var ( // See docs below on how to provision an Azure Service Bus Namespace and obtaining the connection string. // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues connString = os.Getenv("SERVICEBUS_CONNECTION_STRING") + sbHostname = os.Getenv("AZURE_SERVICEBUS_HOSTNAME") ) const ( @@ -56,8 +58,8 @@ type harness struct { } func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) { - if connString == "" { - return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING to run") + if connString == "" && sbHostname == "" { + return nil, fmt.Errorf("azuresb: test harness requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run") } adminClient, err := admin.NewClientFromConnectionString(connString, nil) if err != nil { @@ -310,6 +312,13 @@ func deleteSubscription(ctx context.Context, topicName string, subscriptionName return nil } +// to run test using Azure Entra credentials: +// 1. grant access to ${AZURE_CLIENT_ID} to Service Bus namespace +// 2. run test: +// AZURE_CLIENT_SECRET='secret' \ +// AZURE_CLIENT_ID=client_id_uud \ +// AZURE_TENANT_ID=tenant_id_uuid \ +// AZURE_SERVICEBUS_HOSTNAME=hostname go test -benchmem -run=^$ -bench ^BenchmarkAzureServiceBusPubSub$ gocloud.dev/pubsub/azuresb func BenchmarkAzureServiceBusPubSub(b *testing.B) { const ( benchmarkTopicName = "benchmark-topic" @@ -317,16 +326,36 @@ func BenchmarkAzureServiceBusPubSub(b *testing.B) { ) ctx := context.Background() - if connString == "" { - b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING to run") - } - adminClient, err := admin.NewClientFromConnectionString(connString, nil) - if err != nil { - b.Fatal(err) + var adminClient *admin.Client + var sbClient *servicebus.Client + var err error + + if connString == "" && sbHostname == "" { + b.Fatal("azuresb: benchmark requires environment variable SERVICEBUS_CONNECTION_STRING or AZURE_SERVICEBUS_HOSTNAME to run") } - sbClient, err := NewClientFromConnectionString(connString, nil) - if err != nil { - b.Fatal(err) + + if connString != "" { + adminClient, err = admin.NewClientFromConnectionString(connString, nil) + if err != nil { + b.Fatal(err) + } + sbClient, err = NewClientFromConnectionString(connString, nil) + if err != nil { + b.Fatal(err) + } + } else if sbHostname != "" { + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + b.Fatal(err) + } + adminClient, err = admin.NewClient(sbHostname, cred, nil) + if err != nil { + b.Fatal(err) + } + sbClient, err = NewClientFromServiceBusHostname(sbHostname, nil) + if err != nil { + b.Fatal(err) + } } // Make topic.