diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a384607e2..8ab6a76d3 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -15,3 +15,4 @@ /pkg/timeutil/ @smartcontractkit/foundations /pkg/values/ @smartcontractkit/keystone /pkg/workflows/ @smartcontractkit/keystone +/pkg/beholder/ @smartcontractkit/realtime diff --git a/.mockery.yaml b/.mockery.yaml index 01391f6bd..95cf7acdf 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -32,3 +32,6 @@ packages: interfaces: CapabilitiesRegistry: Relayer: + github.com/smartcontractkit/chainlink-common/pkg/beholder/internal: + interfaces: + OTLPExporter: diff --git a/go.mod b/go.mod index a9f453a4d..512e8fca0 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/dominikbraun/graph v0.23.0 github.com/fxamacker/cbor/v2 v2.5.0 github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 + github.com/go-playground/validator/v10 v10.4.1 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 @@ -28,13 +29,23 @@ require ( github.com/pelletier/go-toml/v2 v2.2.0 github.com/prometheus/client_golang v1.17.0 github.com/riferrei/srclient v0.5.4 + github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c github.com/stretchr/testify v1.9.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 + go.opentelemetry.io/otel/log v0.4.0 + go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/sdk/log v0.4.0 + go.opentelemetry.io/otel/sdk/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -57,12 +68,15 @@ require ( github.com/fatih/color v1.16.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-playground/locales v0.13.0 // indirect + github.com/go-playground/universal-translator v0.17.0 // indirect github.com/goccy/go-yaml v1.12.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect; indirec github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect + github.com/leodido/go-urn v1.2.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect @@ -77,22 +91,20 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/sanity-io/litter v1.5.5 // indirect - github.com/santhosh-tekuri/jsonschema/v5 v5.2.0 github.com/stretchr/objx v0.5.2 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.23.0 // indirect + golang.org/x/sys v0.24.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 15b1e27e5..201732898 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= @@ -109,8 +111,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpS github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hashicorp/consul/sdk v0.16.0 h1:SE9m0W6DEfgIVCJX7xU+iv/hUl4m/nxqMTnCdMxDpJ8= github.com/hashicorp/consul/sdk v0.16.0/go.mod h1:7pxqqhqoaPqnBnzXD1StKed62LqJeClzVsUEy85Zr0A= github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= @@ -228,6 +230,7 @@ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -250,14 +253,30 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9 h1:UiRNKd1OgqsLbFwE+wkAWTdiAxXtCBqKIHeBIse4FUA= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9/go.mod h1:eqZlW3pJWhjyexnDPrdQxix1pn0wwhI4AO4GKpP/bMI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 h1:U2guen0GhqH8o/G2un8f/aG/y++OuW6MyCo6hT9prXk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0/go.mod h1:yeGZANgEcpdx/WK0IvvRFC+2oLiMS2u4L/0Rj2M2Qr0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 h1:0MH3f8lZrflbUWXVxyBg/zviDFdGE062uKh5+fu8Vv0= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0/go.mod h1:Vh68vYiHY5mPdekTr0ox0sALsqjoVy0w3Os278yX5SQ= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 h1:BJee2iLkfRfl9lc7aFmBwkWxY/RI1RDdXepSF6y8TPE= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0/go.mod h1:DIzlHs3DRscCIBU3Y9YSzPfScwnYnzfnCd4g8zA7bZc= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 h1:EVSnY9JbEEW92bEkIYOVMw4q1WJxIAGoFTrtYOzWuRQ= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0/go.mod h1:Ea1N1QQryNXpCD0I1fdLibBAIpQuBkznMmkdKrapk1Y= +go.opentelemetry.io/otel/log v0.4.0 h1:/vZ+3Utqh18e8TPjuc3ecg284078KWrR8BRz+PQAj3o= +go.opentelemetry.io/otel/log v0.4.0/go.mod h1:DhGnQvky7pHy82MIRV43iXh3FlKN8UUKftn0KbLOq6I= go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/log v0.4.0 h1:1mMI22L82zLqf6KtkjrRy5BbagOTWdJsqMY/HSqILAA= +go.opentelemetry.io/otel/sdk/log v0.4.0/go.mod h1:AYJ9FVF0hNOgAVzUG/ybg/QttnXhUePWAupmCqtdESo= +go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= +go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= @@ -321,10 +340,11 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -354,10 +374,10 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= -google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d h1:kHjw/5UfflP/L5EbledDrcG4C2597RtymmGRZvHiCuY= -google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd h1:BBOTEWLuuEGQy9n1y9MhVJ9Qt0BDu21X8qZs71/uPZo= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:fO8wJzT2zbQbAjbIoos1285VfEIYKDDY+Dt+WpTkh6g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go new file mode 100644 index 000000000..1b3c74985 --- /dev/null +++ b/pkg/beholder/client.go @@ -0,0 +1,293 @@ +package beholder + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otellog "go.opentelemetry.io/otel/log" + otelmetric "go.opentelemetry.io/otel/metric" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type Emitter interface { + // Sends message with bytes and attributes to OTel Collector + Emit(ctx context.Context, body []byte, attrKVs ...any) error +} + +type messageEmitter struct { + messageLogger otellog.Logger +} + +type Client struct { + Config Config + // Logger + Logger otellog.Logger + // Tracer + Tracer oteltrace.Tracer + // Meter + Meter otelmetric.Meter + // Message Emitter + Emitter Emitter + + // Providers + LoggerProvider otellog.LoggerProvider + TracerProvider oteltrace.TracerProvider + MeterProvider otelmetric.MeterProvider + MessageLoggerProvider otellog.LoggerProvider + + // OnClose + OnClose func() error +} + +// NewClient creates a new Client with initialized OpenTelemetry components +// To handle OpenTelemetry errors use [otel.SetErrorHandler](https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler) +func NewClient(ctx context.Context, cfg Config) (*Client, error) { + factory := func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error) { + return otlploggrpc.New(ctx, options...) + } + return newClient(ctx, cfg, factory) +} + +// Used for testing to override the default exporter +type otlploggrpcFactory func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error) + +func newClient(ctx context.Context, cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, error) { + baseResource, err := newOtelResource(cfg) + noop := NewNoopClient() + if err != nil { + return noop, err + } + creds := insecure.NewCredentials() + if !cfg.InsecureConnection && cfg.CACertFile != "" { + creds, err = credentials.NewClientTLSFromFile(cfg.CACertFile, "") + if err != nil { + return noop, err + } + } + sharedLogExporter, err := otlploggrpcNew( + ctx, + otlploggrpc.WithTLSCredentials(creds), + otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint), + ) + if err != nil { + return noop, err + } + + // Logger + var loggerProcessor sdklog.Processor + if cfg.LogBatchProcessor { + loggerProcessor = sdklog.NewBatchProcessor( + sharedLogExporter, + sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + ) + } else { + loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) + } + loggerAttributes := []attribute.KeyValue{ + attribute.String("beholder_data_type", "zap_log_message"), + } + loggerResource, err := sdkresource.Merge( + sdkresource.NewSchemaless(loggerAttributes...), + baseResource, + ) + if err != nil { + return noop, err + } + loggerProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(loggerResource), + sdklog.WithProcessor(loggerProcessor), + ) + logger := loggerProvider.Logger(defaultPackageName) + + // Tracer + tracerProvider, err := newTracerProvider(cfg, baseResource, creds) + if err != nil { + return noop, err + } + tracer := tracerProvider.Tracer(defaultPackageName) + + // Meter + meterProvider, err := newMeterProvider(cfg, baseResource, creds) + if err != nil { + return noop, err + } + meter := meterProvider.Meter(defaultPackageName) + + // Message Emitter + var messageLogProcessor sdklog.Processor + if cfg.EmitterBatchProcessor { + messageLogProcessor = sdklog.NewBatchProcessor( + sharedLogExporter, + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + ) + } else { + messageLogProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) + } + + messageAttributes := []attribute.KeyValue{ + attribute.String("beholder_data_type", "custom_message"), + } + messageLoggerResource, err := sdkresource.Merge( + sdkresource.NewSchemaless(messageAttributes...), + baseResource, + ) + if err != nil { + return noop, err + } + + messageLoggerProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(messageLoggerResource), + sdklog.WithProcessor(messageLogProcessor), + ) + messageLogger := messageLoggerProvider.Logger(defaultPackageName) + + emitter := messageEmitter{ + messageLogger: messageLogger, + } + + onClose := func() (err error) { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + err = errors.Join(err, provider.Shutdown(context.Background())) + } + return + } + client := Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose} + + return &client, nil +} + +// Closes all providers, flushes all data and stops all background processes +func (c Client) Close() (err error) { + if c.OnClose != nil { + return c.OnClose() + } + return +} + +// Returns a new Client with the same configuration but with a different package name +func (c Client) ForPackage(name string) Client { + // Logger + logger := c.LoggerProvider.Logger(name) + // Tracer + tracer := c.TracerProvider.Tracer(name) + // Meter + meter := c.MeterProvider.Meter(name) + // Message Emitter + messageLogger := c.MessageLoggerProvider.Logger(name) + messageEmitter := &messageEmitter{messageLogger: messageLogger} + + newClient := c // copy + newClient.Logger = logger + newClient.Tracer = tracer + newClient.Meter = meter + newClient.Emitter = messageEmitter + return newClient +} + +func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) { + extraResources, err := sdkresource.New( + context.Background(), + sdkresource.WithOS(), + sdkresource.WithContainer(), + sdkresource.WithHost(), + ) + if err != nil { + return nil, err + } + resource, err = sdkresource.Merge( + sdkresource.Default(), + extraResources, + ) + if err != nil { + return nil, err + } + // Add custom resource attributes + resource, err = sdkresource.Merge( + sdkresource.NewSchemaless(cfg.ResourceAttributes...), + resource, + ) + if err != nil { + return nil, err + } + return +} + +// Emits logs the message, but does not wait for the message to be processed. +// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue +func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + message := NewMessage(body, attrKVs...) + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) + return nil +} + +func (e messageEmitter) EmitMessage(ctx context.Context, message Message) error { + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) + return nil +} + +type shutdowner interface { + Shutdown(ctx context.Context) error +} + +func newTracerProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdktrace.TracerProvider, error) { + ctx := context.Background() + + exporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithTLSCredentials(creds), + otlptracegrpc.WithEndpoint(config.OtelExporterGRPCEndpoint), + ) + if err != nil { + return nil, err + } + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter, + trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithResource(resource), + sdktrace.WithSampler( + sdktrace.ParentBased( + sdktrace.TraceIDRatioBased(config.TraceSampleRatio), + ), + ), + ) + return tp, nil +} + +func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) { + ctx := context.Background() + + exporter, err := otlpmetricgrpc.New( + ctx, + otlpmetricgrpc.WithTLSCredentials(creds), + otlpmetricgrpc.WithEndpoint(config.OtelExporterGRPCEndpoint), + ) + if err != nil { + return nil, err + } + + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + exporter, + sdkmetric.WithInterval(config.MetricReaderInterval), // Default is 10s + )), + sdkmetric.WithResource(resource), + ) + return mp, nil +} diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go new file mode 100644 index 000000000..6d3203034 --- /dev/null +++ b/pkg/beholder/client_test.go @@ -0,0 +1,254 @@ +package beholder + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + otellog "go.opentelemetry.io/otel/log" + sdklog "go.opentelemetry.io/otel/sdk/log" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +type MockExporter struct { + mock.Mock + sdklog.Exporter +} + +func (m *MockExporter) Export(ctx context.Context, records []sdklog.Record) error { + args := m.Called(ctx, records) + return args.Error(0) +} + +func (m *MockExporter) Shutdown(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func (m *MockExporter) ForceFlush(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func TestClient(t *testing.T) { + defaultCustomAttributes := func() map[string]any { + return map[string]any{ + "int_key_1": 123, + "int64_key_1": int64(123), + "int32_key_1": int32(123), + "str_key_1": "str_val_1", + "bool_key_1": true, + "float_key_1": 123.456, + "byte_key_1": []byte("byte_val_1"), + "str_slice_key_1": []string{"str_val_1", "str_val_2"}, + "nil_key_1": nil, + "beholder_data_schema": "/schemas/ids/1001", // Required field, URI + } + } + defaultMessageBody := []byte("body bytes") + + testCases := []struct { + name string + makeCustomAttributes func() map[string]any + messageBody []byte + messageCount int + exporterMockErrorCount int + exporterOutputExpected bool + messageGenerator func(client *Client, messageBody []byte, customAttributes map[string]any) + }{ + { + name: "Test Emit", + makeCustomAttributes: defaultCustomAttributes, + messageBody: defaultMessageBody, + messageCount: 10, + exporterMockErrorCount: 0, + exporterOutputExpected: true, + messageGenerator: func(client *Client, messageBody []byte, customAttributes map[string]any) { + err := client.Emitter.Emit(tests.Context(t), messageBody, customAttributes) + assert.NoError(t, err) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + exporterMock := mocks.NewOTLPExporter(t) + defer exporterMock.AssertExpectations(t) + + otelErrorHandler := func(err error) { + t.Fatalf("otel error: %v", err) + } + // Override exporter factory which is used by Client + exporterFactory := func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) { + return exporterMock, nil + } + client, err := newClient(tests.Context(t), TestDefaultConfig(), exporterFactory) + if err != nil { + t.Fatalf("Error creating beholder client: %v", err) + } + otel.SetErrorHandler(otel.ErrorHandlerFunc(otelErrorHandler)) + // Number of exported messages + exportedMessageCount := 0 + + // Simulate exporter error if configured + if tc.exporterMockErrorCount > 0 { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(fmt.Errorf("an error occurred")).Times(tc.exporterMockErrorCount) + } + customAttributes := tc.makeCustomAttributes() + if tc.exporterOutputExpected { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(nil).Times(tc.messageCount). + Run(func(args mock.Arguments) { + assert.IsType(t, args.Get(1), []sdklog.Record{}, "Record type mismatch") + records := args.Get(1).([]sdklog.Record) + assert.Equal(t, 1, len(records), "batching is disabled, expecte 1 record") + record := records[0] + assert.Equal(t, tc.messageBody, record.Body().AsBytes(), "Record body mismatch") + actualAttributeKeys := map[string]struct{}{} + record.WalkAttributes(func(kv otellog.KeyValue) bool { + key := kv.Key + actualAttributeKeys[key] = struct{}{} + expectedValue, ok := customAttributes[key] + if !ok { + t.Fatalf("Record attribute key not found: %s", key) + } + expectedKv := OtelAttr(key, expectedValue) + equal := kv.Value.Equal(expectedKv.Value) + assert.True(t, equal, fmt.Sprintf("Record attributes mismatch for key %v", key)) + return true + }) + for key := range customAttributes { + if _, ok := actualAttributeKeys[key]; !ok { + t.Fatalf("Record attribute key not found: %s", key) + } + } + exportedMessageCount += len(records) + }) + } + for i := 0; i < tc.messageCount; i++ { + tc.messageGenerator(client, tc.messageBody, customAttributes) + } + assert.Equal(t, tc.messageCount, exportedMessageCount, "Expect all emitted messages to be exported") + }) + } +} + +func TestEmitterMessageValidation(t *testing.T) { + getEmitter := func(exporterMock *mocks.OTLPExporter) Emitter { + client, err := newClient( + tests.Context(t), + TestDefaultConfig(), + // Override exporter factory which is used by Client + func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) { + return exporterMock, nil + }, + ) + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { t.Fatalf("otel error: %v", err) })) + assert.NoError(t, err) + return client.Emitter + } + + for _, tc := range []struct { + name string + attrs Attributes + exporterCalledTimes int + expectedError string + }{ + { + name: "Missing required attribute", + attrs: Attributes{ + "key": "value", + }, + exporterCalledTimes: 0, + expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag", + }, + { + name: "Invalid URI", + attrs: Attributes{ + "beholder_data_schema": "example-schema", + }, + exporterCalledTimes: 0, + expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag", + }, + { + name: "Valid URI", + exporterCalledTimes: 1, + attrs: Attributes{ + "beholder_data_schema": "/example-schema/versions/1", + }, + expectedError: "", + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Run("Emitter.Emit", func(t *testing.T) { + // Setup + exporterMock := mocks.NewOTLPExporter(t) + if tc.exporterCalledTimes > 0 { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(nil).Times(tc.exporterCalledTimes) + } + emitter := getEmitter(exporterMock) + message := NewMessage([]byte("test"), tc.attrs) + // Emit + err := emitter.Emit(tests.Context(t), message.Body, tc.attrs) + // Assert expectations + if tc.expectedError != "" { + assert.ErrorContains(t, err, tc.expectedError) + } else { + assert.NoError(t, err) + } + if tc.exporterCalledTimes > 0 { + exporterMock.AssertExpectations(t) + } else { + exporterMock.AssertNotCalled(t, "Export") + } + }) + }) + } +} + +func TestClient_Close(t *testing.T) { + exporterMock := mocks.NewOTLPExporter(t) + defer exporterMock.AssertExpectations(t) + + client, err := NewStdoutClient() + assert.NoError(t, err) + + err = client.Close() + assert.NoError(t, err) + + exporterMock.AssertExpectations(t) +} + +func TestClient_ForPackage(t *testing.T) { + exporterMock := mocks.NewOTLPExporter(t) + defer exporterMock.AssertExpectations(t) + var b strings.Builder + client, err := NewWriterClient(&b) + assert.NoError(t, err) + clientForTest := client.ForPackage("TestClient_ForPackage") + + // Log + clientForTest.Logger.Emit(tests.Context(t), otellog.Record{}) + assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`) + b.Reset() + + // Trace + _, span := clientForTest.Tracer.Start(tests.Context(t), "testSpan") + span.End() + assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`) + assert.Contains(t, b.String(), "testSpan") + b.Reset() + + // Meter + counter, _ := clientForTest.Meter.Int64Counter("testMetric") + counter.Add(tests.Context(t), 1) + clientForTest.Close() + assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`) + assert.Contains(t, b.String(), "testMetric") +} diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go new file mode 100644 index 000000000..34641185d --- /dev/null +++ b/pkg/beholder/config.go @@ -0,0 +1,68 @@ +package beholder + +import ( + "time" + + otelattr "go.opentelemetry.io/otel/attribute" +) + +type Config struct { + InsecureConnection bool + CACertFile string + OtelExporterGRPCEndpoint string + + // OTel Resource + ResourceAttributes []otelattr.KeyValue + // Message Emitter + EmitterExportTimeout time.Duration + // Batch processing is enabled by default + // Disable it only for testing + EmitterBatchProcessor bool + // OTel Trace + TraceSampleRatio float64 + TraceBatchTimeout time.Duration + // OTel Metric + MetricReaderInterval time.Duration + // OTel Log + LogExportTimeout time.Duration + // Batch processing is enabled by default + // Disable it only for testing + LogBatchProcessor bool +} + +const ( + defaultPackageName = "beholder" +) + +var defaultOtelAttributes = []otelattr.KeyValue{ + otelattr.String("package_name", "beholder"), +} + +func DefaultConfig() Config { + return Config{ + InsecureConnection: true, + CACertFile: "", + OtelExporterGRPCEndpoint: "localhost:4317", + // Resource + ResourceAttributes: defaultOtelAttributes, + // Message Emitter + EmitterExportTimeout: 1 * time.Second, + EmitterBatchProcessor: true, + // Trace + TraceSampleRatio: 1, + TraceBatchTimeout: 1 * time.Second, + // Metric + MetricReaderInterval: 1 * time.Second, + // Log + LogExportTimeout: 1 * time.Second, + LogBatchProcessor: true, + } +} + +func TestDefaultConfig() Config { + config := DefaultConfig() + // Should be only disabled for testing + config.EmitterBatchProcessor = false + config.LogBatchProcessor = false + return config +} diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go new file mode 100644 index 000000000..edf578310 --- /dev/null +++ b/pkg/beholder/config_test.go @@ -0,0 +1,41 @@ +package beholder_test + +import ( + "fmt" + "time" + + otelattr "go.opentelemetry.io/otel/attribute" + + beholder "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +const ( + packageName = "beholder" +) + +func ExampleConfig() { + config := beholder.Config{ + InsecureConnection: true, + CACertFile: "", + OtelExporterGRPCEndpoint: "localhost:4317", + // Resource + ResourceAttributes: []otelattr.KeyValue{ + otelattr.String("package_name", packageName), + otelattr.String("sender", "beholderclient"), + }, + // Message Emitter + EmitterExportTimeout: 1 * time.Second, + EmitterBatchProcessor: true, + // Trace + TraceSampleRatio: 1, + TraceBatchTimeout: 1 * time.Second, + // Metric + MetricReaderInterval: 1 * time.Second, + // Log + LogExportTimeout: 1 * time.Second, + LogBatchProcessor: true, + } + fmt.Printf("%+v", config) + // Output: + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s MetricReaderInterval:1s LogExportTimeout:1s LogBatchProcessor:true} +} diff --git a/pkg/beholder/example_test.go b/pkg/beholder/example_test.go new file mode 100644 index 000000000..92fb0acde --- /dev/null +++ b/pkg/beholder/example_test.go @@ -0,0 +1,124 @@ +package beholder_test + +import ( + "context" + "fmt" + "log" + "math/rand" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" +) + +func ExampleNewClient() { + ctx := context.Background() + + config := beholder.DefaultConfig() + + // Initialize beholder otel client which sets up OTel components + client, err := beholder.NewClient(ctx, config) + if err != nil { + log.Fatalf("Error creating Beholder client: %v", err) + } + // Handle OTel errors + otel.SetErrorHandler(otel.ErrorHandlerFunc(errorHandler)) + // Set global client so it will be accessible from anywhere through beholder functions + beholder.SetClient(client) + + // Define a custom protobuf payload to emit + payload := &pb.TestCustomMessage{ + BoolVal: true, + IntVal: 42, + FloatVal: 3.14, + StringVal: "Hello, World!", + } + payloadBytes, err := proto.Marshal(payload) + if err != nil { + log.Fatalf("Failed to marshal protobuf") + } + + // Emit the custom message anywhere from application logic + fmt.Println("Emit custom messages") + for range 10 { + err := beholder.GetEmitter().Emit(context.Background(), payloadBytes, + "beholder_data_schema", "/custom-message/versions/1", // required + "beholder_data_type", "custom_message", + "foo", "bar", + ) + if err != nil { + log.Printf("Error emitting message: %v", err) + } + } + // Output: + // Emit custom messages +} + +func ExampleTracer() { + ctx := context.Background() + + config := beholder.DefaultConfig() + + // Initialize beholder otel client which sets up OTel components + client, err := beholder.NewClient(ctx, config) + if err != nil { + log.Fatalf("Error creating Beholder client: %v", err) + } + // Handle OTel errors + otel.SetErrorHandler(otel.ErrorHandlerFunc(errorHandler)) + + // Set global client so it will be accessible from anywhere through beholder functions + beholder.SetClient(client) + + // Define a new counter + counter, err := beholder.GetMeter().Int64Counter("custom_message.count") + if err != nil { + log.Fatalf("failed to create new counter") + } + + // Define a new gauge + gauge, err := beholder.GetMeter().Int64Gauge("custom_message.gauge") + if err != nil { + log.Fatalf("failed to create new gauge") + } + + // Use the counter and gauge for metrics within application logic + fmt.Println("Update metrics") + counter.Add(ctx, 1) + gauge.Record(ctx, rand.Int63n(101)) + + fmt.Println("Create new trace span") + _, rootSpan := beholder.GetTracer().Start(ctx, "foo", trace.WithAttributes( + attribute.String("app_name", "beholderdemo"), + )) + defer rootSpan.End() + // Output: + // Update metrics + // Create new trace span +} + +func ExampleNewNoopClient() { + fmt.Println("Beholder is not initialized. Fall back to Noop OTel Client") + + fmt.Println("Emitting custom message via noop otel client") + + err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"), + "beholder_data_schema", "/custom-message/versions/1", // required + ) + if err != nil { + log.Printf("Error emitting message: %v", err) + } + // Output: + // Beholder is not initialized. Fall back to Noop OTel Client + // Emitting custom message via noop otel client +} + +func errorHandler(e error) { + if e != nil { + log.Printf("otel error: %v", e) + } +} diff --git a/pkg/beholder/global.go b/pkg/beholder/global.go new file mode 100644 index 000000000..b9f2cb366 --- /dev/null +++ b/pkg/beholder/global.go @@ -0,0 +1,64 @@ +package beholder + +import ( + "sync/atomic" + + "go.opentelemetry.io/otel" + otellog "go.opentelemetry.io/otel/log" + otellogglobal "go.opentelemetry.io/otel/log/global" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" +) + +// Pointer to the global Beholder Client +var globalClient = defaultClient() + +// SetClient sets the global Beholder Client +func SetClient(client *Client) { + globalClient.Store(client) +} + +// Returns the global Beholder Client +// Its thread-safe and can be used concurrently +func GetClient() *Client { + return globalClient.Load() +} + +func GetLogger() otellog.Logger { + return GetClient().Logger +} + +func GetTracer() oteltrace.Tracer { + return GetClient().Tracer +} + +func GetMeter() otelmetric.Meter { + return GetClient().Meter +} + +func GetEmitter() Emitter { + return GetClient().Emitter +} + +func defaultClient() *atomic.Pointer[Client] { + ptr := &atomic.Pointer[Client]{} + client := NewNoopClient() + ptr.Store(client) + return ptr +} + +// Sets global OTel logger, tracer, meter providers from Client. +// Makes them accessible from anywhere in the code via global otel getters. +// Any package that relies on go.opentelemetry.io will be able to pick up configured global providers +// e.g [otelgrpc](https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc#example-NewServerHandler) +func SetGlobalOtelProviders() { + c := GetClient() + // Logger + otellogglobal.SetLoggerProvider(c.LoggerProvider) + // Tracer + otel.SetTracerProvider(c.TracerProvider) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + // Meter + otel.SetMeterProvider(c.MeterProvider) +} diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go new file mode 100644 index 000000000..1dbc9e373 --- /dev/null +++ b/pkg/beholder/global_test.go @@ -0,0 +1,108 @@ +package beholder_test + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel" + otelattribute "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" + otellogglobal "go.opentelemetry.io/otel/log/global" + otellognoop "go.opentelemetry.io/otel/log/noop" + otelmetric "go.opentelemetry.io/otel/metric" + otelmetricnoop "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" + oteltracenoop "go.opentelemetry.io/otel/trace/noop" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +func TestGlobal(t *testing.T) { + // Get global logger, tracer, meter, messageEmitter + // If not initialized with beholder.SetClient will return noop client + logger, tracer, meter, messageEmitter := beholder.GetLogger(), beholder.GetTracer(), beholder.GetMeter(), beholder.GetEmitter() + noopClient := beholder.NewNoopClient() + assert.IsType(t, otellognoop.Logger{}, logger) + assert.IsType(t, oteltracenoop.Tracer{}, tracer) + assert.IsType(t, otelmetricnoop.Meter{}, meter) + expectedMessageEmitter := beholder.NewNoopClient().Emitter + assert.IsType(t, expectedMessageEmitter, messageEmitter) + + var noopClientPtr *beholder.Client = noopClient + assert.IsType(t, noopClientPtr, beholder.GetClient()) + assert.NotSame(t, noopClientPtr, beholder.GetClient()) + + // Set beholder client so it will be accessible from anywhere through beholder functions + beholder.SetClient(noopClientPtr) + assert.Same(t, noopClientPtr, beholder.GetClient()) + + // After that use beholder functions to get logger, tracer, meter, messageEmitter + logger, tracer, meter, messageEmitter = beholder.GetLogger(), beholder.GetTracer(), beholder.GetMeter(), beholder.GetEmitter() + + // Emit otel log record + logger.Emit(tests.Context(t), otellog.Record{}) + + // Create trace span + ctx, span := tracer.Start(tests.Context(t), "ExampleGlobalClient", oteltrace.WithAttributes(otelattribute.String("key", "value"))) + defer span.End() + + // Create metric counter + counter, _ := meter.Int64Counter("global_counter") + counter.Add(tests.Context(t), 1) + + // Emit custom message + err := messageEmitter.Emit(ctx, []byte("test"), beholder.Attributes{"key": "value"}) + if err != nil { + t.Fatalf("Error emitting message: %v", err) + } +} + +func TestClient_SetGlobalOtelProviders(t *testing.T) { + exporterMock := mocks.NewOTLPExporter(t) + defer exporterMock.AssertExpectations(t) + + // Restore global providers after test + defer restoreProviders(t, providers{ + otellogglobal.GetLoggerProvider(), + otel.GetTracerProvider(), + otel.GetTextMapPropagator(), + otel.GetMeterProvider(), + }) + + var b strings.Builder + client, err := beholder.NewWriterClient(&b) + assert.NoError(t, err) + // Set global Otel Client + beholder.SetClient(client) + + // Set global otel tracer, meter, logger providers from global beholder otel client + beholder.SetGlobalOtelProviders() + + assert.Equal(t, client.LoggerProvider, otellogglobal.GetLoggerProvider()) + assert.Equal(t, client.TracerProvider, otel.GetTracerProvider()) + assert.Equal(t, client.MeterProvider, otel.GetMeterProvider()) +} + +type providers struct { + loggerProvider otellog.LoggerProvider + tracerProvider oteltrace.TracerProvider + textMapPropagator propagation.TextMapPropagator + meterProvider otelmetric.MeterProvider +} + +func restoreProviders(t *testing.T, p providers) { + otellogglobal.SetLoggerProvider(p.loggerProvider) + otel.SetTracerProvider(p.tracerProvider) + otel.SetTextMapPropagator(p.textMapPropagator) + otel.SetMeterProvider(p.meterProvider) + + assert.Equal(t, p.loggerProvider, otellogglobal.GetLoggerProvider()) + assert.Equal(t, p.tracerProvider, otel.GetTracerProvider()) + assert.Equal(t, p.textMapPropagator, otel.GetTextMapPropagator()) + assert.Equal(t, p.meterProvider, otel.GetMeterProvider()) +} diff --git a/pkg/beholder/internal/exporter.go b/pkg/beholder/internal/exporter.go new file mode 100644 index 000000000..271077a5c --- /dev/null +++ b/pkg/beholder/internal/exporter.go @@ -0,0 +1,18 @@ +package internal + +import ( + "context" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + sdklog "go.opentelemetry.io/otel/sdk/log" +) + +var _ sdklog.Exporter = (*otlploggrpc.Exporter)(nil) +var _ OTLPExporter = (*otlploggrpc.Exporter)(nil) + +// Copy of sdklog.Exporter interface, used for mocking +type OTLPExporter interface { + Export(ctx context.Context, records []sdklog.Record) error + Shutdown(ctx context.Context) error + ForceFlush(ctx context.Context) error +} diff --git a/pkg/beholder/internal/mocks/otlp_exporter.go b/pkg/beholder/internal/mocks/otlp_exporter.go new file mode 100644 index 000000000..c5feb8aa2 --- /dev/null +++ b/pkg/beholder/internal/mocks/otlp_exporter.go @@ -0,0 +1,177 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + log "go.opentelemetry.io/otel/sdk/log" + + mock "github.com/stretchr/testify/mock" +) + +// OTLPExporter is an autogenerated mock type for the OTLPExporter type +type OTLPExporter struct { + mock.Mock +} + +type OTLPExporter_Expecter struct { + mock *mock.Mock +} + +func (_m *OTLPExporter) EXPECT() *OTLPExporter_Expecter { + return &OTLPExporter_Expecter{mock: &_m.Mock} +} + +// Export provides a mock function with given fields: ctx, records +func (_m *OTLPExporter) Export(ctx context.Context, records []log.Record) error { + ret := _m.Called(ctx, records) + + if len(ret) == 0 { + panic("no return value specified for Export") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []log.Record) error); ok { + r0 = rf(ctx, records) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_Export_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Export' +type OTLPExporter_Export_Call struct { + *mock.Call +} + +// Export is a helper method to define mock.On call +// - ctx context.Context +// - records []log.Record +func (_e *OTLPExporter_Expecter) Export(ctx interface{}, records interface{}) *OTLPExporter_Export_Call { + return &OTLPExporter_Export_Call{Call: _e.mock.On("Export", ctx, records)} +} + +func (_c *OTLPExporter_Export_Call) Run(run func(ctx context.Context, records []log.Record)) *OTLPExporter_Export_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]log.Record)) + }) + return _c +} + +func (_c *OTLPExporter_Export_Call) Return(_a0 error) *OTLPExporter_Export_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_Export_Call) RunAndReturn(run func(context.Context, []log.Record) error) *OTLPExporter_Export_Call { + _c.Call.Return(run) + return _c +} + +// ForceFlush provides a mock function with given fields: ctx +func (_m *OTLPExporter) ForceFlush(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ForceFlush") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_ForceFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceFlush' +type OTLPExporter_ForceFlush_Call struct { + *mock.Call +} + +// ForceFlush is a helper method to define mock.On call +// - ctx context.Context +func (_e *OTLPExporter_Expecter) ForceFlush(ctx interface{}) *OTLPExporter_ForceFlush_Call { + return &OTLPExporter_ForceFlush_Call{Call: _e.mock.On("ForceFlush", ctx)} +} + +func (_c *OTLPExporter_ForceFlush_Call) Run(run func(ctx context.Context)) *OTLPExporter_ForceFlush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OTLPExporter_ForceFlush_Call) Return(_a0 error) *OTLPExporter_ForceFlush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_ForceFlush_Call) RunAndReturn(run func(context.Context) error) *OTLPExporter_ForceFlush_Call { + _c.Call.Return(run) + return _c +} + +// Shutdown provides a mock function with given fields: ctx +func (_m *OTLPExporter) Shutdown(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Shutdown") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown' +type OTLPExporter_Shutdown_Call struct { + *mock.Call +} + +// Shutdown is a helper method to define mock.On call +// - ctx context.Context +func (_e *OTLPExporter_Expecter) Shutdown(ctx interface{}) *OTLPExporter_Shutdown_Call { + return &OTLPExporter_Shutdown_Call{Call: _e.mock.On("Shutdown", ctx)} +} + +func (_c *OTLPExporter_Shutdown_Call) Run(run func(ctx context.Context)) *OTLPExporter_Shutdown_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OTLPExporter_Shutdown_Call) Return(_a0 error) *OTLPExporter_Shutdown_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_Shutdown_Call) RunAndReturn(run func(context.Context) error) *OTLPExporter_Shutdown_Call { + _c.Call.Return(run) + return _c +} + +// NewOTLPExporter creates a new instance of OTLPExporter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOTLPExporter(t interface { + mock.TestingT + Cleanup(func()) +}) *OTLPExporter { + mock := &OTLPExporter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/beholder/message.go b/pkg/beholder/message.go new file mode 100644 index 000000000..2b1d89d3a --- /dev/null +++ b/pkg/beholder/message.go @@ -0,0 +1,239 @@ +package beholder + +import ( + "fmt" + + "github.com/go-playground/validator/v10" + "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" +) + +type Message struct { + Attrs Attributes + Body []byte +} + +type Metadata struct { + // REQUIRED FIELDS + // Schema Registry URI to fetch schema + BeholderDataSchema string `validate:"required,uri"` + + // OPTIONAL FIELDS + // The version of the CL node. + NodeVersion string + // mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures. + NodeCsaKey string + // Signature from CSA private key. + NodeCsaSignature string + DonID string + // The RDD network name the CL node is operating with. + NetworkName []string + WorkflowID string + WorkflowName string + WorkflowOwnerAddress string + // Hash of the workflow spec. + WorkflowSpecID string + // The unique execution of a workflow. + WorkflowExecutionID string + // The address for the contract. + CapabilityContractAddress string + CapabilityID string + CapabilityVersion string + CapabilityName string + NetworkChainID string +} + +func (m Metadata) Attributes() Attributes { + return Attributes{ + "node_version": m.NodeVersion, + "node_csa_key": m.NodeCsaKey, + "node_csa_signature": m.NodeCsaSignature, + "don_id": m.DonID, + "network_name": m.NetworkName, + "workflow_id": m.WorkflowID, + "workflow_name": m.WorkflowName, + "workflow_owner_address": m.WorkflowOwnerAddress, + "workflow_spec_id": m.WorkflowSpecID, + "workflow_execution_id": m.WorkflowExecutionID, + "beholder_data_schema": m.BeholderDataSchema, + "capability_contract_address": m.CapabilityContractAddress, + "capability_id": m.CapabilityID, + "capability_version": m.CapabilityVersion, + "capability_name": m.CapabilityName, + "network_chain_id": m.NetworkChainID, + } +} + +type Attributes = map[string]any + +func newAttributes(attrKVs ...any) Attributes { + a := make(Attributes, len(attrKVs)/2) + + l := len(attrKVs) + for i := 0; i < l; { + switch t := attrKVs[i].(type) { + case Attributes: + for k, v := range t { + a[k] = v + } + i++ + case string: + if i+1 >= l { + break + } + val := attrKVs[i+1] + a[t] = val + i += 2 + default: + // Unexpected type + return a + } + } + return a +} + +func NewMessage(body []byte, attrKVs ...any) Message { + return Message{ + Body: body, + Attrs: newAttributes(attrKVs...), + } +} + +func (e *Message) AddAttributes(attrKVs ...any) { + attrs := newAttributes(attrKVs...) + if e.Attrs == nil { + e.Attrs = make(map[string]any, len(attrs)/2) + } + for k, v := range attrs { + e.Attrs[k] = v + } +} + +func (e *Message) OtelRecord() otellog.Record { + return newRecord(e.Body, e.Attrs) +} + +func (e *Message) Copy() Message { + attrs := make(Attributes, len(e.Attrs)) + for k, v := range e.Attrs { + attrs[k] = v + } + c := Message{ + Attrs: attrs, + } + if e.Body != nil { + c.Body = make([]byte, len(e.Body)) + copy(c.Body, e.Body) + } + return c +} + +// Creates otellog.Record from body and attributes +func newRecord(body []byte, attrs map[string]any) otellog.Record { + otelRecord := otellog.Record{} + if body != nil { + otelRecord.SetBody(otellog.BytesValue(body)) + } + for k, v := range attrs { + otelRecord.AddAttributes(OtelAttr(k, v)) + } + return otelRecord +} + +func OtelAttr(key string, value any) otellog.KeyValue { + switch v := value.(type) { + case string: + return otellog.String(key, v) + case []string: + vals := make([]otellog.Value, 0, len(v)) + for _, s := range v { + vals = append(vals, otellog.StringValue(s)) + } + return otellog.Slice(key, vals...) + case int64: + return otellog.Int64(key, v) + case int: + return otellog.Int(key, v) + case float64: + return otellog.Float64(key, v) + case bool: + return otellog.Bool(key, v) + case []byte: + return otellog.Bytes(key, v) + case nil: + return otellog.Empty(key) + case otellog.Value: + return otellog.KeyValue{Key: key, Value: v} + case attribute.Value: + return OtelAttr(key, v.AsInterface()) + default: + return otellog.String(key, fmt.Sprintf("", v, v)) + } +} + +func (e Message) String() string { + return fmt.Sprintf("Message{Attrs: %v, Body: %v}", e.Attrs, e.Body) +} + +// Sets metadata fields from attributes +func (m *Metadata) FromAttributes(attrs Attributes) *Metadata { + for k, v := range attrs { + switch k { + case "node_version": + m.NodeVersion = v.(string) + case "node_csa_key": + m.NodeCsaKey = v.(string) + case "node_csa_signature": + m.NodeCsaSignature = v.(string) + case "don_id": + m.DonID = v.(string) + case "network_name": + m.NetworkName = v.([]string) + case "workflow_id": + m.WorkflowID = v.(string) + case "workflow_name": + m.WorkflowName = v.(string) + case "workflow_owner_address": + m.WorkflowOwnerAddress = v.(string) + case "workflow_spec_id": + m.WorkflowSpecID = v.(string) + case "workflow_execution_id": + m.WorkflowExecutionID = v.(string) + case "beholder_data_schema": + m.BeholderDataSchema = v.(string) + case "capability_contract_address": + m.CapabilityContractAddress = v.(string) + case "capability_id": + m.CapabilityID = v.(string) + case "capability_version": + m.CapabilityVersion = v.(string) + case "capability_name": + m.CapabilityName = v.(string) + case "network_chain_id": + m.NetworkChainID = v.(string) + } + } + return m +} + +func NewMetadata(attrs Attributes) *Metadata { + m := &Metadata{} + m.FromAttributes(attrs) + return m +} + +func (m *Metadata) Validate() error { + validate := validator.New() + return validate.Struct(m) +} + +func (e Message) Validate() error { + if e.Body == nil { + return fmt.Errorf("message body is required") + } + if len(e.Attrs) == 0 { + return fmt.Errorf("message attributes are required") + } + metadata := NewMetadata(e.Attrs) + return metadata.Validate() +} diff --git a/pkg/beholder/message_test.go b/pkg/beholder/message_test.go new file mode 100644 index 000000000..1f8f990fb --- /dev/null +++ b/pkg/beholder/message_test.go @@ -0,0 +1,184 @@ +package beholder_test + +import ( + "fmt" + "slices" + "strings" + "testing" + + "github.com/go-playground/validator/v10" + "github.com/stretchr/testify/assert" + otellog "go.opentelemetry.io/otel/log" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +func ExampleMessage() { + // Create message with body and attributes + m1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"}) + fmt.Println("#1", m1) + // Create attributes + additionalAttributes := beholder.Attributes{ + "key_string": "new value", + "key_int32": int32(1), + } + // Add attributes to message + m1.AddAttributes(additionalAttributes) + fmt.Println("#2", m1) + // Create mmpty message struct + m2 := beholder.Message{} + fmt.Println("#3", m2) + // Add attributes to message + m2.AddAttributes(beholder.Attributes{"key_int": 1}) + fmt.Println("#4", m2) + // Update attribute key_int + m2.AddAttributes(beholder.Attributes{"key_int": 2}) + fmt.Println("#5", m2) + // Set message body + m2.Body = []byte("0123") + fmt.Println("#6", m2) + // Reset attributes + m2.Attrs = beholder.Attributes{} + fmt.Println("#7", m2) + // Reset body + m2.Body = nil + fmt.Println("#8", m2) + // Shalow copy of message + m3 := beholder.NewMessage(m1.Body, m1.Attrs) + fmt.Println("#9", m3) + m1.Body[0] = byte(2) // Wil mutate m3 + fmt.Println("#10", m3) + // Deep copy + m4 := m1.Copy() + fmt.Println("#11", m4) + m1.Body[0] = byte(3) // Should not mutate m4 + fmt.Println("#12", m4) + // Create message with mixed attributes: kv pairs and maps + m5 := beholder.NewMessage([]byte{1}, + // Add attributes from the map + map[string]any{ + "key1": "value1", + }, + // Add attributes from KV pair + "key2", "value2", + // Add attributes from Attributes map + beholder.Attributes{"key3": "value3"}, + // Add attributes from KV pair + "key4", "value4", + // Modify key1 + "key1", "value5", + // Modify key2 + map[string]any{ + "key2": "value6", + }, + ) + fmt.Println("#13", m5) + // Create message with no attributes + m6 := beholder.NewMessage([]byte{1}, beholder.Attributes{}) + // Add attributes using AddAttributes + m6.AddAttributes( + "key1", "value1", + "key2", "value2", + ) + fmt.Println("#14", m6) + // Output: + // #1 Message{Attrs: map[key_string:value], Body: [1]} + // #2 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} + // #3 Message{Attrs: map[], Body: []} + // #4 Message{Attrs: map[key_int:1], Body: []} + // #5 Message{Attrs: map[key_int:2], Body: []} + // #6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]} + // #7 Message{Attrs: map[], Body: [48 49 50 51]} + // #8 Message{Attrs: map[], Body: []} + // #9 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} + // #10 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} + // #11 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} + // #12 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} + // #13 Message{Attrs: map[key1:value5 key2:value6 key3:value3 key4:value4], Body: [1]} + // #14 Message{Attrs: map[key1:value1 key2:value2], Body: [1]} +} + +func testMetadata() beholder.Metadata { + return beholder.Metadata{ + NodeVersion: "v1.0.0", + NodeCsaKey: "test_key", + NodeCsaSignature: "test_signature", + DonID: "test_don_id", + NetworkName: []string{"test_network"}, + WorkflowID: "test_workflow_id", + WorkflowName: "test_workflow_name", + WorkflowOwnerAddress: "test_owner_address", + WorkflowSpecID: "test_spec_id", + WorkflowExecutionID: "test_execution_id", + BeholderDataSchema: "/schemas/ids/test_schema", // required field, URI + CapabilityContractAddress: "test_contract_address", + CapabilityID: "test_capability_id", + CapabilityVersion: "test_capability_version", + CapabilityName: "test_capability_name", + NetworkChainID: "test_chain_id", + } +} +func ExampleMetadata() { + m := testMetadata() + fmt.Printf("%#v\n", m) + fmt.Println(m.Attributes()) + // Output: + // beholder.Metadata{BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"} + // map[beholder_data_schema:/schemas/ids/test_schema capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id] +} + +func ExampleValidate() { + validate := validator.New() + + metadata := beholder.Metadata{} + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } + metadata.BeholderDataSchema = "example.proto" + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } + metadata.BeholderDataSchema = "/schemas/ids/test_schema" + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } else { + fmt.Println("Metadata is valid") + } + // Output: + // Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag + // Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag + // Metadata is valid +} + +func TestAttributesConversion(t *testing.T) { + expected := testMetadata() + attrs := expected.Attributes() + actual := beholder.NewMetadata(attrs) + assert.Equal(t, expected, *actual) +} + +func TestMessage_OtelAttributes(t *testing.T) { + type notSupportedType struct { + value string + } + m := beholder.NewMessage([]byte{1}, beholder.Attributes{ + "key_string": "value", + "key_int": 1, + "not_supported_type": notSupportedType{"not supported type"}, + }) + otelAttrs := make([]otellog.KeyValue, 0, len(m.Attrs)) + for k, v := range m.Attrs { + otelAttrs = append(otelAttrs, beholder.OtelAttr(k, v)) + } + slices.SortFunc(otelAttrs, func(a, b otellog.KeyValue) int { + return strings.Compare(a.Key, b.Key) + }) + + assert.Equal(t, 3, len(otelAttrs)) + assert.Equal(t, "key_int", otelAttrs[0].Key) + assert.Equal(t, int64(1), otelAttrs[0].Value.AsInt64()) + assert.Equal(t, "key_string", otelAttrs[1].Key) + assert.Equal(t, "value", otelAttrs[1].Value.AsString()) + assert.Equal(t, "not_supported_type", otelAttrs[2].Key) + assert.Equal(t, "", otelAttrs[2].Value.AsString()) +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go new file mode 100644 index 000000000..2238453ec --- /dev/null +++ b/pkg/beholder/noop.go @@ -0,0 +1,139 @@ +package beholder + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + otellognoop "go.opentelemetry.io/otel/log/noop" + otelmetricnoop "go.opentelemetry.io/otel/metric/noop" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltracenoop "go.opentelemetry.io/otel/trace/noop" +) + +// Default client to fallback when is is not initialized properly +func NewNoopClient() *Client { + cfg := DefaultConfig() + // Logger + loggerProvider := otellognoop.NewLoggerProvider() + logger := loggerProvider.Logger(defaultPackageName) + // Tracer + tracerProvider := oteltracenoop.NewTracerProvider() + tracer := tracerProvider.Tracer(defaultPackageName) + + // Meter + meterProvider := otelmetricnoop.NewMeterProvider() + meter := meterProvider.Meter(defaultPackageName) + + // MessageEmitter + messageEmitter := noopMessageEmitter{} + + client := Client{cfg, logger, tracer, meter, messageEmitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose} + + return &client +} + +// NewStdoutClient creates a new Client with exporters which send telemetry data to standard output +// Used for testing and debugging +func NewStdoutClient() (*Client, error) { + return NewWriterClient(os.Stdout) +} + +// NewWriterClient creates a new Client with otel exporters which send telemetry data to custom io.Writer +func NewWriterClient(w io.Writer) (*Client, error) { + cfg := DefaultWriterClientConfig() + cfg.WithWriter(w) + + // Logger + loggerExporter, err := stdoutlog.New( + append([]stdoutlog.Option{stdoutlog.WithoutTimestamps()}, cfg.LogOptions...)..., + ) + if err != nil { + return NewNoopClient(), err + } + loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter))) + logger := loggerProvider.Logger(defaultPackageName) + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + fmt.Printf("OTel error %s", err) + })) + + // Tracer + traceExporter, err := stdouttrace.New(cfg.TraceOptions...) + if err != nil { + return NewNoopClient(), err + } + + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor( + sdktrace.NewSimpleSpanProcessor(traceExporter), + )) + tracer := tracerProvider.Tracer(defaultPackageName) + + // Meter + metricExporter, err := stdoutmetric.New(cfg.MetricOptions...) + if err != nil { + return NewNoopClient(), err + } + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + metricExporter, + sdkmetric.WithInterval(100*time.Millisecond), // Default is 10s + )), + ) + meter := meterProvider.Meter(defaultPackageName) + + // MessageEmitter + emitter := messageEmitter{messageLogger: logger} + + onClose := func() (err error) { + for _, provider := range []shutdowner{loggerProvider, tracerProvider, meterProvider} { + err = errors.Join(err, provider.Shutdown(context.Background())) + } + return + } + + client := Client{cfg.Config, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, onClose} + + return &client, nil +} + +type noopMessageEmitter struct{} + +func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return nil +} +func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error { + return nil +} + +func noopOnClose() error { + return nil +} + +type writerClientConfig struct { + Config + LogOptions []stdoutlog.Option + TraceOptions []stdouttrace.Option + MetricOptions []stdoutmetric.Option +} + +func DefaultWriterClientConfig() writerClientConfig { + return writerClientConfig{ + Config: DefaultConfig(), + } +} + +func (cfg *writerClientConfig) WithWriter(w io.Writer) { + cfg.LogOptions = append(cfg.LogOptions, stdoutlog.WithWriter(w)) + cfg.TraceOptions = append(cfg.TraceOptions, stdouttrace.WithWriter(w)) + cfg.MetricOptions = append(cfg.MetricOptions, stdoutmetric.WithWriter(w)) +} diff --git a/pkg/beholder/noop_test.go b/pkg/beholder/noop_test.go new file mode 100644 index 000000000..ee1fb7209 --- /dev/null +++ b/pkg/beholder/noop_test.go @@ -0,0 +1,55 @@ +package beholder + +import ( + "context" + "log" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/trace" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +func TestNoopClient(t *testing.T) { + noopClient := NewNoopClient() + assert.NotNil(t, noopClient) + + // Message Emitter + err := noopClient.Emitter.Emit(tests.Context(t), []byte("test"), + "key1", "value1", + ) + assert.NoError(t, err) + + // Logger + noopClient.Logger.Emit(tests.Context(t), otellog.Record{}) + + // Define a new counter + counter, err := noopClient.Meter.Int64Counter("custom_message.count") + if err != nil { + log.Fatalf("failed to create new counter") + } + + // Define a new gauge + gauge, err := noopClient.Meter.Int64Gauge("custom_message.gauge") + if err != nil { + log.Fatalf("failed to create new gauge") + } + assert.NoError(t, err) + + // Use the counter and gauge for metrics within application logic + counter.Add(tests.Context(t), 1) + gauge.Record(tests.Context(t), rand.Int63n(101)) + + // Create a new trace span + _, rootSpan := noopClient.Tracer.Start(context.Background(), "foo", trace.WithAttributes( + attribute.String("app_name", "beholderdemo"), + )) + rootSpan.End() + + err = noopClient.Close() + assert.NoError(t, err) +} diff --git a/pkg/beholder/pb/example.pb.go b/pkg/beholder/pb/example.pb.go new file mode 100644 index 000000000..074b49f27 --- /dev/null +++ b/pkg/beholder/pb/example.pb.go @@ -0,0 +1,185 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.25.1 +// source: example.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Used for testing +type TestCustomMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + BoolVal bool `protobuf:"varint,1,opt,name=bool_val,json=boolVal,proto3" json:"bool_val,omitempty"` + IntVal int64 `protobuf:"varint,2,opt,name=int_val,json=intVal,proto3" json:"int_val,omitempty"` + FloatVal float32 `protobuf:"fixed32,3,opt,name=float_val,json=floatVal,proto3" json:"float_val,omitempty"` + StringVal string `protobuf:"bytes,4,opt,name=string_val,json=stringVal,proto3" json:"string_val,omitempty"` + BytesVal []byte `protobuf:"bytes,5,opt,name=bytes_val,json=bytesVal,proto3" json:"bytes_val,omitempty"` +} + +func (x *TestCustomMessage) Reset() { + *x = TestCustomMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_example_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestCustomMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestCustomMessage) ProtoMessage() {} + +func (x *TestCustomMessage) ProtoReflect() protoreflect.Message { + mi := &file_example_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestCustomMessage.ProtoReflect.Descriptor instead. +func (*TestCustomMessage) Descriptor() ([]byte, []int) { + return file_example_proto_rawDescGZIP(), []int{0} +} + +func (x *TestCustomMessage) GetBoolVal() bool { + if x != nil { + return x.BoolVal + } + return false +} + +func (x *TestCustomMessage) GetIntVal() int64 { + if x != nil { + return x.IntVal + } + return 0 +} + +func (x *TestCustomMessage) GetFloatVal() float32 { + if x != nil { + return x.FloatVal + } + return 0 +} + +func (x *TestCustomMessage) GetStringVal() string { + if x != nil { + return x.StringVal + } + return "" +} + +func (x *TestCustomMessage) GetBytesVal() []byte { + if x != nil { + return x.BytesVal + } + return nil +} + +var File_example_proto protoreflect.FileDescriptor + +var file_example_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x70, 0x62, 0x22, 0xa0, 0x01, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x43, 0x75, 0x73, 0x74, + 0x6f, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6f, 0x6f, + 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x62, 0x6f, 0x6f, + 0x6c, 0x56, 0x61, 0x6c, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x69, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x12, 0x1b, 0x0a, + 0x09, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02, + 0x52, 0x08, 0x66, 0x6c, 0x6f, 0x61, 0x74, 0x56, 0x61, 0x6c, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, + 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x79, 0x74, + 0x65, 0x73, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x62, 0x79, + 0x74, 0x65, 0x73, 0x56, 0x61, 0x6c, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, + 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2d, + 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x68, 0x6f, 0x6c, + 0x64, 0x65, 0x72, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_example_proto_rawDescOnce sync.Once + file_example_proto_rawDescData = file_example_proto_rawDesc +) + +func file_example_proto_rawDescGZIP() []byte { + file_example_proto_rawDescOnce.Do(func() { + file_example_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_proto_rawDescData) + }) + return file_example_proto_rawDescData +} + +var file_example_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_example_proto_goTypes = []interface{}{ + (*TestCustomMessage)(nil), // 0: pb.TestCustomMessage +} +var file_example_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_example_proto_init() } +func file_example_proto_init() { + if File_example_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_example_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestCustomMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_example_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_example_proto_goTypes, + DependencyIndexes: file_example_proto_depIdxs, + MessageInfos: file_example_proto_msgTypes, + }.Build() + File_example_proto = out.File + file_example_proto_rawDesc = nil + file_example_proto_goTypes = nil + file_example_proto_depIdxs = nil +} diff --git a/pkg/beholder/pb/example.proto b/pkg/beholder/pb/example.proto new file mode 100644 index 000000000..223770ff8 --- /dev/null +++ b/pkg/beholder/pb/example.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"; + +package pb; + +// Used for testing +message TestCustomMessage { + bool bool_val=1; + int64 int_val=2; + float float_val=3; + string string_val=4; + bytes bytes_val=5; +} \ No newline at end of file diff --git a/pkg/beholder/pb/generate.go b/pkg/beholder/pb/generate.go new file mode 100644 index 000000000..697c33637 --- /dev/null +++ b/pkg/beholder/pb/generate.go @@ -0,0 +1,3 @@ +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative example.proto + +package pb