diff --git a/exporter/exporterhelper/exporterhelperprofiles/Makefile b/exporter/exporterhelper/exporterhelperprofiles/Makefile new file mode 100644 index 00000000000..bdd863a203b --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/Makefile @@ -0,0 +1 @@ +include ../../../Makefile.Common diff --git a/exporter/exporterhelper/exporterhelperprofiles/constants.go b/exporter/exporterhelper/exporterhelperprofiles/constants.go new file mode 100644 index 00000000000..528f40eacaf --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/constants.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles" + +import ( + "errors" +) + +var ( + // errNilConfig is returned when an empty name is given. + errNilConfig = errors.New("nil config") + // errNilLogger is returned when a logger is nil + errNilLogger = errors.New("nil logger") + // errNilPushProfileData is returned when a nil PushProfiles is given. + errNilPushProfileData = errors.New("nil PushProfiles") + // errNilProfilesConverter is returned when a nil RequestFromProfilesFunc is given. + errNilProfilesConverter = errors.New("nil RequestFromProfilesFunc") +) diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.mod b/exporter/exporterhelper/exporterhelperprofiles/go.mod new file mode 100644 index 00000000000..b0616068289 --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/go.mod @@ -0,0 +1,88 @@ +module go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles + +go 1.22.0 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/component v0.111.0 + go.opentelemetry.io/collector/config/configretry v1.17.0 + go.opentelemetry.io/collector/consumer v0.111.0 + go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.0 + go.opentelemetry.io/collector/consumer/consumertest v0.111.0 + go.opentelemetry.io/collector/exporter v0.111.0 + go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0 + go.opentelemetry.io/collector/pdata v1.17.0 + go.opentelemetry.io/collector/pdata/pprofile v0.111.0 + go.opentelemetry.io/collector/pdata/testdata v0.111.0 + go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/sdk v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect + go.opentelemetry.io/collector/extension v0.111.0 // indirect + go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect + go.opentelemetry.io/collector/pipeline v0.111.0 // indirect + go.opentelemetry.io/collector/receiver v0.111.0 // indirect + go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.35.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/consumer/consumertest => ../../../consumer/consumertest + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../../pdata/pprofile + +replace go.opentelemetry.io/collector/pdata/testdata => ../../../pdata/testdata + +replace go.opentelemetry.io/collector/exporter => ../../ + +replace go.opentelemetry.io/collector/consumer => ../../../consumer + +replace go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles => ../../../consumer/consumererror/consumererrorprofiles + +replace go.opentelemetry.io/collector/receiver => ../../../receiver + +replace go.opentelemetry.io/collector/consumer/consumerprofiles => ../../../consumer/consumerprofiles + +replace go.opentelemetry.io/collector/component => ../../../component + +replace go.opentelemetry.io/collector/receiver/receiverprofiles => ../../../receiver/receiverprofiles + +replace go.opentelemetry.io/collector/extension => ../../../extension + +replace go.opentelemetry.io/collector/pdata => ../../../pdata + +replace go.opentelemetry.io/collector/exporter/exporterprofiles => ../../exporterprofiles + +replace go.opentelemetry.io/collector/config/configtelemetry => ../../../config/configtelemetry + +replace go.opentelemetry.io/collector/config/configretry => ../../../config/configretry + +replace go.opentelemetry.io/collector/pipeline/pipelineprofiles => ../../../pipeline/pipelineprofiles + +replace go.opentelemetry.io/collector/extension/experimental/storage => ../../../extension/experimental/storage + +replace go.opentelemetry.io/collector/pipeline => ../../../pipeline diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.sum b/exporter/exporterhelper/exporterhelperprofiles/go.sum new file mode 100644 index 00000000000..fe17027b364 --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/go.sum @@ -0,0 +1,98 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles.go b/exporter/exporterhelper/exporterhelperprofiles/profiles.go new file mode 100644 index 00000000000..89410f22892 --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles.go @@ -0,0 +1,163 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles" + +import ( + "context" + "errors" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterprofiles" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pipeline/pipelineprofiles" +) + +var profilesMarshaler = &pprofile.ProtoMarshaler{} +var profilesUnmarshaler = &pprofile.ProtoUnmarshaler{} + +type profilesRequest struct { + pd pprofile.Profiles + pusher consumerprofiles.ConsumeProfilesFunc +} + +func newProfilesRequest(pd pprofile.Profiles, pusher consumerprofiles.ConsumeProfilesFunc) exporterhelper.Request { + return &profilesRequest{ + pd: pd, + pusher: pusher, + } +} + +func newProfileRequestUnmarshalerFunc(pusher consumerprofiles.ConsumeProfilesFunc) exporterqueue.Unmarshaler[exporterhelper.Request] { + return func(bytes []byte) (exporterhelper.Request, error) { + profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes) + if err != nil { + return nil, err + } + return newProfilesRequest(profiles, pusher), nil + } +} + +func profilesRequestMarshaler(req exporterhelper.Request) ([]byte, error) { + return profilesMarshaler.MarshalProfiles(req.(*profilesRequest).pd) +} + +func (req *profilesRequest) OnError(err error) exporterhelper.Request { + var profileError consumererrorprofiles.Profiles + if errors.As(err, &profileError) { + return newProfilesRequest(profileError.Data(), req.pusher) + } + return req +} + +func (req *profilesRequest) Export(ctx context.Context) error { + return req.pusher(ctx, req.pd) +} + +func (req *profilesRequest) ItemsCount() int { + return req.pd.SampleCount() +} + +type profileExporter struct { + *internal.BaseExporter + consumerprofiles.Profiles +} + +// NewProfilesExporter creates an exporterprofiles.Profiles that records observability metrics and wraps every request with a Span. +func NewProfilesExporter( + ctx context.Context, + set exporter.Settings, + cfg component.Config, + pusher consumerprofiles.ConsumeProfilesFunc, + options ...exporterhelper.Option, +) (exporterprofiles.Profiles, error) { + if cfg == nil { + return nil, errNilConfig + } + if pusher == nil { + return nil, errNilPushProfileData + } + profilesOpts := []exporterhelper.Option{ + internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)), + internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles), + } + return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...) +} + +// RequestFromProfilesFunc converts pprofile.Profiles into a user-defined Request. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestFromProfilesFunc func(context.Context, pprofile.Profiles) (exporterhelper.Request, error) + +// requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request. +func requestFromProfiles(pusher consumerprofiles.ConsumeProfilesFunc) RequestFromProfilesFunc { + return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) { + return newProfilesRequest(profiles, pusher), nil + } +} + +// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewProfilesRequestExporter( + _ context.Context, + set exporter.Settings, + converter RequestFromProfilesFunc, + options ...exporterhelper.Option, +) (exporterprofiles.Profiles, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilProfilesConverter + } + + be, err := internal.NewBaseExporter(set, pipelineprofiles.SignalProfiles, newProfilesExporterWithObservability, options...) + if err != nil { + return nil, err + } + + tc, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) error { + req, cErr := converter(ctx, pd) + if cErr != nil { + set.Logger.Error("Failed to convert profiles. Dropping data.", + zap.Int("dropped_samples", pd.SampleCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.Send(ctx, req) + }, be.ConsumerOptions...) + + return &profileExporter{ + BaseExporter: be, + Profiles: tc, + }, err +} + +type profilesExporterWithObservability struct { + internal.BaseRequestSender + obsrep *internal.ObsReport +} + +func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender { + return &profilesExporterWithObservability{obsrep: obsrep} +} + +func (tewo *profilesExporterWithObservability) Send(ctx context.Context, req exporterhelper.Request) error { + c := tewo.obsrep.StartProfilesOp(ctx) + numSamples := req.ItemsCount() + // Forward the data to the next consumer (this pusher is the next). + err := tewo.NextSender.Send(c, req) + tewo.obsrep.EndProfilesOp(c, numSamples, err) + return err +} diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go new file mode 100644 index 00000000000..0db7d879e20 --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelperprofiles // import "go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/pprofile" +) + +// mergeProfiles merges two profiles requests into one. +func mergeProfiles(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) { + tr1, ok1 := r1.(*profilesRequest) + tr2, ok2 := r2.(*profilesRequest) + if !ok1 || !ok2 { + return nil, errors.New("invalid input type") + } + tr2.pd.ResourceProfiles().MoveAndAppendTo(tr1.pd.ResourceProfiles()) + return tr1, nil +} + +// mergeSplitProfiles splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. +func mergeSplitProfiles(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 exporterhelper.Request, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { + var ( + res []exporterhelper.Request + destReq *profilesRequest + capacityLeft = cfg.MaxSizeItems + ) + for _, req := range []exporterhelper.Request{r1, r2} { + if req == nil { + continue + } + srcReq, ok := req.(*profilesRequest) + if !ok { + return nil, errors.New("invalid input type") + } + if srcReq.pd.SampleCount() <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + srcReq.pd.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) + } + capacityLeft -= destReq.pd.SampleCount() + continue + } + + for { + extractedProfiles := extractProfiles(srcReq.pd, capacityLeft) + if extractedProfiles.SampleCount() == 0 { + break + } + capacityLeft -= extractedProfiles.SampleCount() + if destReq == nil { + destReq = &profilesRequest{pd: extractedProfiles, pusher: srcReq.pusher} + } else { + extractedProfiles.ResourceProfiles().MoveAndAppendTo(destReq.pd.ResourceProfiles()) + } + // Create new batch once capacity is reached. + if capacityLeft == 0 { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeItems + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + return res, nil +} + +// extractProfiles extracts a new profiles with a maximum number of samples. +func extractProfiles(srcProfiles pprofile.Profiles, count int) pprofile.Profiles { + destProfiles := pprofile.NewProfiles() + srcProfiles.ResourceProfiles().RemoveIf(func(srcRS pprofile.ResourceProfiles) bool { + if count == 0 { + return false + } + needToExtract := samplesCount(srcRS) > count + if needToExtract { + srcRS = extractResourceProfiles(srcRS, count) + } + count -= samplesCount(srcRS) + srcRS.MoveTo(destProfiles.ResourceProfiles().AppendEmpty()) + return !needToExtract + }) + return destProfiles +} + +// extractResourceProfiles extracts profiles and returns a new resource profiles with the specified number of profiles. +func extractResourceProfiles(srcRS pprofile.ResourceProfiles, count int) pprofile.ResourceProfiles { + destRS := pprofile.NewResourceProfiles() + destRS.SetSchemaUrl(srcRS.SchemaUrl()) + srcRS.Resource().CopyTo(destRS.Resource()) + srcRS.ScopeProfiles().RemoveIf(func(srcSS pprofile.ScopeProfiles) bool { + if count == 0 { + return false + } + needToExtract := srcSS.Profiles().Len() > count + if needToExtract { + srcSS = extractScopeProfiles(srcSS, count) + } + count -= srcSS.Profiles().Len() + srcSS.MoveTo(destRS.ScopeProfiles().AppendEmpty()) + return !needToExtract + }) + srcRS.Resource().CopyTo(destRS.Resource()) + return destRS +} + +// extractScopeProfiles extracts profiles and returns a new scope profiles with the specified number of profiles. +func extractScopeProfiles(srcSS pprofile.ScopeProfiles, count int) pprofile.ScopeProfiles { + destSS := pprofile.NewScopeProfiles() + destSS.SetSchemaUrl(srcSS.SchemaUrl()) + srcSS.Scope().CopyTo(destSS.Scope()) + srcSS.Profiles().RemoveIf(func(srcProfile pprofile.ProfileContainer) bool { + if count == 0 { + return false + } + srcProfile.MoveTo(destSS.Profiles().AppendEmpty()) + count-- + return true + }) + return destSS +} + +// resourceProfilessCount calculates the total number of profiles in the pdata.ResourceProfiles. +func samplesCount(rs pprofile.ResourceProfiles) int { + count := 0 + rs.ScopeProfiles().RemoveIf(func(ss pprofile.ScopeProfiles) bool { + ss.Profiles().RemoveIf(func(sp pprofile.ProfileContainer) bool { + count += sp.Profile().Sample().Len() + return false + }) + return false + }) + return count +} diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go new file mode 100644 index 00000000000..0272d8126b1 --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go @@ -0,0 +1,174 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelperprofiles + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMergeProfiles(t *testing.T) { + pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)} + pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + res, err := mergeProfiles(context.Background(), pr1, pr2) + require.NoError(t, err) + fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd) + assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount()) +} + +func TestMergeProfilesInvalidInput(t *testing.T) { + pr1 := &tracesRequest{td: testdata.GenerateTraces(2)} + pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + _, err := mergeProfiles(context.Background(), pr1, pr2) + assert.Error(t, err) +} + +func TestMergeSplitProfiles(t *testing.T) { + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + pr1 exporterhelper.Request + pr2 exporterhelper.Request + expected []*profilesRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: &profilesRequest{pd: pprofile.NewProfiles()}, + pr2: &profilesRequest{pd: pprofile.NewProfiles()}, + expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, + }, + { + name: "both_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: nil, + pr2: nil, + expected: []*profilesRequest{}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: &profilesRequest{pd: pprofile.NewProfiles()}, + pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)}, + expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}}, + }, + { + name: "first_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: nil, + pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)}, + expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}}, + }, + { + name: "first_nil_second_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: nil, + pr2: &profilesRequest{pd: pprofile.NewProfiles()}, + expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: &profilesRequest{pd: testdata.GenerateProfiles(4)}, + pr2: &profilesRequest{pd: testdata.GenerateProfiles(6)}, + expected: []*profilesRequest{{pd: func() pprofile.Profiles { + profiles := testdata.GenerateProfiles(4) + testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) + return profiles + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + pr1: nil, + pr2: &profilesRequest{pd: testdata.GenerateProfiles(10)}, + expected: []*profilesRequest{ + {pd: testdata.GenerateProfiles(4)}, + {pd: testdata.GenerateProfiles(4)}, + {pd: testdata.GenerateProfiles(2)}, + }, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + pr1: &profilesRequest{pd: testdata.GenerateProfiles(8)}, + pr2: &profilesRequest{pd: testdata.GenerateProfiles(20)}, + expected: []*profilesRequest{ + {pd: func() pprofile.Profiles { + profiles := testdata.GenerateProfiles(8) + testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) + return profiles + }()}, + {pd: testdata.GenerateProfiles(10)}, + {pd: testdata.GenerateProfiles(8)}, + }, + }, + { + name: "scope_profiles_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + pr1: &profilesRequest{pd: func() pprofile.Profiles { + return testdata.GenerateProfiles(6) + }()}, + pr2: nil, + expected: []*profilesRequest{ + {pd: testdata.GenerateProfiles(4)}, + {pd: func() pprofile.Profiles { + return testdata.GenerateProfiles(2) + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := mergeSplitProfiles(context.Background(), tt.cfg, tt.pr1, tt.pr2) + require.NoError(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i, r := range res { + assert.Equal(t, tt.expected[i], r.(*profilesRequest)) + } + }) + + } +} + +func TestMergeSplitProfilesInvalidInput(t *testing.T) { + r1 := &tracesRequest{td: testdata.GenerateTraces(2)} + r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} + _, err := mergeSplitProfiles(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2) + assert.Error(t, err) +} + +func TestExtractProfiles(t *testing.T) { + for i := 0; i < 10; i++ { + ld := testdata.GenerateProfiles(10) + extractedProfiles := extractProfiles(ld, i) + assert.Equal(t, i, extractedProfiles.SampleCount()) + assert.Equal(t, 10-i, ld.SampleCount()) + } +} + +type tracesRequest struct { + td ptrace.Traces + pusher consumer.ConsumeTracesFunc +} + +func (req *tracesRequest) Export(ctx context.Context) error { + return req.pusher(ctx, req.td) +} + +func (req *tracesRequest) ItemsCount() int { + return req.td.SpanCount() +} diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_test.go new file mode 100644 index 00000000000..6422cc81f9a --- /dev/null +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_test.go @@ -0,0 +1,326 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelperprofiles + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterprofiles" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/testdata" +) + +const ( + fakeProfilesParentSpanName = "fake_profiles_parent_span_name" +) + +var ( + fakeProfilesExporterConfig = struct{}{} +) + +func TestProfilesRequest(t *testing.T) { + lr := newProfilesRequest(testdata.GenerateProfiles(1), nil) + + profileErr := consumererrorprofiles.NewProfiles(errors.New("some error"), pprofile.NewProfiles()) + assert.EqualValues( + t, + newProfilesRequest(pprofile.NewProfiles(), nil), + lr.(exporterhelper.RequestErrorHandler).OnError(profileErr), + ) +} + +func TestProfilesExporter_InvalidName(t *testing.T) { + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), nil, newPushProfilesData(nil)) + require.Nil(t, le) + require.Equal(t, errNilConfig, err) +} + +func TestProfilesExporter_NilLogger(t *testing.T) { + le, err := NewProfilesExporter(context.Background(), exporter.Settings{}, &fakeProfilesExporterConfig, newPushProfilesData(nil)) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + +func TestProfilesRequestExporter_NilLogger(t *testing.T) { + le, err := NewProfilesRequestExporter(context.Background(), exporter.Settings{}, (&internal.FakeRequestConverter{}).RequestFromProfilesFunc) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + +func TestProfilesExporter_NilPushProfilesData(t *testing.T) { + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, nil) + require.Nil(t, le) + require.Equal(t, errNilPushProfileData, err) +} + +func TestProfilesRequestExporter_NilProfilesConverter(t *testing.T) { + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), nil) + require.Nil(t, le) + require.Equal(t, errNilProfilesConverter, err) +} + +func TestProfilesExporter_Default(t *testing.T) { + ld := pprofile.NewProfiles() + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, newPushProfilesData(nil)) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + require.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, le.ConsumeProfiles(context.Background(), ld)) + require.NoError(t, le.Shutdown(context.Background())) +} + +func TestProfilesRequestExporter_Default(t *testing.T) { + ld := pprofile.NewProfiles() + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{}).RequestFromProfilesFunc) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + require.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, le.ConsumeProfiles(context.Background(), ld)) + require.NoError(t, le.Shutdown(context.Background())) +} + +func TestProfilesExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, newPushProfilesData(nil), exporterhelper.WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + +func TestProfilesRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{}).RequestFromProfilesFunc, exporterhelper.WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + +func TestProfilesExporter_Default_ReturnError(t *testing.T) { + ld := pprofile.NewProfiles() + want := errors.New("my_error") + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, newPushProfilesData(want)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld)) +} + +func TestProfilesRequestExporter_Default_ConvertError(t *testing.T) { + ld := pprofile.NewProfiles() + want := errors.New("convert_error") + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{ProfilesError: want}).RequestFromProfilesFunc) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeProfiles(context.Background(), ld)) +} + +func TestProfilesRequestExporter_Default_ExportError(t *testing.T) { + ld := pprofile.NewProfiles() + want := errors.New("export_error") + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{RequestError: want}).RequestFromProfilesFunc) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld)) +} + +func TestProfilesExporter_WithPersistentQueue(t *testing.T) { + qCfg := exporterhelper.NewDefaultQueueConfig() + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID + rCfg := configretry.NewDefaultBackOffConfig() + ts := consumertest.ProfilesSink{} + set := exportertest.NewNopSettings() + set.ID = component.MustNewIDWithName("test_profiles", "with_persistent_queue") + te, err := NewProfilesExporter(context.Background(), set, &fakeProfilesExporterConfig, ts.ConsumeProfiles, exporterhelper.WithRetry(rCfg), exporterhelper.WithQueue(qCfg)) + require.NoError(t, err) + + host := &internal.MockHost{Ext: map[component.ID]component.Component{ + storageID: queue.NewMockStorageExtension(nil), + }} + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + traces := testdata.GenerateProfiles(2) + require.NoError(t, te.ConsumeProfiles(context.Background(), traces)) + require.Eventually(t, func() bool { + return len(ts.AllProfiles()) == 1 && ts.SampleCount() == 2 + }, 500*time.Millisecond, 10*time.Millisecond) +} + +func TestProfilesExporter_WithSpan(t *testing.T) { + set := exportertest.NewNopSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) + + le, err := NewProfilesExporter(context.Background(), set, &fakeProfilesExporterConfig, newPushProfilesData(nil)) + require.NoError(t, err) + require.NotNil(t, le) + checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) +} + +func TestProfilesRequestExporter_WithSpan(t *testing.T) { + set := exportertest.NewNopSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) + + le, err := NewProfilesRequestExporter(context.Background(), set, (&internal.FakeRequestConverter{}).RequestFromProfilesFunc) + require.NoError(t, err) + require.NotNil(t, le) + checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) +} + +func TestProfilesExporter_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) + + want := errors.New("my_error") + le, err := NewProfilesExporter(context.Background(), set, &fakeProfilesExporterConfig, newPushProfilesData(want)) + require.NoError(t, err) + require.NotNil(t, le) + checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) +} + +func TestProfilesRequestExporter_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) + + want := errors.New("my_error") + le, err := NewProfilesRequestExporter(context.Background(), set, (&internal.FakeRequestConverter{RequestError: want}).RequestFromProfilesFunc) + require.NoError(t, err) + require.NotNil(t, le) + checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) +} + +func TestProfilesExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, newPushProfilesData(nil), exporterhelper.WithShutdown(shutdown)) + assert.NotNil(t, le) + require.NoError(t, err) + + require.NoError(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestProfilesRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{}).RequestFromProfilesFunc, exporterhelper.WithShutdown(shutdown)) + assert.NotNil(t, le) + require.NoError(t, err) + + require.NoError(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestProfilesExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewProfilesExporter(context.Background(), exportertest.NewNopSettings(), &fakeProfilesExporterConfig, newPushProfilesData(nil), exporterhelper.WithShutdown(shutdownErr)) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, want, le.Shutdown(context.Background())) +} + +func TestProfilesRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(), + (&internal.FakeRequestConverter{}).RequestFromProfilesFunc, exporterhelper.WithShutdown(shutdownErr)) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, want, le.Shutdown(context.Background())) +} + +func newPushProfilesData(retError error) consumerprofiles.ConsumeProfilesFunc { + return func(_ context.Context, _ pprofile.Profiles) error { + return retError + } +} + +func generateProfilesTraffic(t *testing.T, tracer trace.Tracer, le exporterprofiles.Profiles, numRequests int, wantError error) { + ld := testdata.GenerateProfiles(1) + ctx, span := tracer.Start(context.Background(), fakeProfilesParentSpanName) + defer span.End() + for i := 0; i < numRequests; i++ { + require.Equal(t, wantError, le.ConsumeProfiles(ctx, ld)) + } +} + +func checkWrapSpanForProfilesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporterprofiles.Profiles, + wantError error, numSampleRecords int64) { // nolint: unparam + const numRequests = 5 + generateProfilesTraffic(t, tracer, le, numRequests, wantError) + + // Inspection time! + gotSpanData := sr.Ended() + require.Len(t, gotSpanData, numRequests+1) + + parentSpan := gotSpanData[numRequests] + require.Equalf(t, fakeProfilesParentSpanName, parentSpan.Name(), "SpanData %v", parentSpan) + for _, sd := range gotSpanData[:numRequests] { + require.Equalf(t, parentSpan.SpanContext(), sd.Parent(), "Exporter span not a child\nSpanData %v", sd) + internal.CheckStatus(t, sd, wantError) + + sentSampleRecords := numSampleRecords + var failedToSendSampleRecords int64 + if wantError != nil { + sentSampleRecords = 0 + failedToSendSampleRecords = numSampleRecords + } + require.Containsf(t, sd.Attributes(), attribute.KeyValue{Key: internal.SentSamplesKey, Value: attribute.Int64Value(sentSampleRecords)}, "SpanData %v", sd) + require.Containsf(t, sd.Attributes(), attribute.KeyValue{Key: internal.FailedToSendSamplesKey, Value: attribute.Int64Value(failedToSendSampleRecords)}, "SpanData %v", sd) + } +} diff --git a/exporter/exporterhelper/internal/obsexporter.go b/exporter/exporterhelper/internal/obsexporter.go index c9fa30f8b61..004e5c48248 100644 --- a/exporter/exporterhelper/internal/obsexporter.go +++ b/exporter/exporterhelper/internal/obsexporter.go @@ -94,6 +94,19 @@ func (or *ObsReport) EndLogsOp(ctx context.Context, numLogRecords int, err error endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey) } +// StartProfilesOp is called at the start of an Export operation. +// The returned context should be used in other calls to the Exporter functions +// dealing with the same export operation. +func (or *ObsReport) StartProfilesOp(ctx context.Context) context.Context { + return or.startOp(ctx, ExportTraceDataOperationSuffix) +} + +// EndProfilesOp completes the export operation that was started with startProfilesOp. +func (or *ObsReport) EndProfilesOp(ctx context.Context, numSpans int, err error) { + numSent, numFailedToSend := toNumItems(numSpans, err) + endSpan(ctx, err, numSent, numFailedToSend, SentSamplesKey, FailedToSendSamplesKey) +} + // startOp creates the span used to trace the operation. Returning // the updated context and the created span. func (or *ObsReport) startOp(ctx context.Context, operationSuffix string) context.Context { diff --git a/exporter/exporterhelper/internal/obsmetrics.go b/exporter/exporterhelper/internal/obsmetrics.go index cc02c0fc4e8..ae9e89942b3 100644 --- a/exporter/exporterhelper/internal/obsmetrics.go +++ b/exporter/exporterhelper/internal/obsmetrics.go @@ -28,6 +28,11 @@ const ( // FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters. FailedToSendLogRecordsKey = "send_failed_log_records" + // SentSamplesKey used to track profiles samples sent by exporters. + SentSamplesKey = "sent_samples" + // FailedToSendSamplesKey used to track samples that failed to be sent by exporters. + FailedToSendSamplesKey = "send_failed_samples" + ExporterPrefix = ExporterKey + spanNameSep ExportTraceDataOperationSuffix = spanNameSep + "traces" ExportMetricsOperationSuffix = spanNameSep + "metrics" diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 33a1915c65d..0ae94fcf45e 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -55,10 +56,11 @@ func (r *fakeRequest) ItemsCount() int { } type FakeRequestConverter struct { - MetricsError error - TracesError error - LogsError error - RequestError error + MetricsError error + TracesError error + LogsError error + ProfilesError error + RequestError error } func (frc *FakeRequestConverter) RequestFromMetricsFunc(_ context.Context, md pmetric.Metrics) (internal.Request, error) { @@ -72,3 +74,7 @@ func (frc *FakeRequestConverter) RequestFromTracesFunc(_ context.Context, md ptr func (frc *FakeRequestConverter) RequestFromLogsFunc(_ context.Context, md plog.Logs) (internal.Request, error) { return &fakeRequest{items: md.LogRecordCount(), exportErr: frc.RequestError}, frc.LogsError } + +func (frc *FakeRequestConverter) RequestFromProfilesFunc(_ context.Context, md pprofile.Profiles) (internal.Request, error) { + return &fakeRequest{items: md.SampleCount(), exportErr: frc.RequestError}, frc.ProfilesError +} diff --git a/versions.yaml b/versions.yaml index 019526d50ae..3d77c501d68 100644 --- a/versions.yaml +++ b/versions.yaml @@ -44,6 +44,7 @@ module-sets: - go.opentelemetry.io/collector/exporter - go.opentelemetry.io/collector/exporter/debugexporter - go.opentelemetry.io/collector/exporter/exporterprofiles + - go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles - go.opentelemetry.io/collector/exporter/nopexporter - go.opentelemetry.io/collector/exporter/otlpexporter - go.opentelemetry.io/collector/exporter/otlphttpexporter