diff --git a/flytepropeller/pkg/controller/nodes/cache.go b/flytepropeller/pkg/controller/nodes/cache.go index 5d4c8455a5..68871c33d5 100644 --- a/flytepropeller/pkg/controller/nodes/cache.go +++ b/flytepropeller/pkg/controller/nodes/cache.go @@ -5,6 +5,7 @@ import ( "strconv" "time" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -77,6 +78,8 @@ func updatePhaseCacheInfo(phaseInfo handler.PhaseInfo, cacheStatus *catalog.Stat // CheckCatalogCache uses the handler and contexts to check if cached outputs for the current node // exist. If the exist, this function also copies the outputs to this node. func (n *nodeExecutor) CheckCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Entry, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/CheckCatalogCache") + defer span.End() catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx) if err != nil { return catalog.Entry{}, errors.Wrapf(err, "failed to initialize the catalogKey") @@ -197,6 +200,8 @@ func (n *nodeExecutor) ReleaseCatalogReservation(ctx context.Context, nCtx inter // WriteCatalogCache relays the outputs of this node to the cache. This allows future executions // to reuse these data to avoid recomputation. func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Status, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/WriteCatalogCache") + defer span.End() catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx) if err != nil { return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), errors.Wrapf(err, "failed to initialize the catalogKey")