From 9f248d1dc9f1b49ab963c3e3f67b9c6b79f644a3 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 3 Nov 2023 14:06:20 -0500 Subject: [PATCH] instrumented flytepropeller Signed-off-by: Daniel Rammer --- flytepropeller/cmd/controller/cmd/root.go | 10 + flytepropeller/events/admin_eventsink.go | 11 +- flytepropeller/go.mod | 28 +- flytepropeller/go.sum | 43 ++- flytepropeller/pkg/controller/handler.go | 290 ++++++++++-------- .../nodes/catalog/datacatalog/datacatalog.go | 9 +- .../pkg/controller/nodes/executor.go | 4 + .../pkg/controller/nodes/task/handler.go | 4 + go.mod | 1 + go.sum | 4 + 10 files changed, 250 insertions(+), 154 deletions(-) diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index 6be97b0c9b..15a26acb40 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -28,6 +28,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/config/viper" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/profutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" @@ -119,6 +120,15 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { labeled.SetMetricKeys(keys...) } + // register opentelementry tracer providers + for _, serviceName := range []string{otelutils.AdminClientTracer, otelutils.BlobstoreClientTracer, + otelutils.DataCatalogClientTracer, otelutils.FlytePropellerTracer, otelutils.K8sClientTracer} { + if err := otelutils.RegisterTracerProvider(serviceName, otelutils.GetConfig()); err != nil { + logger.Errorf(ctx, "Failed to create otel tracer provider. %v", err) + return err + } + } + // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace) limitNamespace := "" diff --git a/flytepropeller/events/admin_eventsink.go b/flytepropeller/events/admin_eventsink.go index beeb3d35f7..f79531b7d1 100644 --- a/flytepropeller/events/admin_eventsink.go +++ b/flytepropeller/events/admin_eventsink.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/golang/protobuf/proto" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/time/rate" + "google.golang.org/grpc" admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -14,6 +16,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/events/errors" "github.com/flyteorg/flyte/flytestdlib/fastcheck" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" ) @@ -126,7 +129,13 @@ func IDFromMessage(message proto.Message) ([]byte, error) { func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) { cfg := admin2.GetConfig(ctx) - clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx) + tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer) + opt := grpc.WithUnaryInterceptor( + otelgrpc.UnaryClientInterceptor( + otelgrpc.WithTracerProvider(tracerProvider), + ), + ) + clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx) if err != nil { return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err) } diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 673dd9aa7c..cd3c9b1865 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -25,18 +25,20 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 + go.opentelemetry.io/otel/trace v1.19.0 golang.org/x/exp v0.0.0-20231005195138-3e424a577f31 golang.org/x/sync v0.2.0 golang.org/x/time v0.3.0 google.golang.org/grpc v1.56.1 google.golang.org/protobuf v1.30.0 - k8s.io/api v0.28.2 + k8s.io/api v0.28.3 k8s.io/apiextensions-apiserver v0.28.0 - k8s.io/apimachinery v0.28.2 - k8s.io/client-go v0.28.1 + k8s.io/apimachinery v0.28.3 + k8s.io/client-go v0.28.3 k8s.io/klog v1.0.0 k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 - sigs.k8s.io/controller-runtime v0.12.1 + sigs.k8s.io/controller-runtime v0.16.3 ) require ( @@ -71,12 +73,13 @@ require ( github.com/coocood/freecache v1.1.1 // indirect github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/flyteorg/stow v0.3.7 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect @@ -121,12 +124,17 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/crypto v0.11.0 // indirect - golang.org/x/net v0.13.0 // indirect + go.opentelemetry.io/otel v1.19.0 // indirect + go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/otel/sdk v1.19.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect - golang.org/x/sys v0.12.0 // indirect - golang.org/x/term v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/api v0.114.0 // indirect diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index 6949600a0a..4f65198ebc 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -119,6 +119,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/coocood/freecache v1.1.1 h1:uukNF7QKCZEdZ9gAV7WQzvh0SbjwdMF6m3x3rxEkaPc= github.com/coocood/freecache v1.1.1/go.mod h1:OKrEjkGVoxZhyWAJoeFi5BMLUJm2Tit0kpGkIr7NGYY= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -130,14 +131,15 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= -github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= -github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= @@ -157,8 +159,11 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.2.4 h1:QHVo+6stLbfJmYGkQ7uGHUCu5hnAFAj6mDe6Ea0SeOo= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= @@ -408,6 +413,20 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 h1:xFSRQBbXF6VvYRf2lqMJXxoB72XI1K/azav8TekHHSw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0/go.mod h1:h8TWwRAhQpOd0aM5nYsRD8+flnkj+526GEIVlarH7eY= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= +go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0/go.mod h1:1MsF6Y7gTqosgoZvHlzcaaM8DIMNZgJh87ykokoNH7Y= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -423,8 +442,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= -golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -495,8 +514,8 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= -golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -563,12 +582,12 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c= -golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -577,8 +596,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 7a01181e2a..4b00953410 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -8,6 +8,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/trace" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -19,6 +20,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" @@ -176,11 +178,17 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F // // func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { + var span trace.Span + ctx, span = otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.Propeller/Handle") + defer span.End() + logger.Infof(ctx, "Processing Workflow.") defer logger.Infof(ctx, "Completed processing workflow.") // Get the FlyteWorkflow resource with this namespace/name + _, wfStoreGetSpan := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "WorkflowStore.Get") w, fetchErr := p.wfStore.Get(ctx, namespace, name) + wfStoreGetSpan.End() if fetchErr != nil { if workflowstore.IsNotFound(fetchErr) { p.metrics.WorkflowNotFound.Inc() @@ -213,25 +221,12 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { // static fields to the blobstore to reduce CRD size. we must read and parse the workflow // closure so that these fields may be temporarily repopulated. var wfClosureCrdFields *k8s.WfClosureCrdFields + var err error if len(w.WorkflowClosureReference) > 0 { - t := p.metrics.WorkflowClosureReadTime.Start(ctx) - - wfClosure := &admin.WorkflowClosure{} - err := p.store.ReadProtobuf(ctx, w.WorkflowClosureReference, wfClosure) - if err != nil { - t.Stop() - logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) - return err - } - - wfClosureCrdFields, err = k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow) + wfClosureCrdFields, err = p.parseWorkflowClosureCrdFields(ctx, w.WorkflowClosureReference) if err != nil { - t.Stop() - logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) return err } - - t.Stop() } streak := 0 @@ -243,140 +238,175 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } for streak = 0; streak < maxLength; streak++ { - // if the wfClosureCrdFields struct is not nil then it contains static workflow data which - // has been offloaded to the blobstore. we must set these fields so they're available - // during workflow processing and immediately remove them afterwards so they do not - // accidentally get written to the workflow store once the new state is stored. - if wfClosureCrdFields != nil { - w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec - w.Tasks = wfClosureCrdFields.Tasks - w.SubWorkflows = wfClosureCrdFields.SubWorkflows + w, err = p.streak(ctx, w, wfClosureCrdFields) + if err != nil { + return err + } else if w == nil { + break } - t := p.metrics.RoundTime.Start(ctx) - mutatedWf, err := p.TryMutateWorkflow(ctx, w) + logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) + } + logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength) + return nil +} - if wfClosureCrdFields != nil { - // strip data populated from WorkflowClosureReference - w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil - if mutatedWf != nil { - mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil - } +// parseWorkflowClosureCrdFields attempts to retrieve offloaded static workflow closure data from the specified +// DataReference. +func (p *Propeller) parseWorkflowClosureCrdFields(ctx context.Context, dataReference storage.DataReference) (*k8s.WfClosureCrdFields, error) { + _, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.Propeller/parseWorkflowClosureCrdFields") + defer span.End() + + t := p.metrics.WorkflowClosureReadTime.Start(ctx) + defer t.Stop() + + wfClosure := &admin.WorkflowClosure{} + err := p.store.ReadProtobuf(ctx, dataReference, wfClosure) + if err != nil { + logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", dataReference, err) + return nil, err + } + + wfClosureCrdFields, err := k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow) + if err != nil { + logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", dataReference, err) + return nil, err + } + + return wfClosureCrdFields, nil +} + +// streak performs a single iteration of mutating a workflow returning the newly mutated workflow on success or nil if +// the workflow was not updated. +func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClosureCrdFields *k8s.WfClosureCrdFields) (*v1alpha1.FlyteWorkflow, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.Propeller/streak") + defer span.End() + + t := p.metrics.RoundTime.Start(ctx) + defer t.Stop() + + // if the wfClosureCrdFields struct is not nil then it contains static workflow data which + // has been offloaded to the blobstore. we must set these fields so they're available + // during workflow processing and immediately remove them afterwards so they do not + // accidentally get written to the workflow store once the new state is stored. + if wfClosureCrdFields != nil { + w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec + w.Tasks = wfClosureCrdFields.Tasks + w.SubWorkflows = wfClosureCrdFields.SubWorkflows + } + + mutatedWf, err := p.TryMutateWorkflow(ctx, w) + + if wfClosureCrdFields != nil { + // strip data populated from WorkflowClosureReference + w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil + if mutatedWf != nil { + mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil } + } - if err != nil { - // NOTE We are overriding the deepcopy here, as we are essentially ignoring all mutations - // We only want to increase failed attempts and discard any other partial changes to the CRD. - mutatedWf = RecordSystemError(w, err) - p.metrics.SystemError.Inc(ctx) - } else if mutatedWf == nil { - logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!") - return nil - } else { - if !w.GetExecutionStatus().IsTerminated() { - // No updates in the status we detected, we will skip writing to KubeAPI - if mutatedWf.Status.Equals(&w.Status) { - logger.Info(ctx, "WF hasn't been updated in this round.") - t.Stop() - return nil - } - } - if mutatedWf.GetExecutionStatus().IsTerminated() { - // If the end result is a terminated workflow, we remove the labels - // We add a completed label so that we can avoid polling for this workflow - SetCompletedLabel(mutatedWf, time.Now()) - ResetFinalizers(mutatedWf) + if err != nil { + // NOTE We are overriding the deepcopy here, as we are essentially ignoring all mutations + // We only want to increase failed attempts and discard any other partial changes to the CRD. + mutatedWf = RecordSystemError(w, err) + p.metrics.SystemError.Inc(ctx) + } else if mutatedWf == nil { + logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!") + return nil, nil + } else { + if !w.GetExecutionStatus().IsTerminated() { + // No updates in the status we detected, we will skip writing to KubeAPI + if mutatedWf.Status.Equals(&w.Status) { + logger.Info(ctx, "WF hasn't been updated in this round.") + return nil, nil } } + if mutatedWf.GetExecutionStatus().IsTerminated() { + // If the end result is a terminated workflow, we remove the labels + // We add a completed label so that we can avoid polling for this workflow + SetCompletedLabel(mutatedWf, time.Now()) + ResetFinalizers(mutatedWf) + } + } - // ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not - // a valid state unless we are experiencing a race condition where the workflow has not yet - // been inserted into the db (ie. workflow phase is WorkflowPhaseReady). - if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady { - t.Stop() - logger.Errorf(ctx, "Failed to process workflow, failing: %s", err) + // ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not + // a valid state unless we are experiencing a race condition where the workflow has not yet + // been inserted into the db (ie. workflow phase is WorkflowPhaseReady). + if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady { + logger.Errorf(ctx, "Failed to process workflow, failing: %s", err) + + // We set the workflow status to failing to abort any active tasks in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "ExecutionNotFound", + Message: "Workflow execution not found in flyteadmin.", + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e) + return nil, e + } + return nil, nil + } - // We set the workflow status to failing to abort any active tasks in the next round. - mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "ExecutionNotFound", - Message: "Workflow execution not found in flyteadmin.", - }) - if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e) - return e - } - return nil + // Incompatible cluster means that another cluster has been designated to handle this workflow execution. + // We should early abort in this case, since any events originating from this cluster for this execution will + // be rejected. + if err != nil && eventsErr.IsEventIncompatibleClusterError(err) { + logger.Errorf(ctx, "No longer designated to process workflow, failing: %s", err) + + // We set the workflow status to failing to abort any active tasks in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution cluster reassigned, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: string(eventsErr.EventIncompatibleCusterError), + Message: fmt.Sprintf("Workflow execution cluster reassigned: %v", err), + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed to record an EventIncompatibleClusterError workflow as failed, reason: %s. Retrying...", e) + return nil, e } + return nil, nil + } - // Incompatible cluster means that another cluster has been designated to handle this workflow execution. - // We should early abort in this case, since any events originating from this cluster for this execution will - // be rejected. - if err != nil && eventsErr.IsEventIncompatibleClusterError(err) { - t.Stop() - logger.Errorf(ctx, "No longer designated to process workflow, failing: %s", err) + // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update - // We set the workflow status to failing to abort any active tasks in the next round. + // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) + if updateErr != nil { + // The update has failed, lets check if this is because the size is too large. If so + if workflowstore.IsWorkflowTooLarge(updateErr) { + logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr) + p.metrics.SystemError.Inc(ctx) + // Workflow is too large, we will mark the workflow as failing and record it. This will automatically + // propagate the failure in the next round. mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution cluster reassigned, aborting", &core.ExecutionError{ + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ Kind: core.ExecutionError_SYSTEM, - Code: string(eventsErr.EventIncompatibleCusterError), - Message: fmt.Sprintf("Workflow execution cluster reassigned: %v", err), + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", }) if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed to record an EventIncompatibleClusterError workflow as failed, reason: %s. Retrying...", e) - return e + logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) + return nil, e } - return nil - } - - // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update - - // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) - if updateErr != nil { - t.Stop() - // The update has failed, lets check if this is because the size is too large. If so - if workflowstore.IsWorkflowTooLarge(updateErr) { - logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr) - p.metrics.SystemError.Inc(ctx) - // Workflow is too large, we will mark the workflow as failing and record it. This will automatically - // propagate the failure in the next round. - mutableW := w.DeepCopy() - mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ - Kind: core.ExecutionError_SYSTEM, - Code: "WorkflowTooLarge", - Message: "Workflow execution state is too large for Flyte to handle.", - }) - if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { - logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e) - return e - } - return nil - } - return updateErr - } - if err != nil { - t.Stop() - // An error was encountered during the round. Let us return, so that we can back-off gracefully - return err + return nil, nil } - if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { - // Workflow is terminated (no need to continue) or no status was changed, we can wait - logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v", - mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) - t.Stop() - return nil - } - logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak) - w = newWf - t.Stop() + return nil, updateErr } - logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength) - return nil + if err != nil { + // An error was encountered during the round. Let us return, so that we can back-off gracefully + return nil, err + } + if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion { + // Workflow is terminated (no need to continue) or no status was changed, we can wait + logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v", + mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion) + return nil, nil + } + return newWf, nil } // NewPropellerHandler creates a new Propeller and initializes metrics diff --git a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go index 717c65739f..c9c85d25dd 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go +++ b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go @@ -10,6 +10,7 @@ import ( grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -22,6 +23,7 @@ import ( "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" ) var ( @@ -472,7 +474,12 @@ func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection boo retryInterceptor := grpcRetry.UnaryClientInterceptor(grpcOptions...) - opts = append(opts, grpc.WithChainUnaryInterceptor(grpcPrometheus.UnaryClientInterceptor, + tracerProvider := otelutils.GetTracerProvider(otelutils.DataCatalogClientTracer) + opts = append(opts, grpc.WithChainUnaryInterceptor( + grpcPrometheus.UnaryClientInterceptor, + otelgrpc.UnaryClientInterceptor( + otelgrpc.WithTracerProvider(tracerProvider), + ), retryInterceptor)) clientConn, err := grpc.Dial(endpoint, opts...) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index b9d727db7e..5e1cbe4f51 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -47,6 +47,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/contextutils" errors2 "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" @@ -1296,6 +1297,9 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx interfac } func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructure, nCtx interfaces.NodeExecutionContext, h interfaces.NodeHandler) (interfaces.NodeStatus, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/handleNode") + defer span.End() + logger.Debugf(ctx, "Handling Node [%s]", nCtx.NodeID()) defer logger.Debugf(ctx, "Completed node [%s]", nCtx.NodeID()) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 64bf012b80..72dba11a50 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -33,6 +33,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/utils" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" @@ -516,6 +517,9 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContext) (handler.Transition, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.task.Handler/HandleTask") + defer span.End() + ttype := nCtx.TaskReader().GetTaskType() ctx = contextutils.WithTaskType(ctx, ttype) p, err := t.ResolvePlugin(ctx, ttype, nCtx.ExecutionContext().GetExecutionConfig()) diff --git a/go.mod b/go.mod index 06e2cef990..d21bef9770 100644 --- a/go.mod +++ b/go.mod @@ -174,6 +174,7 @@ require ( github.com/stretchr/testify v1.8.4 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect diff --git a/go.sum b/go.sum index d2b03356d5..e503fd283f 100644 --- a/go.sum +++ b/go.sum @@ -187,6 +187,7 @@ github.com/cloudevents/sdk-go/v2 v2.8.0/go.mod h1:GpCBmUj7DIRiDhVvsK5d6WCbgTWs8D github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk= @@ -263,6 +264,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= @@ -1383,6 +1385,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib v0.18.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 h1:xFSRQBbXF6VvYRf2lqMJXxoB72XI1K/azav8TekHHSw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0/go.mod h1:h8TWwRAhQpOd0aM5nYsRD8+flnkj+526GEIVlarH7eY= go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.18.0/go.mod h1:iK1G0FgHurSJ/aYLg5LpnPI0pqdanM73S3dhyDp0Lk4= go.opentelemetry.io/otel v0.18.0/go.mod h1:PT5zQj4lTsR1YeARt8YNKcFb88/c2IKoSABK9mX0r78= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=