From 18ef8cf6b9eeaedff36f8915c21c57e9761a9773 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Tue, 4 Feb 2025 09:07:03 +0100 Subject: [PATCH 1/5] chore(outputs.event_hubs): Switch to maintained library --- go.mod | 8 +- go.sum | 28 +- plugins/outputs/event_hubs/README.md | 9 +- plugins/outputs/event_hubs/event_hubs.go | 165 ++++++------ plugins/outputs/event_hubs/event_hubs_test.go | 241 +++++++++--------- plugins/outputs/event_hubs/sample.conf | 9 +- .../outputs/event_hubs/testdata/Config.json | 24 ++ testutil/container.go | 2 + 8 files changed, 261 insertions(+), 225 deletions(-) create mode 100644 plugins/outputs/event_hubs/testdata/Config.json diff --git a/go.mod b/go.mod index 56bc21d782a95..8dc697e33c410 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/Azure/azure-kusto-go v0.16.1 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1 + github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe @@ -189,7 +190,8 @@ require ( github.com/srebhan/protobufquery v1.0.1 github.com/stretchr/testify v1.10.0 github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 - github.com/testcontainers/testcontainers-go v0.34.0 + github.com/testcontainers/testcontainers-go v0.35.0 + github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0 github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0 github.com/thomasklein94/packer-plugin-libvirt v0.5.0 github.com/tidwall/gjson v1.18.0 @@ -244,7 +246,7 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect cloud.google.com/go/compute/metadata v0.6.0 // indirect cloud.google.com/go/iam v1.2.2 // indirect - code.cloudfoundry.org/clock v1.0.0 // indirect + code.cloudfoundry.org/clock v1.2.0 // indirect dario.cat/mergo v1.0.1 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect @@ -253,7 +255,7 @@ require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect - github.com/Azure/go-amqp v1.0.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/azure/cli v0.4.6 // indirect diff --git a/go.sum b/go.sum index b954030ad1eaa..fa3aa71e579e7 100644 --- a/go.sum +++ b/go.sum @@ -624,8 +624,8 @@ cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vf cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA= cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw= code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= -code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o= -code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= +code.cloudfoundry.org/clock v1.2.0 h1:1swXS7yPmQmhAdkTb1nJ2c0geOdf4LvibUleNCo2HjA= +code.cloudfoundry.org/clock v1.2.0/go.mod h1:foDbmVp5RIuIGlota90ot4FkJtx5m4+oKoWiVuu2FDg= collectd.org v0.6.0 h1:wDTcB13Zork7m9bEHmU2sVL4z+hxBmm8EyeMjjxtW7s= collectd.org v0.6.0/go.mod h1:fXcRZb1qBKshIHJa2T8qBS7Xew/I43iMutefnTdGeYo= dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= @@ -658,8 +658,14 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1 h1:1mvYtZfWQAnwNah/C+Z+J github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1/go.mod h1:75I/mXtme1JyWFtz8GocPHVFyH421IBoZErnO16dd0k= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1 h1:Bk5uOhSAenHyR5P61D/NzeQCv+4fEVV8mOkJ82NqpWw= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1/go.mod h1:QZ4pw3or1WPmRBxf0cHd1tknzrT54WPBOQoGutCPvSU= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0 h1:aJG+Jxd9/rrLwf8R1Ko0RlOBTJASs/lGQJ8b9AdlKTc= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0/go.mod h1:41ONblJrPxDcnVr+voS+3xXWy/KnZLh+7zY5s6woAlQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 h1:6bVZts/82H+hax9b3vdmSpi7+Hw9uWvEaJHeKlafnW4= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3/go.mod h1:qf3s/6aV9ePKYGeEYPsbndK6GGfeS7SrbA6OE/T7NIA= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0/go.mod h1:LRr2FzBTQlONPPa5HREE5+RjSCTXl7BwOvYOaWTqCaI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0 h1:pPvTJ1dY0sA35JOeFq6TsY2xj6Z85Yo23Pj4wCCvu4o= @@ -678,10 +684,12 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E= github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.2.2 h1:PmDhkIT8S5U4nkY/s78Xmf7CXT8qCliNEBhbrkBp3Q0= github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.2.2/go.mod h1:Kj2pCkQ47klX1aAlDnlN/BUvwBiARqIJkc9iw1Up7q8= +github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU= +github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY= github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe h1:HGuouUM1533rBXmMtR7qh5pYNSSjUZG90b/MgJAnb/A= github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8= -github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA= -github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= @@ -1353,6 +1361,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1706,6 +1715,9 @@ github.com/jmhodges/clock v1.2.0 h1:eq4kys+NI0PLngzaHEe7AmPT90XMGIEySD1JfV1PDIs= github.com/jmhodges/clock v1.2.0/go.mod h1:qKjhA7x7u/lQpPB1XAqX1b1lCI/w3/fNuYpI/ZjLynI= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= @@ -2338,8 +2350,10 @@ github.com/t3rm1n4l/go-mega v0.0.0-20240219080617-d494b6a8ace7/go.mod h1:suDIky6 github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 h1:Oj2e7Sae4XrOsk3ij21QjjEgAcVSeo9nkp0dI//cD2o= github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62/go.mod h1:qUzPVlSj2UgxJkVbH0ZwuuiR46U8RBMDT5KLY78Ifpw= github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= -github.com/testcontainers/testcontainers-go v0.34.0 h1:5fbgF0vIN5u+nD3IWabQwRybuB4GY8G2HHgCkbMzMHo= -github.com/testcontainers/testcontainers-go v0.34.0/go.mod h1:6P/kMkQe8yqPHfPWNulFGdFHTD8HB2vLq/231xY2iPQ= +github.com/testcontainers/testcontainers-go v0.35.0 h1:uADsZpTKFAtp8SLK+hMwSaa+X+JiERHtd4sQAFmXeMo= +github.com/testcontainers/testcontainers-go v0.35.0/go.mod h1:oEVBj5zrfJTrgjwONs1SsRbnBtH9OKl+IGl3UMcr2B4= +github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0 h1:gUZ25e1DVE/0+ZZ0nupsIo+C1j7UNloN7Pkg3w6tceI= +github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0/go.mod h1:2Fc67EpyOEexLAF99zhSuzu9H22zd83pkjxEHHTtHf4= github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0 h1:LrMlsBH+nKJ2c6M7rOjbi7UivgofgAQo+LAwsWttR+Q= github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0/go.mod h1:4BIbeoKY/ZAf86MvWT5xJW5TvxbCPg67I5rBvwFsx4A= github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs= @@ -3508,6 +3522,8 @@ modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= modernc.org/z v1.5.1/go.mod h1:eWFB510QWW5Th9YGZT81s+LwvaAs3Q2yr4sP0rmLkv8= moul.io/http2curl/v2 v2.3.0 h1:9r3JfDzWPcbIklMOs2TnIFzDYvfAZvjeavG6EzP7jYs= moul.io/http2curl/v2 v2.3.0/go.mod h1:RW4hyBjTWSYDOxapodpNEtX0g5Eb16sxklBqmd2RHcE= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index f1922278aa9fa..c63d74220612f 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -41,10 +41,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # partition_key = "" ## Set the maximum batch message size in bytes - ## The allowable size depends on the Event Hub tier - ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers - ## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes) - # max_message_size = 1000000 + ## The allowable size depends on the Event Hub tier, see + ## https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## for details. If unset the default size defined by Azure Event Hubs is + ## used (currently 1,000,000 bytes) + # max_message_size = "1MB" ## Data format to output. ## Each data format has its own unique set of configuration options, read diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index 7a0c01e6f717f..efb72393167d2 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -4,89 +4,54 @@ package event_hubs import ( "context" _ "embed" + "errors" + "fmt" "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" ) //go:embed sample.conf var sampleConfig string -/* -** Wrapper interface for eventhub.Hub - */ - -type EventHubInterface interface { - GetHub(s string) error - Close(ctx context.Context) error - SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error -} - -type eventHub struct { - hub *eventhub.Hub -} - -func (eh *eventHub) GetHub(s string) error { - hub, err := eventhub.NewHubFromConnectionString(s) - - if err != nil { - return err - } - - eh.hub = hub - - return nil -} - -func (eh *eventHub) Close(ctx context.Context) error { - return eh.hub.Close(ctx) -} - -func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { - return eh.hub.SendBatch(ctx, iterator, opts...) -} - -/* End wrapper interface */ - type EventHubs struct { - Log telegraf.Logger `toml:"-"` ConnectionString string `toml:"connection_string"` - Timeout config.Duration `toml:"timeout"` PartitionKey string `toml:"partition_key"` - MaxMessageSize int `toml:"max_message_size"` + MaxMessageSize config.Size `toml:"max_message_size"` + Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` - Hub EventHubInterface - batchOptions []eventhub.BatchOption - serializer telegraf.Serializer + client *azeventhubs.ProducerClient + options azeventhubs.EventDataBatchOptions + serializer telegraf.Serializer } -const ( - defaultRequestTimeout = time.Second * 30 -) - func (*EventHubs) SampleConfig() string { return sampleConfig } func (e *EventHubs) Init() error { - err := e.Hub.GetHub(e.ConnectionString) - - if err != nil { - return err - } - if e.MaxMessageSize > 0 { - e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize)) + e.options.MaxBytes = uint64(e.MaxMessageSize) } return nil } -func (*EventHubs) Connect() error { +func (e *EventHubs) Connect() error { + cfg := &azeventhubs.ProducerClientOptions{ApplicationID: internal.FormatFullVersion()} + + client, err := azeventhubs.NewProducerClientFromConnectionString(e.ConnectionString, "", cfg) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + e.client = client + return nil } @@ -94,13 +59,7 @@ func (e *EventHubs) Close() error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) defer cancel() - err := e.Hub.Close(ctx) - - if err != nil { - return err - } - - return nil + return e.client.Close(ctx) } func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { @@ -108,46 +67,90 @@ func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { } func (e *EventHubs) Write(metrics []telegraf.Metric) error { - events := make([]*eventhub.Event, 0, len(metrics)) - for _, metric := range metrics { - payload, err := e.serializer.Serialize(metric) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + batchOptions := e.options + batches := make(map[string]*azeventhubs.EventDataBatch) + for i := 0; i < len(metrics); i++ { + m := metrics[i] + + // Prepare the payload + payload, err := e.serializer.Serialize(m) if err != nil { - e.Log.Debugf("Could not serialize metric: %v", err) + e.Log.Errorf("Could not serialize metric: %v", err) + e.Log.Tracef("metric: %+v", m) continue } - event := eventhub.NewEvent(payload) + // Get the batcher for the chosen partition + partition := "" + batchOptions.PartitionKey = nil if e.PartitionKey != "" { - if key, ok := metric.GetTag(e.PartitionKey); ok { - event.PartitionKey = &key - } else if key, ok := metric.GetField(e.PartitionKey); ok { - if strKey, ok := key.(string); ok { - event.PartitionKey = &strKey + if key, ok := m.GetTag(e.PartitionKey); ok { + partition = key + batchOptions.PartitionKey = &partition + } else if key, ok := m.GetField(e.PartitionKey); ok { + if k, ok := key.(string); ok { + partition = k + batchOptions.PartitionKey = &partition } } } + if _, found := batches[partition]; !found { + batches[partition], err = e.client.NewEventDataBatch(ctx, &batchOptions) + if err != nil { + return fmt.Errorf("creating batch for partition %q failed: %w", partition, err) + } + } - events = append(events, event) - } + // Add the event to the partition and send it if the batch is full + err = batches[partition].AddEventData(&azeventhubs.EventData{Body: payload}, nil) + if err == nil { + continue + } - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) - defer cancel() + // If the event doesn't fit into the batch anymore, send the batch + if !errors.Is(err, azeventhubs.ErrEventDataTooLarge) { + return fmt.Errorf("adding metric to batch for partition %q failed: %w", partition, err) + } - err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) + // The event is larger than the maximum allowed size so there + // is nothing we can do here but have to drop the metric. + if batches[partition].NumEvents() == 0 { + e.Log.Errorf("Metric with %d bytes exceeds the maximum allowed size and must be dropped!", len(payload)) + e.Log.Tracef("metric: %+v", m) + continue + } + if err := e.client.SendEventDataBatch(ctx, batches[partition], nil); err != nil { + return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) + } - if err != nil { - return err + // Create a new metric and reiterate over the current metric to be + // added in the next iteration of the for loop. + batches[partition], err = e.client.NewEventDataBatch(ctx, &e.options) + if err != nil { + return fmt.Errorf("creating batch for partition %q failed: %w", partition, err) + } + i-- } + // Send the remaining batches that never exceeded the batch size + for partition, batch := range batches { + if batch.NumBytes() == 0 { + continue + } + if err := e.client.SendEventDataBatch(ctx, batch, nil); err != nil { + return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) + } + } return nil } func init() { outputs.Add("event_hubs", func() telegraf.Output { return &EventHubs{ - Hub: &eventHub{}, - Timeout: config.Duration(defaultRequestTimeout), + Timeout: config.Duration(30 * time.Second), } }) } diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index eb65c7ed9642f..1bafd44916199 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -2,161 +2,148 @@ package event_hubs import ( "context" - "fmt" - "math/rand" + "log" "os" + "path/filepath" "testing" "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/stretchr/testify/mock" + "github.com/docker/docker/api/types/container" + "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/azurite" + "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/testutil" ) -/* -** Wrapper interface mock for eventhub.Hub - */ - -type mockEventHub struct { - mock.Mock -} - -func (eh *mockEventHub) GetHub(s string) error { - args := eh.Called(s) - return args.Error(0) -} - -func (eh *mockEventHub) Close(ctx context.Context) error { - args := eh.Called(ctx) - return args.Error(0) -} - -func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { - args := eh.Called(ctx, iterator, opts) - return args.Error(0) -} - -/* End wrapper interface */ - -func TestInitAndWrite(t *testing.T) { - serializer := &json.Serializer{} - require.NoError(t, serializer.Init()) - - mockHub := &mockEventHub{} - e := &EventHubs{ - Hub: mockHub, - ConnectionString: "mock", - Timeout: config.Duration(time.Second * 5), - MaxMessageSize: 1000000, - serializer: serializer, - } - - mockHub.On("GetHub", mock.Anything).Return(nil).Once() - require.NoError(t, e.Init()) - mockHub.AssertExpectations(t) - - metrics := testutil.MockMetrics() - - mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - require.NoError(t, e.Write(metrics)) - mockHub.AssertExpectations(t) -} - -/* -** Integration test (requires an Event Hubs instance) - */ - -func TestInitAndWriteIntegration(t *testing.T) { +func TestEmulatorIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - if os.Getenv("EVENTHUB_CONNECTION_STRING") == "" { - t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING") + // Require the developers to explicitly accept the EULA of the emulator + if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" { + t.Skip(` + Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator + at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md + and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA + to 'yes'. + `) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() + // Setup the Azure Event Hub emulator environment + // See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator + ctx := context.Background() + azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0") + require.NoError(t, err, "failed to start Azurite container") + defer func() { + if err := testcontainers.TerminateContainer(azuriteContainer); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }() - // Create a new, empty Event Hub - // NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace - mHub, err := eventhub.NewHubManagerFromConnectionString(os.Getenv("EVENTHUB_CONNECTION_STRING")) + blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort) require.NoError(t, err) - r := rand.New(rand.NewSource(time.Now().UnixNano())) - name := fmt.Sprintf("testmetrics%05d", r.Intn(10000)) - - entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1)) + metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort) require.NoError(t, err) - // Delete the test hub - defer func() { - err := mHub.Delete(ctx, entity.Name) - require.NoError(t, err) - }() + cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json")) + require.NoError(t, err, "getting absolute path for config") + emulator := testutil.Container{ + Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest", + Env: map[string]string{ + "BLOB_SERVER": "host.docker.internal:" + blobPort.Port(), + "METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(), + "ACCEPT_EULA": "Y", + }, + Files: map[string]string{ + "/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile, + }, + HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()}, + HostConfigModifier: func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + ExposedPorts: []string{"5672"}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port("5672")), + ), + } + require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container") + defer emulator.Terminate() - testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name + conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";" + conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test" - // Configure the plugin to target the newly created hub + // Setup plugin and connect serializer := &json.Serializer{} require.NoError(t, serializer.Init()) - e := &EventHubs{ - Hub: &eventHub{}, - ConnectionString: testHubCS, - Timeout: config.Duration(time.Second * 5), - serializer: serializer, - } - - // Verify that we can connect to Event Hubs - require.NoError(t, e.Init()) - - // Verify that we can successfully write data to Event Hubs - metrics := testutil.MockMetrics() - require.NoError(t, e.Write(metrics)) - /* - ** Verify we can read data back from the test hub - */ - - exit := make(chan string) - - // Create a hub client for receiving - hub, err := eventhub.NewHubFromConnectionString(testHubCS) - require.NoError(t, err) - - // The handler function will pass received messages via the channel - handler := func(_ context.Context, event *eventhub.Event) error { - exit <- string(event.Data) - return nil - } - - // Set up the receivers - runtimeInfo, err := hub.GetRuntimeInformation(ctx) - require.NoError(t, err) - - for _, partitionID := range runtimeInfo.PartitionIDs { - _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1")) - require.NoError(t, err) + plugin := &EventHubs{ + ConnectionString: conn, + Timeout: config.Duration(60 * time.Second), } - - // Wait to receive the same number of messages sent, with timeout - received := 0 -wait: - for _, metric := range metrics { - select { - case m := <-exit: - t.Logf("Received for %s: %s", metric.Name(), m) - received = received + 1 - case <-time.After(10 * time.Second): - t.Logf("Timeout") - break wait - } + plugin.SetSerializer(serializer) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "temperature", + }, + map[string]interface{}{ + "value": 23, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "humidity", + }, + map[string]interface{}{ + "value": 59, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "temperature", + }, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "humidity", + }, + map[string]interface{}{ + "value": 87, + }, + time.Unix(0, 0), + ), } - // Make sure received == sent - require.Len(t, metrics, received) + require.NoError(t, plugin.Write(input)) } diff --git a/plugins/outputs/event_hubs/sample.conf b/plugins/outputs/event_hubs/sample.conf index 426500ea7634e..34ef79443db32 100644 --- a/plugins/outputs/event_hubs/sample.conf +++ b/plugins/outputs/event_hubs/sample.conf @@ -14,10 +14,11 @@ # partition_key = "" ## Set the maximum batch message size in bytes - ## The allowable size depends on the Event Hub tier - ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers - ## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes) - # max_message_size = 1000000 + ## The allowable size depends on the Event Hub tier, see + ## https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## for details. If unset the default size defined by Azure Event Hubs is + ## used (currently 1,000,000 bytes) + # max_message_size = "1MB" ## Data format to output. ## Each data format has its own unique set of configuration options, read diff --git a/plugins/outputs/event_hubs/testdata/Config.json b/plugins/outputs/event_hubs/testdata/Config.json new file mode 100644 index 0000000000000..c698550cc7a6d --- /dev/null +++ b/plugins/outputs/event_hubs/testdata/Config.json @@ -0,0 +1,24 @@ +{ + "UserConfig": { + "NamespaceConfig": [ + { + "Type": "EventHub", + "Name": "emulatorNs1", + "Entities": [ + { + "Name": "test", + "PartitionCount": 2, + "ConsumerGroups": [ + { + "Name": "cg1" + } + ] + } + ] + } + ], + "LoggingConfig": { + "Type": "Console" + } + } +} diff --git a/testutil/container.go b/testutil/container.go index 0a33ed1cb4c3a..ccba5a1bd018f 100644 --- a/testutil/container.go +++ b/testutil/container.go @@ -27,6 +27,7 @@ type Container struct { Entrypoint []string Env map[string]string Files map[string]string + HostAccessPorts []int HostConfigModifier func(*container.HostConfig) ExposedPorts []string Cmd []string @@ -62,6 +63,7 @@ func (c *Container) Start() error { Env: c.Env, ExposedPorts: c.ExposedPorts, Files: files, + HostAccessPorts: c.HostAccessPorts, HostConfigModifier: c.HostConfigModifier, Cmd: c.Cmd, Image: c.Image, From 252ccb714025a48e3c351d9e073e722126331d57 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 5 Feb 2025 11:31:37 +0100 Subject: [PATCH 2/5] Add license information for new library --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 505d23e926967..382a30531f298 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -18,6 +18,7 @@ following works: - github.com/Azure/azure-sdk-for-go/sdk/azcore [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/azcore/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/azidentity [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/azidentity/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/internal [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/internal/LICENSE.txt) +- github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/monitor/armmonitor/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/resources/armresources/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azblob/LICENSE.txt) From b82ef08e7f4975e238ef30f072deb62f5d3c80c6 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 5 Feb 2025 11:31:59 +0100 Subject: [PATCH 3/5] Accept emulator EULA for CircleCI runs --- .circleci/config.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7ce248d728758..a90c8df9ce4cf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -166,7 +166,11 @@ jobs: - check-changed-files-or-halt - run: 'sh ./scripts/installgo_linux.sh' - run: 'make deps' - - run: 'make test-integration' + - run: + name: "Run integration tests" + command: make test-integration + environment: + AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA: yes test-go-mac: executor: mac steps: From 2a0357242b91dfa00e576c0c60bbcb3db0a2dba2 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 5 Feb 2025 16:35:27 +0100 Subject: [PATCH 4/5] Add unit-test for reconnecting --- plugins/outputs/event_hubs/README.md | 12 +- plugins/outputs/event_hubs/event_hubs.go | 19 ++- plugins/outputs/event_hubs/event_hubs_test.go | 151 +++++++++++++++++- plugins/outputs/event_hubs/sample.conf | 12 +- 4 files changed, 174 insertions(+), 20 deletions(-) diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index c63d74220612f..b689b592a7b2e 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -27,14 +27,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Configuration for Event Hubs output plugin [[outputs.event_hubs]] - ## The full connection string to the Event Hub (required) - ## The shared access key must have "Send" permissions on the target Event Hub. + ## Full connection string to the Event Hub instance. The shared access key + ## must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" - ## Client timeout (defaults to 30s) - # timeout = "30s" - - ## Partition key + ## Partition key to use for the event ## Metric tag or field name to use for the event partition key. The value of ## this tag or field is set as the key for events if it exists. If both, tag ## and field, exist the tag is preferred. @@ -47,6 +44,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## used (currently 1,000,000 bytes) # max_message_size = "1MB" + ## Timeout for sending the data + # timeout = "30s" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index efb72393167d2..73aae33de86b7 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -44,7 +44,10 @@ func (e *EventHubs) Init() error { } func (e *EventHubs) Connect() error { - cfg := &azeventhubs.ProducerClientOptions{ApplicationID: internal.FormatFullVersion()} + cfg := &azeventhubs.ProducerClientOptions{ + ApplicationID: internal.FormatFullVersion(), + RetryOptions: azeventhubs.RetryOptions{MaxRetries: -1}, + } client, err := azeventhubs.NewProducerClientFromConnectionString(e.ConnectionString, "", cfg) if err != nil { @@ -67,8 +70,7 @@ func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { } func (e *EventHubs) Write(metrics []telegraf.Metric) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) - defer cancel() + ctx := context.Background() batchOptions := e.options batches := make(map[string]*azeventhubs.EventDataBatch) @@ -122,7 +124,7 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { e.Log.Tracef("metric: %+v", m) continue } - if err := e.client.SendEventDataBatch(ctx, batches[partition], nil); err != nil { + if err := e.send(batches[partition]); err != nil { return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) } @@ -140,13 +142,20 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { if batch.NumBytes() == 0 { continue } - if err := e.client.SendEventDataBatch(ctx, batch, nil); err != nil { + if err := e.send(batch); err != nil { return fmt.Errorf("sending batch for partition %q failed: %w", partition, err) } } return nil } +func (e *EventHubs) send(batch *azeventhubs.EventDataBatch) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + return e.client.SendEventDataBatch(ctx, batch, nil) +} + func init() { outputs.Add("event_hubs", func() telegraf.Output { return &EventHubs{ diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 1bafd44916199..9a2d3f5563c9c 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -71,9 +71,137 @@ func TestEmulatorIntegration(t *testing.T) { hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") }, ExposedPorts: []string{"5672"}, - WaitingFor: wait.ForAll( - wait.ForListeningPort(nat.Port("5672")), + WaitingFor: wait.ForListeningPort(nat.Port("5672")), + } + require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container") + defer emulator.Terminate() + + conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";" + conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test" + + // Setup plugin and connect + serializer := &json.Serializer{} + require.NoError(t, serializer.Init()) + + plugin := &EventHubs{ + ConnectionString: conn, + Timeout: config.Duration(3 * time.Second), + Log: testutil.Logger{}, + } + plugin.SetSerializer(serializer) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Make sure we are connected + require.Eventually(t, func() bool { + return plugin.Write(testutil.MockMetrics()) == nil + }, 3*time.Second, 500*time.Millisecond) + + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "temperature", + }, + map[string]interface{}{ + "value": 23, + }, + time.Unix(0, 0), ), + metric.New( + "test", + map[string]string{ + "source": "foo", + "division": "A", + "type": "humidity", + }, + map[string]interface{}{ + "value": 59, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "temperature", + }, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "division": "B", + "type": "humidity", + }, + map[string]interface{}{ + "value": 87, + }, + time.Unix(0, 0), + ), + } + + require.NoError(t, plugin.Write(input)) +} + +func TestReconnectIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Require the developers to explicitly accept the EULA of the emulator + if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" { + t.Skip(` + Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator + at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md + and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA + to 'yes'. + `) + } + + // Setup the Azure Event Hub emulator environment + // See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator + ctx := context.Background() + azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0") + require.NoError(t, err, "failed to start Azurite container") + defer func() { + if err := testcontainers.TerminateContainer(azuriteContainer); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }() + + blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort) + require.NoError(t, err) + + metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort) + require.NoError(t, err) + + cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json")) + require.NoError(t, err, "getting absolute path for config") + emulator := testutil.Container{ + Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest", + Env: map[string]string{ + "BLOB_SERVER": "host.docker.internal:" + blobPort.Port(), + "METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(), + "ACCEPT_EULA": "Y", + }, + Files: map[string]string{ + "/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile, + }, + HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()}, + HostConfigModifier: func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + ExposedPorts: []string{"5672"}, + WaitingFor: wait.ForListeningPort(nat.Port("5672")), } require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container") defer emulator.Terminate() @@ -87,13 +215,19 @@ func TestEmulatorIntegration(t *testing.T) { plugin := &EventHubs{ ConnectionString: conn, - Timeout: config.Duration(60 * time.Second), + Timeout: config.Duration(3 * time.Second), + Log: testutil.Logger{}, } plugin.SetSerializer(serializer) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) defer plugin.Close() + // Make sure we are connected + require.Eventually(t, func() bool { + return plugin.Write(testutil.MockMetrics()) == nil + }, 3*time.Second, 500*time.Millisecond) + input := []telegraf.Metric{ metric.New( "test", @@ -145,5 +279,16 @@ func TestEmulatorIntegration(t *testing.T) { ), } + // This write should succeed as we should be able to connect to the + // container + require.NoError(t, plugin.Write(input)) + + // Pause the container to simulate connection loss. Subsequent writes + // should fail until the container is resumed + require.NoError(t, emulator.Pause()) + require.ErrorIs(t, plugin.Write(input), context.DeadlineExceeded) + + // Resume the container to check if the plugin reconnects + require.NoError(t, emulator.Resume()) require.NoError(t, plugin.Write(input)) } diff --git a/plugins/outputs/event_hubs/sample.conf b/plugins/outputs/event_hubs/sample.conf index 34ef79443db32..3717652a4e8c3 100644 --- a/plugins/outputs/event_hubs/sample.conf +++ b/plugins/outputs/event_hubs/sample.conf @@ -1,13 +1,10 @@ # Configuration for Event Hubs output plugin [[outputs.event_hubs]] - ## The full connection string to the Event Hub (required) - ## The shared access key must have "Send" permissions on the target Event Hub. + ## Full connection string to the Event Hub instance. The shared access key + ## must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" - ## Client timeout (defaults to 30s) - # timeout = "30s" - - ## Partition key + ## Partition key to use for the event ## Metric tag or field name to use for the event partition key. The value of ## this tag or field is set as the key for events if it exists. If both, tag ## and field, exist the tag is preferred. @@ -20,6 +17,9 @@ ## used (currently 1,000,000 bytes) # max_message_size = "1MB" + ## Timeout for sending the data + # timeout = "30s" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: From bd96edbe5ec4e5c9e890c285e24da2fe8366ca33 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Wed, 5 Feb 2025 16:38:15 +0100 Subject: [PATCH 5/5] Make tidy --- go.sum | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.sum b/go.sum index fa3aa71e579e7..5385d18066e03 100644 --- a/go.sum +++ b/go.sum @@ -1713,9 +1713,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmhodges/clock v1.2.0 h1:eq4kys+NI0PLngzaHEe7AmPT90XMGIEySD1JfV1PDIs= github.com/jmhodges/clock v1.2.0/go.mod h1:qKjhA7x7u/lQpPB1XAqX1b1lCI/w3/fNuYpI/ZjLynI= -github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= -github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= -github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=