diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index c63d74220612f..b689b592a7b2e 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -27,14 +27,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Configuration for Event Hubs output plugin [[outputs.event_hubs]] - ## The full connection string to the Event Hub (required) - ## The shared access key must have "Send" permissions on the target Event Hub. + ## Full connection string to the Event Hub instance. The shared access key + ## must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" - ## Client timeout (defaults to 30s) - # timeout = "30s" - - ## Partition key + ## Partition key to use for the event ## Metric tag or field name to use for the event partition key. The value of ## this tag or field is set as the key for events if it exists. If both, tag ## and field, exist the tag is preferred. @@ -47,6 +44,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## used (currently 1,000,000 bytes) # max_message_size = "1MB" + ## Timeout for sending the data + # timeout = "30s" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index efb72393167d2..73aae33de86b7 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -44,7 +44,10 @@ func (e *EventHubs) Init() error { } func (e *EventHubs) Connect() error { - cfg := &azeventhubs.ProducerClientOptions{ApplicationID: internal.FormatFullVersion()} + cfg := &azeventhubs.ProducerClientOptions{ + ApplicationID: internal.FormatFullVersion(), + RetryOptions: azeventhubs.RetryOptions{MaxRetries: -1}, + } client, err := azeventhubs.NewProducerClientFromConnectionString(e.ConnectionString, "", cfg) if err != nil { @@ -67,8 +70,7 @@ func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { } func (e *EventHubs) Write(metrics []telegraf.Metric) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) - defer cancel() + ctx := context.Background() batchOptions := e.options batches := make(map[string]*azeventhubs.EventDataBatch) @@ -122,7 +124,7 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { e.Log.Tracef("metric: %+v", m) continue } - if err := e.client.SendEventDataBatch(ctx, batches[partition], nil); err != nil { + if err := e.send(batches[partition]); err != nil { return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) } @@ -140,13 +142,20 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { if batch.NumBytes() == 0 { continue } - if err := e.client.SendEventDataBatch(ctx, batch, nil); err != nil { + if err := e.send(batch); err != nil { return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) } } return nil } +func (e *EventHubs) send(batch *azeventhubs.EventDataBatch) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + return e.client.SendEventDataBatch(ctx, batch, nil) +} + func init() { outputs.Add("event_hubs", func() telegraf.Output { return &EventHubs{ diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 1bafd44916199..9a2d3f5563c9c 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -71,9 +71,137 @@ func TestEmulatorIntegration(t *testing.T) { hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") }, ExposedPorts: []string{"5672"}, - WaitingFor: wait.ForAll( - wait.ForListeningPort(nat.Port("5672")), + WaitingFor: wait.ForListeningPort(nat.Port("5672")), + } + require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container") + defer emulator.Terminate() + + conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";" + conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test" + + // Setup plugin and connect + serializer := &json.Serializer{} + require.NoError(t, serializer.Init()) + + plugin := &EventHubs{ + ConnectionString: conn, + Timeout: config.Duration(3 * time.Second), + Log: testutil.Logger{}, + } + plugin.SetSerializer(serializer) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Make sure we are connected + require.Eventually(t, func() bool { + return plugin.Write(testutil.MockMetrics()) == nil + }, 3*time.Second, 500*time.Millisecond) + + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "temperature", + }, + map[string]interface{}{ + "value": 23, + }, + time.Unix(0, 0), ), + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "humidity", + }, + map[string]interface{}{ + "value": 59, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "temperature", + }, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "humidity", + }, + map[string]interface{}{ + "value": 87, + }, + time.Unix(0, 0), + ), + } + + require.NoError(t, plugin.Write(input)) +} + +func TestReconnectIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Require the developers to explicitly accept the EULA of the emulator + if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" { + t.Skip(` + Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator + at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md + and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA + to 'yes'. + `) + } + + // Setup the Azure Event Hub emulator environment + // See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator + ctx := context.Background() + azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0") + require.NoError(t, err, "failed to start Azurite container") + defer func() { + if err := testcontainers.TerminateContainer(azuriteContainer); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }() + + blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort) + require.NoError(t, err) + + metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort) + require.NoError(t, err) + + cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json")) + require.NoError(t, err, "getting absolute path for config") + emulator := testutil.Container{ + Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest", + Env: map[string]string{ + "BLOB_SERVER": "host.docker.internal:" + blobPort.Port(), + "METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(), + "ACCEPT_EULA": "Y", + }, + Files: map[string]string{ + "/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile, + }, + HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()}, + HostConfigModifier: func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + ExposedPorts: []string{"5672"}, + WaitingFor: wait.ForListeningPort(nat.Port("5672")), } require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container") defer emulator.Terminate() @@ -87,13 +215,19 @@ func TestEmulatorIntegration(t *testing.T) { plugin := &EventHubs{ ConnectionString: conn, - Timeout: config.Duration(60 * time.Second), + Timeout: config.Duration(3 * time.Second), + Log: testutil.Logger{}, } plugin.SetSerializer(serializer) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) defer plugin.Close() + // Make sure we are connected + require.Eventually(t, func() bool { + return plugin.Write(testutil.MockMetrics()) == nil + }, 3*time.Second, 500*time.Millisecond) + input := []telegraf.Metric{ metric.New( "test", @@ -145,5 +279,16 @@ func TestEmulatorIntegration(t *testing.T) { ), } + // This write should succeed as we should be able to connect to the + // container + require.NoError(t, plugin.Write(input)) + + // Pause the container to simulate connection loss. Subsequent writes + // should fail until the container is resumed + require.NoError(t, emulator.Pause()) + require.ErrorIs(t, plugin.Write(input), context.DeadlineExceeded) + + // Resume the container to check if the plugin reconnects + require.NoError(t, emulator.Resume()) require.NoError(t, plugin.Write(input)) } diff --git a/plugins/outputs/event_hubs/sample.conf b/plugins/outputs/event_hubs/sample.conf index 34ef79443db32..3717652a4e8c3 100644 --- a/plugins/outputs/event_hubs/sample.conf +++ b/plugins/outputs/event_hubs/sample.conf @@ -1,13 +1,10 @@ # Configuration for Event Hubs output plugin [[outputs.event_hubs]] - ## The full connection string to the Event Hub (required) - ## The shared access key must have "Send" permissions on the target Event Hub. + ## Full connection string to the Event Hub instance. The shared access key + ## must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" - ## Client timeout (defaults to 30s) - # timeout = "30s" - - ## Partition key + ## Partition key to use for the event ## Metric tag or field name to use for the event partition key. The value of ## this tag or field is set as the key for events if it exists. If both, tag ## and field, exist the tag is preferred. @@ -20,6 +17,9 @@ ## used (currently 1,000,000 bytes) # max_message_size = "1MB" + ## Timeout for sending the data + # timeout = "30s" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: