Skip to content

Commit

Permalink
create a Hub interface
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Aug 13, 2022
1 parent d88c0a0 commit 828a0c8
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 83 deletions.
2 changes: 1 addition & 1 deletion _examples/helloworld/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
}
}

func initHub() (*eventhub.Hub, []string) {
func initHub() (eventhub.Hub, []string) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
hubMgmt, err := ensureEventHub(context.Background(), HubName)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion _examples/helloworld/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
}
}

func initHub() (*eventhub.Hub, []string) {
func initHub() (eventhub.Hub, []string) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
hubMgmt, err := ensureEventHub(context.Background(), HubName)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## Unreleased

- Move Hub to an interface (#272)

## `v3.3.18`

- Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253)
Expand Down
2 changes: 1 addition & 1 deletion eph/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type (
name string
consumerGroup string
tokenProvider auth.TokenProvider
client *eventhub.Hub
client eventhub.Hub
leaser Leaser
checkpointer Checkpointer
scheduler *scheduler
Expand Down
4 changes: 2 additions & 2 deletions eph/eph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ func (s *testSuite) newInMemoryEPHWithOptions(hubName string, store *sharedStore
return processor, nil
}

func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) *eventhub.Hub {
func (s *testSuite) newClient(t *testing.T, hubName string, opts ...eventhub.HubOption) eventhub.Hub {
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars(), aad.JWTProviderWithAzureEnvironment(&s.Env))
if err != nil {
t.Fatal(err)
}
return s.newClientWithProvider(t, hubName, provider, opts...)
}

func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) *eventhub.Hub {
func (s *testSuite) newClientWithProvider(t *testing.T, hubName string, provider auth.TokenProvider, opts ...eventhub.HubOption) eventhub.Hub {
opts = append(opts, eventhub.HubWithEnvironment(s.Env))
client, err := eventhub.NewHub(s.Namespace, hubName, provider, opts...)
if err != nil {
Expand Down
133 changes: 95 additions & 38 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,39 @@ const (

type (
// Hub provides the ability to send and receive Event Hub messages
Hub struct {
Hub interface {
// GetRuntimeInformation fetches runtime information from the Event Hub management node
GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
// GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
// Close drains and closes all of the existing senders, receivers and connections
Close(ctx context.Context) error
// Receive subscribes for messages sent to the provided entityPath.
//
// The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive
// method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.
//
// If Receive encounters an initial error setting up the connection, an error will be returned.
//
// If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes
// methods which will help manage the life span of the receiver.
//
// # ListenerHandle.Close(ctx) closes the receiver
//
// # ListenerHandle.Done() signals the consumer when the receiver has stopped
//
// ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)
// Send sends an event to the Event Hub
//
// Send will retry sending the message for as long as the context allows
Send(ctx context.Context, event *Event, opts ...SendOption) error
// SendBatch sends a batch of events to the Hub
SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error
}

// Hub provides the ability to send and receive Event Hub messages
hubImpl struct {
name string
namespace *namespace
receivers map[string]*receiver
Expand Down Expand Up @@ -92,7 +124,7 @@ type (

// HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see
// HubManagementOption.
HubOption func(h *Hub) error
HubOption func(h Hub) error

// HubManager provides CRUD functionality for Event Hubs
HubManager struct {
Expand Down Expand Up @@ -336,7 +368,7 @@ func hubEntryToEntity(entry *hubEntry) *HubEntity {
// NOTE: If the AZURE_ENVIRONMENT variable is set, it will be used to set the ServiceBusEndpointSuffix
// from the corresponding azure.Environment type at the end of the namespace host string. The default
// is azure.PublicCloud.
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error) {
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (Hub, error) {
env := azure.PublicCloud
if e := os.Getenv("AZURE_ENVIRONMENT"); e != "" {
var err error
Expand All @@ -350,7 +382,7 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
return nil, err
}

h := &Hub{
h := &hubImpl{
name: name,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
Expand Down Expand Up @@ -385,10 +417,8 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
//
// 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
// 2. client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
Expand All @@ -397,9 +427,8 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP
// and port. See: adal.GetMSIVMEndpoint()
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error) {
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (Hub, error) {
var provider auth.TokenProvider
provider, sasErr := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
if sasErr == nil {
Expand All @@ -420,7 +449,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_NAME" the name of the Event Hub instance
//
//
// This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from
// environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither
// can be built, it will return error.
Expand All @@ -437,7 +465,6 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
// 1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
Expand All @@ -449,7 +476,7 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.
func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) {
func NewHubFromEnvironment(opts ...HubOption) (Hub, error) {
const envErrMsg = "environment var %s must not be empty"
var namespace, name string

Expand All @@ -467,8 +494,38 @@ func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) {
// NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string
// formatted like the following:
//
// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error) {
// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
func NewHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) {
parsed, err := conn.ParsedConnectionFromStr(connStr)
if err != nil {
return nil, err
}

ns, err := newNamespace(namespaceWithConnectionString(connStr))
if err != nil {
return nil, err
}

h := &hubImpl{
name: parsed.HubName,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
userAgent: rootUserAgent,
receivers: make(map[string]*receiver),
senderRetryOptions: newSenderRetryOptions(),
}

for _, opt := range opts {
err := opt(h)
if err != nil {
return nil, err
}
}

return h, err
}

func NewMockHubFromConnectionString(connStr string, opts ...HubOption) (Hub, error) {
parsed, err := conn.ParsedConnectionFromStr(connStr)
if err != nil {
return nil, err
Expand All @@ -479,7 +536,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
return nil, err
}

h := &Hub{
h := &hubImpl{
name: parsed.HubName,
namespace: ns,
offsetPersister: persist.NewMemoryPersister(),
Expand All @@ -499,7 +556,7 @@ func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
}

// GetRuntimeInformation fetches runtime information from the Event Hub management node
func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) {
func (h *hubImpl) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation")
defer span.End()
client := newClient(h.namespace, h.name)
Expand All @@ -525,7 +582,7 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
}

// GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) {
func (h *hubImpl) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
defer span.End()
client := newClient(h.namespace, h.name)
Expand All @@ -550,7 +607,7 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
}

// Close drains and closes all of the existing senders, receivers and connections
func (h *Hub) Close(ctx context.Context) error {
func (h *hubImpl) Close(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Close")
defer span.End()

Expand Down Expand Up @@ -582,7 +639,7 @@ func (h *Hub) Close(ctx context.Context) error {
}

// closeReceivers will close the receivers on the hub and return the last error
func (h *Hub) closeReceivers(ctx context.Context) error {
func (h *hubImpl) closeReceivers(ctx context.Context) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.closeReceivers")
defer span.End()

Expand All @@ -606,12 +663,12 @@ func (h *Hub) closeReceivers(ctx context.Context) error {
// If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes
// methods which will help manage the life span of the receiver.
//
// ListenerHandle.Close(ctx) closes the receiver
// # ListenerHandle.Close(ctx) closes the receiver
//
// ListenerHandle.Done() signals the consumer when the receiver has stopped
// # ListenerHandle.Done() signals the consumer when the receiver has stopped
//
// ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) {
func (h *hubImpl) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Receive")
defer span.End()

Expand Down Expand Up @@ -639,7 +696,7 @@ func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler,
// Send sends an event to the Event Hub
//
// Send will retry sending the message for as long as the context allows
func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error {
func (h *hubImpl) Send(ctx context.Context, event *Event, opts ...SendOption) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.Send")
defer span.End()

Expand All @@ -652,7 +709,7 @@ func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
}

// SendBatch sends a batch of events to the Hub
func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error {
func (h *hubImpl) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error {
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.SendBatch")
defer span.End()

Expand Down Expand Up @@ -698,17 +755,17 @@ func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...Bat

// HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition
func HubWithPartitionedSender(partitionID string) HubOption {
return func(h *Hub) error {
h.senderPartitionID = &partitionID
return func(h Hub) error {
h.(*hubImpl).senderPartitionID = &partitionID
return nil
}
}

// HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it
// can resume after the last consumed event.
func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption {
return func(h *Hub) error {
h.offsetPersister = offsetPersister
return func(h Hub) error {
h.(*hubImpl).offsetPersister = offsetPersister
return nil
}
}
Expand All @@ -719,25 +776,25 @@ func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOp
//
// Max user agent length is specified by the const maxUserAgentLen.
func HubWithUserAgent(userAgent string) HubOption {
return func(h *Hub) error {
return h.appendAgent(userAgent)
return func(h Hub) error {
return h.(*hubImpl).appendAgent(userAgent)
}
}

// HubWithEnvironment configures the Hub to use the specified environment.
//
// By default, the Hub instance will use Azure US Public cloud environment
func HubWithEnvironment(env azure.Environment) HubOption {
return func(h *Hub) error {
h.namespace.host = "amqps://" + h.namespace.name + "." + env.ServiceBusEndpointSuffix
return func(h Hub) error {
h.(*hubImpl).namespace.host = "amqps://" + h.(*hubImpl).namespace.name + "." + env.ServiceBusEndpointSuffix
return nil
}
}

// HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://
func HubWithWebSocketConnection() HubOption {
return func(h *Hub) error {
h.namespace.useWebSocket = true
return func(h Hub) error {
h.(*hubImpl).namespace.useWebSocket = true
return nil
}
}
Expand All @@ -746,13 +803,13 @@ func HubWithWebSocketConnection() HubOption {
// in addition to the original attempt.
// 0 indicates no retries, and < 0 will cause infinite retries.
func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption {
return func(h *Hub) error {
h.senderRetryOptions.maxRetries = maxRetryCount
return func(h Hub) error {
h.(*hubImpl).senderRetryOptions.maxRetries = maxRetryCount
return nil
}
}

func (h *Hub) appendAgent(userAgent string) error {
func (h *hubImpl) appendAgent(userAgent string) error {
ua := path.Join(h.userAgent, userAgent)
if len(ua) > maxUserAgentLen {
return fmt.Errorf("user agent string has surpassed the max length of %d", maxUserAgentLen)
Expand All @@ -761,7 +818,7 @@ func (h *Hub) appendAgent(userAgent string) error {
return nil
}

func (h *Hub) getSender(ctx context.Context) (*sender, error) {
func (h *hubImpl) getSender(ctx context.Context) (*sender, error) {
h.senderMu.Lock()
defer h.senderMu.Unlock()

Expand Down
Loading

0 comments on commit 828a0c8

Please sign in to comment.