diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 77a27b6699..4f518ced55 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -80,11 +80,11 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext webapi.Resource, error) { taskTemplate, err := taskCtx.TaskReader().Read(ctx) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to read task template with error: %v", err) } inputs, err := taskCtx.InputReader().Get(ctx) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to read inputs with error: %v", err) } var argTemplate []string @@ -98,7 +98,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext argTemplate = taskTemplate.GetContainer().GetArgs() modifiedArgs, err := template.Render(ctx, taskTemplate.GetContainer().GetArgs(), templateParameters) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to render args with error: %v", err) } taskTemplate.GetContainer().Args = modifiedArgs defer func() { @@ -135,7 +135,7 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext request := &admin.CreateTaskRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix, TaskExecutionMetadata: &taskExecutionMetadata} res, err := client.CreateTask(finalCtx, request) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to create task from agent with %v", err) } return ResourceMetaWrapper{ @@ -153,7 +153,8 @@ func (p *Plugin) ExecuteTaskSync( ) (webapi.ResourceMeta, webapi.Resource, error) { stream, err := client.ExecuteTaskSync(ctx) if err != nil { - return nil, nil, err + logger.Errorf(ctx, "failed to execute task from agent with %v", err) + return nil, nil, fmt.Errorf("failed to execute task from agent with %v", err) } headerProto := &admin.ExecuteTaskSyncRequest{ @@ -185,8 +186,8 @@ func (p *Plugin) ExecuteTaskSync( in, err := stream.Recv() if err != nil { - logger.Errorf(ctx, "failed to receive from server %s", err.Error()) - return nil, nil, err + logger.Errorf(ctx, "failed to receive stream from server %s", err.Error()) + return nil, nil, fmt.Errorf("failed to receive stream from server %w", err) } if in.GetHeader() == nil { return nil, nil, fmt.Errorf("expected header in the response, but got none") @@ -202,7 +203,7 @@ func (p *Plugin) ExecuteTaskSync( LogLinks: resource.GetLogLinks(), CustomInfo: resource.GetCustomInfo(), AgentError: resource.GetAgentError(), - }, err + }, nil } func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { @@ -223,7 +224,7 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web } res, err := client.GetTask(finalCtx, request) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get task from agent with %v", err) } return ResourceWrapper{ @@ -256,7 +257,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error ResourceMeta: metadata.AgentResourceMeta, } _, err = client.DeleteTask(finalCtx, request) - return err + return fmt.Errorf("failed to delete task from agent with %v", err) } func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { @@ -307,7 +308,7 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas err = writeOutput(ctx, taskCtx, resource.Outputs) if err != nil { logger.Errorf(ctx, "failed to write output with err %s", err.Error()) - return core.PhaseInfoUndefined, err + return core.PhaseInfoUndefined, fmt.Errorf("failed to write output with err %s", err.Error()) } return core.PhaseInfoSuccess(taskInfo), nil }