From 847f0d95474dd8a8fc0e52ab395851e00e8d2e4c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 1 Oct 2023 21:56:53 -0700 Subject: [PATCH 1/5] Return ResourceWrapper without pointer Signed-off-by: Kevin Su --- .../tasks/plugins/array/awsbatch/transformer.go | 14 +++++++------- .../go/tasks/plugins/webapi/agent/plugin.go | 12 ++++++------ .../go/tasks/plugins/webapi/databricks/plugin.go | 12 ++++++------ .../go/tasks/plugins/webapi/snowflake/plugin.go | 16 ++++++++-------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go b/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go index 886803fe35..a795aa990f 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go @@ -48,12 +48,12 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon "Required value not set, taskTemplate Container") } - jobConfig := newJobConfig(). - MergeFromKeyValuePairs(taskTemplate.GetContainer().GetConfig()). - MergeFromConfigMap(tCtx.TaskExecutionMetadata().GetOverrides().GetConfig()) - if len(jobConfig.DynamicTaskQueue) == 0 { - return nil, errors.Errorf(errors.BadTaskSpecification, "config[%v] is missing", DynamicTaskQueueKey) - } + //jobConfig := newJobConfig(). + // MergeFromKeyValuePairs(taskTemplate.GetContainer().GetConfig()). + // MergeFromConfigMap(tCtx.TaskExecutionMetadata().GetOverrides().GetConfig()) + //if len(jobConfig.DynamicTaskQueue) == 0 { + // return nil, errors.Errorf(errors.BadTaskSpecification, "config[%v] is missing", DynamicTaskQueueKey) + //} inputReader := array.GetInputReader(tCtx, taskTemplate) cmd, err := template.Render( @@ -107,7 +107,7 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon } } submitJobInput.SetJobName(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()). - SetJobDefinition(jobDefinition).SetJobQueue(jobConfig.DynamicTaskQueue). + SetJobDefinition(jobDefinition).SetJobQueue("tutorial"). SetRetryStrategy(toRetryStrategy(ctx, toBackoffLimit(taskTemplate.Metadata), cfg.MinRetries, cfg.MaxRetries)). SetContainerOverrides(toContainerOverrides(ctx, append(cmd, args...), &resources, envVars)). SetTimeout(toTimeout(taskTemplate.Metadata.GetTimeout(), cfg.DefaultTimeOut.Duration)) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 9713ba90f3..2ceba6f43e 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -112,16 +112,16 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR taskTemplate.GetContainer().Args = argTemplate } - return &ResourceMetaWrapper{ + return ResourceMetaWrapper{ OutputPrefix: outputPrefix, AgentResourceMeta: res.GetResourceMeta(), Token: "", TaskType: taskTemplate.Type, - }, &ResourceWrapper{State: admin.State_RUNNING}, nil + }, ResourceWrapper{State: admin.State_RUNNING}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - metadata := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper) agent, err := getFinalAgent(metadata.TaskType, p.cfg) if err != nil { @@ -140,7 +140,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } - return &ResourceWrapper{ + return ResourceWrapper{ State: res.Resource.State, Outputs: res.Resource.Outputs, }, nil @@ -169,7 +169,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - resource := taskCtx.Resource().(*ResourceWrapper) + resource := taskCtx.Resource().(ResourceWrapper) taskInfo := &core.TaskInfo{} switch resource.State { @@ -190,7 +190,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase return core.PhaseInfoUndefined, pluginErrors.Errorf(core.SystemErrorCode, "unknown execution phase [%v].", resource.State) } -func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource *ResourceWrapper) error { +func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource ResourceWrapper) error { taskTemplate, err := taskCtx.TaskReader().Read(ctx) if err != nil { return err diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 7626cb9c6b..f911c8bfb6 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -146,12 +146,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR } runID := fmt.Sprintf("%v", data["run_id"]) - return &ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, - &ResourceWrapper{StatusCode: resp.StatusCode}, nil + return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, + ResourceWrapper{StatusCode: resp.StatusCode}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, nil, p.cfg.databricksEndpoint, p.cfg.DatabricksInstance, exec.Token, exec.RunID, false) if err != nil { @@ -176,7 +176,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba jobID := fmt.Sprintf("%.0f", data["job_id"]) lifeCycleState := fmt.Sprintf("%s", jobState["life_cycle_state"]) resultState := fmt.Sprintf("%s", jobState["result_state"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, JobID: jobID, LifeCycleState: lifeCycleState, @@ -206,8 +206,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - resource := taskCtx.Resource().(*ResourceWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + resource := taskCtx.Resource().(ResourceWrapper) message := resource.Message statusCode := resource.StatusCode jobID := resource.JobID diff --git a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go index 88bb8f50f5..3c7871880a 100644 --- a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go @@ -139,12 +139,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR queryID := fmt.Sprintf("%v", data["statementHandle"]) message := fmt.Sprintf("%v", data["message"]) - return &ResourceMetaWrapper{queryID, queryInfo.Account, token}, - &ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil + return ResourceMetaWrapper{queryID, queryInfo.Account, token}, + ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, false) if err != nil { @@ -160,7 +160,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } message := fmt.Sprintf("%v", data["message"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, Message: message, }, nil @@ -170,7 +170,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error if taskCtx.ResourceMeta() == nil { return nil } - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(post, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, true) if err != nil { @@ -187,8 +187,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(_ context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - statusCode := taskCtx.Resource().(*ResourceWrapper).StatusCode + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + statusCode := taskCtx.Resource().(ResourceWrapper).StatusCode if statusCode == 0 { return core.PhaseInfoUndefined, errors.Errorf(ErrSystem, "No Status field set.") } @@ -276,7 +276,7 @@ func newSnowflakeJobTaskPlugin() webapi.PluginEntry { ID: "snowflake", SupportedTaskTypes: []core.TaskType{"snowflake"}, PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { - return &Plugin{ + return Plugin{ metricScope: iCtx.MetricsScope(), cfg: GetConfig(), client: &http.Client{}, From fecb4cd068a6c9b63ece7f8ca01031639ee27159 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 1 Oct 2023 21:58:27 -0700 Subject: [PATCH 2/5] nit Signed-off-by: Kevin Su --- .../go/tasks/plugins/array/awsbatch/transformer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go b/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go index a795aa990f..886803fe35 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/transformer.go @@ -48,12 +48,12 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon "Required value not set, taskTemplate Container") } - //jobConfig := newJobConfig(). - // MergeFromKeyValuePairs(taskTemplate.GetContainer().GetConfig()). - // MergeFromConfigMap(tCtx.TaskExecutionMetadata().GetOverrides().GetConfig()) - //if len(jobConfig.DynamicTaskQueue) == 0 { - // return nil, errors.Errorf(errors.BadTaskSpecification, "config[%v] is missing", DynamicTaskQueueKey) - //} + jobConfig := newJobConfig(). + MergeFromKeyValuePairs(taskTemplate.GetContainer().GetConfig()). + MergeFromConfigMap(tCtx.TaskExecutionMetadata().GetOverrides().GetConfig()) + if len(jobConfig.DynamicTaskQueue) == 0 { + return nil, errors.Errorf(errors.BadTaskSpecification, "config[%v] is missing", DynamicTaskQueueKey) + } inputReader := array.GetInputReader(tCtx, taskTemplate) cmd, err := template.Render( @@ -107,7 +107,7 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon } } submitJobInput.SetJobName(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()). - SetJobDefinition(jobDefinition).SetJobQueue("tutorial"). + SetJobDefinition(jobDefinition).SetJobQueue(jobConfig.DynamicTaskQueue). SetRetryStrategy(toRetryStrategy(ctx, toBackoffLimit(taskTemplate.Metadata), cfg.MinRetries, cfg.MaxRetries)). SetContainerOverrides(toContainerOverrides(ctx, append(cmd, args...), &resources, envVars)). SetTimeout(toTimeout(taskTemplate.Metadata.GetTimeout(), cfg.DefaultTimeOut.Duration)) From 1e482f2f98c07cf8130e43212b58c8f7a4fffa81 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 2 Oct 2023 12:35:24 -0700 Subject: [PATCH 3/5] Add item ID to the workqueue instead Signed-off-by: Kevin Su --- flytestdlib/cache/auto_refresh.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index 13a787b03f..3ed258f97c 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -234,8 +234,10 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } for _, batch := range batches { - b := batch - w.workqueue.Add(&b) + for _, b := range batch { + logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) + w.workqueue.Add(b.GetID()) + } } return nil @@ -273,18 +275,29 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - item, shutdown := w.workqueue.Get() + itemID, shutdown := w.workqueue.Get() if shutdown { + logger.Debugf(ctx, "Shutting down worker") return nil } t := w.metrics.SyncLatency.Start() - updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) + logger.Debugf(ctx, "Syncing item with id [%v]", itemID) + item, ok := w.lruMap.Get(itemID) + if !ok { + logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) + t.Stop() + continue + } + updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ + id: itemID.(ItemID), + item: item.(Item), + }}) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(item) - w.workqueue.Done(item) + w.workqueue.Forget(itemID) + w.workqueue.Done(itemID) if err != nil { w.metrics.SyncErrors.Inc() From 1dfd610b54b06e358c4d4b701059f5238570478f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 2 Oct 2023 12:42:48 -0700 Subject: [PATCH 4/5] Revert "Add item ID to the workqueue instead" This reverts commit 1e482f2f98c07cf8130e43212b58c8f7a4fffa81. --- flytestdlib/cache/auto_refresh.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index 3ed258f97c..13a787b03f 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -234,10 +234,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { } for _, batch := range batches { - for _, b := range batch { - logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID()) - w.workqueue.Add(b.GetID()) - } + b := batch + w.workqueue.Add(&b) } return nil @@ -275,29 +273,18 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { case <-ctx.Done(): return nil default: - itemID, shutdown := w.workqueue.Get() + item, shutdown := w.workqueue.Get() if shutdown { - logger.Debugf(ctx, "Shutting down worker") return nil } t := w.metrics.SyncLatency.Start() - logger.Debugf(ctx, "Syncing item with id [%v]", itemID) - item, ok := w.lruMap.Get(itemID) - if !ok { - logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) - t.Stop() - continue - } - updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{ - id: itemID.(ItemID), - item: item.(Item), - }}) + updatedBatch, err := w.syncCb(ctx, *item.(*Batch)) // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. - w.workqueue.Forget(itemID) - w.workqueue.Done(itemID) + w.workqueue.Forget(item) + w.workqueue.Done(item) if err != nil { w.metrics.SyncErrors.Inc() From ce948557b3e432b41262a94965edac2ec07efb9c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 2 Oct 2023 12:51:53 -0700 Subject: [PATCH 5/5] nit Signed-off-by: Kevin Su --- rsts/deployment/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsts/deployment/index.rst b/rsts/deployment/index.rst index 8f569eb843..820e941bc4 100644 --- a/rsts/deployment/index.rst +++ b/rsts/deployment/index.rst @@ -36,7 +36,7 @@ plugins, authentication, performance tuning, and maintaining Flyte as a producti :text: 🤖 Agent Setup :classes: btn-block stretched-link ^^^^^^^^^^^^ - Enable Flyte agents to extend Flyte's capabilities, including features like File sesnor, Databricks job, and Snowflake query services. + Enable Flyte agents to extend Flyte's capabilities, including features like File sensor, Databricks job, and Snowflake query services. ---