Skip to content

Commit

Permalink
feat: updated to use slog everywhere in the main controller
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Dec 11, 2024
1 parent f6eb175 commit 6a791a9
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 70 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
if err != nil {
node = woc.initializeExecutableNode(ctx, nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
}
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
includeScriptOutput, err := woc.includeScriptOutput(ctx, nodeName, opts.boundaryID)
if err != nil {
return node, err
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex

if node != nil && node.Fulfilled() {
// Collect the completed task metrics
_, tmpl, _, tmplErr := dagCtx.tmplCtx.ResolveTemplate(task)
_, tmpl, _, tmplErr := dagCtx.tmplCtx.ResolveTemplate(ctx, task)
if tmplErr != nil {
woc.markNodeError(ctx, node.Name, tmplErr)
return
Expand Down Expand Up @@ -664,7 +664,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(ctx context.Context, dagCtx *
ancestorNodes = append(ancestorNodes, node)
}
}
_, _, templateStored, err := dagCtx.tmplCtx.ResolveTemplate(ancestorNode)
_, _, templateStored, err := dagCtx.tmplCtx.ResolveTemplate(ctx, ancestorNode)
if err != nil {
return nil, errors.InternalWrapError(err)
}
Expand Down
18 changes: 9 additions & 9 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
return woc.initializeNodeOrMarkError(ctx, node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, ErrMaxDepthExceeded), ErrMaxDepthExceeded
}

newTmplCtx, resolvedTmpl, templateStored, err := tmplCtx.ResolveTemplate(orgTmpl)
newTmplCtx, resolvedTmpl, templateStored, err := tmplCtx.ResolveTemplate(ctx, orgTmpl)
if err != nil {
return woc.initializeNodeOrMarkError(ctx, node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err
}
Expand Down Expand Up @@ -2458,7 +2458,7 @@ func (woc *wfOperationCtx) GetNodeTemplate(ctx context.Context, node *wfv1.NodeS
woc.markNodeError(ctx, node.Name, err)
return nil, err
}
tmpl, err := tmplCtx.GetTemplateFromRef(node.TemplateRef)
tmpl, err := tmplCtx.GetTemplateFromRef(ctx, node.TemplateRef)
if err != nil {
woc.markNodeError(ctx, node.Name, err)
return tmpl, err
Expand Down Expand Up @@ -2807,7 +2807,7 @@ func (woc *wfOperationCtx) checkParallelism(ctx context.Context, tmpl *wfv1.Temp
return err
}

boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(ctx, boundaryID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2839,7 +2839,7 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string

// Check if the output of this container is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
includeScriptOutput, err := woc.includeScriptOutput(ctx, nodeName, opts.boundaryID)
if err != nil {
return node, err
}
Expand Down Expand Up @@ -2873,7 +2873,7 @@ func (woc *wfOperationCtx) getOutboundNodes(ctx context.Context, nodeID string)
if err != nil {
return []string{node.ID}
}
_, parentTemplate, _, err := tmplCtx.ResolveTemplate(node)
_, parentTemplate, _, err := tmplCtx.ResolveTemplate(ctx, node)
if err != nil {
return []string{node.ID}
}
Expand Down Expand Up @@ -3057,7 +3057,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t

// Check if the output of this script is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
includeScriptOutput, err := woc.includeScriptOutput(ctx, nodeName, opts.boundaryID)
if err != nil {
return node, err
}
Expand Down Expand Up @@ -3684,7 +3684,7 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
} else {
clusterWorkflowTemplateGetter = &templateresolution.NullClusterWorkflowTemplateGetter{}
}
ctx := templateresolution.NewContext(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace), clusterWorkflowTemplateGetter, woc.execWf, woc.wf)
ctx := templateresolution.NewContext(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace), clusterWorkflowTemplateGetter, woc.execWf, woc.wf, woc.log)

switch scope {
case wfv1.ResourceScopeNamespaced:
Expand Down Expand Up @@ -3868,12 +3868,12 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {

// Check if the output of this node is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (bool, error) {
func (woc *wfOperationCtx) includeScriptOutput(ctx context.Context, nodeName, boundaryID string) (bool, error) {
if boundaryID == "" {
return false, nil
}

parentTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
parentTemplate, templateStored, err := woc.GetTemplateByBoundaryID(ctx, boundaryID)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
}
if len(childNodes) > 0 {
// Expanded child nodes should be created from the same template.
_, _, templateStored, err := stepsCtx.tmplCtx.ResolveTemplate(&childNodes[0])
_, _, templateStored, err := stepsCtx.tmplCtx.ResolveTemplate(ctx, &childNodes[0])
if err != nil {
return node, err
}
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ var (

// scheduleOnDifferentHost adds affinity to prevent retry on the same host when
// retryStrategy.affinity.nodeAntiAffinity{} is specified
func (woc *wfOperationCtx) scheduleOnDifferentHost(node *wfv1.NodeStatus, pod *apiv1.Pod) error {
func (woc *wfOperationCtx) scheduleOnDifferentHost(ctx context.Context, node *wfv1.NodeStatus, pod *apiv1.Pod) error {
if node != nil && pod != nil {
if retryNode := FindRetryNode(woc.wf.Status.Nodes, node.ID); retryNode != nil {
// recover template for the retry node
tmplCtx, err := woc.createTemplateContext(retryNode.GetTemplateScope())
if err != nil {
return err
}
_, retryTmpl, _, err := tmplCtx.ResolveTemplate(retryNode)
_, retryTmpl, _, err := tmplCtx.ResolveTemplate(ctx, retryNode)
if err != nil {
return err
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
return nil, err
}

if err := woc.scheduleOnDifferentHost(node, pod); err != nil {
if err := woc.scheduleOnDifferentHost(ctx, node, pod); err != nil {
return nil, err
}

Expand Down Expand Up @@ -830,15 +830,15 @@ func (woc *wfOperationCtx) GetBoundaryTemplate(ctx context.Context, nodeName str
woc.log.Warnf(ctx, "couldn't retrieve node for nodeName %s, will get nil templateDeadline", nodeName)
return nil, err
}
boundaryTmpl, _, err := woc.GetTemplateByBoundaryID(node.BoundaryID)
boundaryTmpl, _, err := woc.GetTemplateByBoundaryID(ctx, node.BoundaryID)
if err != nil {
return nil, err
}
return boundaryTmpl, nil
}

// GetTemplateByBoundaryID get a template through the node's BoundaryID.
func (woc *wfOperationCtx) GetTemplateByBoundaryID(boundaryID string) (*wfv1.Template, bool, error) {
func (woc *wfOperationCtx) GetTemplateByBoundaryID(ctx context.Context, boundaryID string) (*wfv1.Template, bool, error) {
boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
if err != nil {
return nil, false, err
Expand All @@ -847,7 +847,7 @@ func (woc *wfOperationCtx) GetTemplateByBoundaryID(boundaryID string) (*wfv1.Tem
if err != nil {
return nil, false, err
}
_, boundaryTmpl, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)
_, boundaryTmpl, templateStored, err := tmplCtx.ResolveTemplate(ctx, boundaryNode)
if err != nil {
return nil, templateStored, err
}
Expand Down
50 changes: 25 additions & 25 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"

log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
typed "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/logging"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

Expand Down Expand Up @@ -73,35 +73,35 @@ type Context struct {
tmplBase wfv1.TemplateHolder
// workflow is the Workflow where templates will be stored
workflow *wfv1.Workflow
// log is a logrus entry.
log *log.Entry
// log is a logging entry.
log logging.Logger
}

// NewContext returns new Context.
func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter ClusterWorkflowTemplateGetter, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context {
func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, cwftmplGetter ClusterWorkflowTemplateGetter, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow, log logging.Logger) *Context {
return &Context{
wftmplGetter: wftmplGetter,
cwftmplGetter: cwftmplGetter,
tmplBase: tmplBase,
workflow: workflow,
log: log.WithFields(log.Fields{}),
log: log,
}
}

// NewContextFromClientSet returns new Context.
func NewContextFromClientSet(wftmplClientset typed.WorkflowTemplateInterface, clusterWftmplClient typed.ClusterWorkflowTemplateInterface, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow) *Context {
func NewContextFromClientSet(wftmplClientset typed.WorkflowTemplateInterface, clusterWftmplClient typed.ClusterWorkflowTemplateInterface, tmplBase wfv1.TemplateHolder, workflow *wfv1.Workflow, log logging.Logger) *Context {
return &Context{
wftmplGetter: WrapWorkflowTemplateInterface(wftmplClientset),
cwftmplGetter: WrapClusterWorkflowTemplateInterface(clusterWftmplClient),
tmplBase: tmplBase,
workflow: workflow,
log: log.WithFields(log.Fields{}),
log: log,
}
}

// GetTemplateByName returns a template by name in the context.
func (ctx *Context) GetTemplateByName(name string) (*wfv1.Template, error) {
ctx.log.Debugf("Getting the template by name: %s", name)
func (ctx *Context) GetTemplateByName(c context.Context, name string) (*wfv1.Template, error) {
ctx.log.Debugf(c, "Getting the template by name: %s", name)

tmpl := ctx.tmplBase.GetTemplateByName(name)
if tmpl == nil {
Expand All @@ -118,8 +118,8 @@ func (ctx *Context) GetTemplateGetterFromRef(tmplRef *wfv1.TemplateRef) (wfv1.Te
}

// GetTemplateFromRef returns a template found by a given template ref.
func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Template, error) {
ctx.log.Debug("Getting the template from ref")
func (ctx *Context) GetTemplateFromRef(c context.Context, tmplRef *wfv1.TemplateRef) (*wfv1.Template, error) {
ctx.log.Debug(c, "Getting the template from ref")
var template *wfv1.Template
var wftmpl wfv1.TemplateHolder
var err error
Expand All @@ -145,14 +145,14 @@ func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Templat
}

// GetTemplate returns a template found by template name or template ref.
func (ctx *Context) GetTemplate(h wfv1.TemplateReferenceHolder) (*wfv1.Template, error) {
ctx.log.Debug("Getting the template")
func (ctx *Context) GetTemplate(c context.Context, h wfv1.TemplateReferenceHolder) (*wfv1.Template, error) {
ctx.log.Debug(c, "Getting the template")
if x := h.GetTemplate(); x != nil {
return x, nil
} else if x := h.GetTemplateRef(); x != nil {
return ctx.GetTemplateFromRef(x)
return ctx.GetTemplateFromRef(c, x)
} else if x := h.GetTemplateName(); x != "" {
return ctx.GetTemplateByName(x)
return ctx.GetTemplateByName(c, x)
}
return nil, errors.Errorf(errors.CodeInternal, "failed to get a template")
}
Expand All @@ -168,20 +168,20 @@ func (ctx *Context) GetTemplateScope() string {

// ResolveTemplate digs into referenes and returns a merged template.
// This method is the public start point of template resolution.
func (ctx *Context) ResolveTemplate(tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
return ctx.resolveTemplateImpl(tmplHolder)
func (ctx *Context) ResolveTemplate(c context.Context, tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
return ctx.resolveTemplateImpl(c, tmplHolder)
}

// resolveTemplateImpl digs into references and returns a merged template.
// This method processes inputs and arguments so the inputs of the final
// resolved template include intermediate parameter passing.
// The other fields are just merged and shallower templates overwrite deeper.
func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
ctx.log = ctx.log.WithFields(log.Fields{
func (ctx *Context) resolveTemplateImpl(c context.Context, tmplHolder wfv1.TemplateReferenceHolder) (*Context, *wfv1.Template, bool, error) {
ctx.log = ctx.log.WithFields(c, logging.Fields{
"base": common.GetTemplateGetterString(ctx.tmplBase),
"tmpl": common.GetTemplateHolderString(tmplHolder),
})
ctx.log.Debug("Resolving the template")
ctx.log.Debug(c, "Resolving the template")

templateStored := false
var tmpl *wfv1.Template
Expand All @@ -192,10 +192,10 @@ func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateReferenceHolder)
tmpl = ctx.workflow.GetStoredTemplate(scope, resourceName, tmplHolder)
}
if tmpl != nil {
ctx.log.Debug("Found stored template")
ctx.log.Debug(c, "Found stored template")
} else {
// Find newly appeared template.
newTmpl, err := ctx.GetTemplate(tmplHolder)
newTmpl, err := ctx.GetTemplate(c, tmplHolder)
if err != nil {
return nil, nil, false, err
}
Expand All @@ -208,12 +208,12 @@ func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateReferenceHolder)
return nil, nil, false, err
}
if stored {
ctx.log.Debug("Stored the template")
ctx.log.Debug(c, "Stored the template")
templateStored = true
}
err = ctx.workflow.SetStoredInlineTemplate(scope, resourceName, newTmpl)
if err != nil {
ctx.log.Errorf("Failed to store the inline template: %v", err)
ctx.log.Errorf(c, "Failed to store the inline template: %v", err)
}
}
tmpl = newTmpl
Expand Down Expand Up @@ -248,7 +248,7 @@ func (ctx *Context) WithTemplateHolder(tmplHolder wfv1.TemplateReferenceHolder)

// WithTemplateBase creates new context with a wfv1.TemplateHolder.
func (ctx *Context) WithTemplateBase(tmplBase wfv1.TemplateHolder) *Context {
return NewContext(ctx.wftmplGetter, ctx.cwftmplGetter, tmplBase, ctx.workflow)
return NewContext(ctx.wftmplGetter, ctx.cwftmplGetter, tmplBase, ctx.workflow, ctx.log)
}

// WithWorkflowTemplate creates new context with a wfv1.TemplateHolder.
Expand Down
Loading

0 comments on commit 6a791a9

Please sign in to comment.