Skip to content

Commit

Permalink
Add unit-test for reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Feb 5, 2025
1 parent d7c5301 commit 7d6a908
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 20 deletions.
12 changes: 6 additions & 6 deletions plugins/outputs/event_hubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Configuration for Event Hubs output plugin
[[outputs.event_hubs]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
## Full connection string to the Event Hub instance. The shared access key
## must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"

## Client timeout (defaults to 30s)
# timeout = "30s"

## Partition key
## Partition key to use for the event
## Metric tag or field name to use for the event partition key. The value of
## this tag or field is set as the key for events if it exists. If both, tag
## and field, exist the tag is preferred.
Expand All @@ -47,6 +44,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## used (currently 1,000,000 bytes)
# max_message_size = "1MB"

## Timeout for sending the data
# timeout = "30s"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
19 changes: 14 additions & 5 deletions plugins/outputs/event_hubs/event_hubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func (e *EventHubs) Init() error {
}

func (e *EventHubs) Connect() error {
cfg := &azeventhubs.ProducerClientOptions{ApplicationID: internal.FormatFullVersion()}
cfg := &azeventhubs.ProducerClientOptions{
ApplicationID: internal.FormatFullVersion(),
RetryOptions: azeventhubs.RetryOptions{MaxRetries: -1},
}

client, err := azeventhubs.NewProducerClientFromConnectionString(e.ConnectionString, "", cfg)
if err != nil {
Expand All @@ -67,8 +70,7 @@ func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) {
}

func (e *EventHubs) Write(metrics []telegraf.Metric) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
ctx := context.Background()

batchOptions := e.options
batches := make(map[string]*azeventhubs.EventDataBatch)
Expand Down Expand Up @@ -122,7 +124,7 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error {
e.Log.Tracef("metric: %+v", m)
continue
}
if err := e.client.SendEventDataBatch(ctx, batches[partition], nil); err != nil {
if err := e.send(batches[partition]); err != nil {
return fmt.Errorf("sending batch for partition %q failed: %w", partition, err)
}

Expand All @@ -140,13 +142,20 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error {
if batch.NumBytes() == 0 {
continue
}
if err := e.client.SendEventDataBatch(ctx, batch, nil); err != nil {
if err := e.send(batch); err != nil {
return fmt.Errorf("sending batch for partition %q failed: %w", partition, err)
}
}
return nil
}

func (e *EventHubs) send(batch *azeventhubs.EventDataBatch) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()

return e.client.SendEventDataBatch(ctx, batch, nil)
}

func init() {
outputs.Add("event_hubs", func() telegraf.Output {
return &EventHubs{
Expand Down
151 changes: 148 additions & 3 deletions plugins/outputs/event_hubs/event_hubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,137 @@ func TestEmulatorIntegration(t *testing.T) {
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
},
ExposedPorts: []string{"5672"},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port("5672")),
WaitingFor: wait.ForListeningPort(nat.Port("5672")),
}
require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container")
defer emulator.Terminate()

conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";"
conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test"

// Setup plugin and connect
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())

plugin := &EventHubs{
ConnectionString: conn,
Timeout: config.Duration(3 * time.Second),
Log: testutil.Logger{},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Make sure we are connected
require.Eventually(t, func() bool {
return plugin.Write(testutil.MockMetrics()) == nil
}, 3*time.Second, 500*time.Millisecond)

input := []telegraf.Metric{
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "temperature",
},
map[string]interface{}{
"value": 23,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "humidity",
},
map[string]interface{}{
"value": 59,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "temperature",
},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "humidity",
},
map[string]interface{}{
"value": 87,
},
time.Unix(0, 0),
),
}

require.NoError(t, plugin.Write(input))
}

func TestReconnectIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

// Require the developers to explicitly accept the EULA of the emulator
if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" {
t.Skip(`
Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator
at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md
and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA
to 'yes'.
`)
}

// Setup the Azure Event Hub emulator environment
// See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator
ctx := context.Background()
azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0")
require.NoError(t, err, "failed to start Azurite container")
defer func() {
if err := testcontainers.TerminateContainer(azuriteContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()

blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort)
require.NoError(t, err)

metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort)
require.NoError(t, err)

cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json"))
require.NoError(t, err, "getting absolute path for config")
emulator := testutil.Container{
Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest",
Env: map[string]string{
"BLOB_SERVER": "host.docker.internal:" + blobPort.Port(),
"METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(),
"ACCEPT_EULA": "Y",
},
Files: map[string]string{
"/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile,
},
HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()},
HostConfigModifier: func(hc *container.HostConfig) {
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
},
ExposedPorts: []string{"5672"},
WaitingFor: wait.ForListeningPort(nat.Port("5672")),
}
require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container")
defer emulator.Terminate()
Expand All @@ -87,13 +215,19 @@ func TestEmulatorIntegration(t *testing.T) {

plugin := &EventHubs{
ConnectionString: conn,
Timeout: config.Duration(60 * time.Second),
Timeout: config.Duration(3 * time.Second),
Log: testutil.Logger{},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Make sure we are connected
require.Eventually(t, func() bool {
return plugin.Write(testutil.MockMetrics()) == nil
}, 3*time.Second, 500*time.Millisecond)

input := []telegraf.Metric{
metric.New(
"test",
Expand Down Expand Up @@ -145,5 +279,16 @@ func TestEmulatorIntegration(t *testing.T) {
),
}

// This write should succeed as we should be able to connect to the
// container
require.NoError(t, plugin.Write(input))

// Pause the container to simulate connection loss. Subsequent writes
// should fail until the container is resumed
require.NoError(t, emulator.Pause())
require.ErrorIs(t, plugin.Write(input), context.DeadlineExceeded)

// Resume the container to check if the plugin reconnects
require.NoError(t, emulator.Resume())
require.NoError(t, plugin.Write(input))
}
12 changes: 6 additions & 6 deletions plugins/outputs/event_hubs/sample.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
# Configuration for Event Hubs output plugin
[[outputs.event_hubs]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
## Full connection string to the Event Hub instance. The shared access key
## must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"

## Client timeout (defaults to 30s)
# timeout = "30s"

## Partition key
## Partition key to use for the event
## Metric tag or field name to use for the event partition key. The value of
## this tag or field is set as the key for events if it exists. If both, tag
## and field, exist the tag is preferred.
Expand All @@ -20,6 +17,9 @@
## used (currently 1,000,000 bytes)
# max_message_size = "1MB"

## Timeout for sending the data
# timeout = "30s"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down

0 comments on commit 7d6a908

Please sign in to comment.