Skip to content

Commit

Permalink
Merge branch 'master' into feature/datacatalog-cache-deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
pvditt committed Jan 8, 2024
2 parents 7c93bb6 + 513c3e1 commit 662355c
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 97 deletions.
162 changes: 98 additions & 64 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions charts/flyte-sandbox/templates/buildkit/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ spec:
metadata:
labels: {{- include "flyte-sandbox.buildkitSelectorLabels" . | nindent 8 }}
spec:
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
containers:
- name: buildkit
Expand Down
3 changes: 2 additions & 1 deletion docker/sandbox-bundled/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ flyte:
.PHONY: manifests
manifests:
mkdir -p manifests
helm dependency update ../../charts/flyte-sandbox
helm dependency update ../../charts/flyteagent
helm dependency update ../../charts/flyte-binary
helm dependency update ../../charts/flyte-sandbox
kustomize build \
--enable-helm \
--load-restrictor=LoadRestrictionsNone \
Expand Down
5 changes: 3 additions & 2 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: SlF2M0tXNXdGTUpHZzdvNg==
haSharedSecret: RkZLd2xsVEZSNXNHMExjeg==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1388,6 +1388,7 @@ spec:
periodSeconds: 30
securityContext:
privileged: true
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
---
apiVersion: apps/v1
Expand All @@ -1411,7 +1412,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: b76914839daf95717da7a8ef31769c3b97f6c3c46a09776e5d0f8cc3fe9b1e9a
checksum/secret: 54b8e8ed67f9cf2f4b8c83b917ba11b0ed9f81f453df70376c332e202522643e
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
5 changes: 3 additions & 2 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: QU5jN3V6ZjA4Y0tFbzZ0dw==
haSharedSecret: SENrVEZGc09ob3RKdFJRSA==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -1336,6 +1336,7 @@ spec:
periodSeconds: 30
securityContext:
privileged: true
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
---
apiVersion: apps/v1
Expand All @@ -1359,7 +1360,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 436cdf2d6630635a2652233c83f71c617facb2fda8edabb639ab7dfa0f15e46f
checksum/secret: 7c61351bdfcc86e008feb9f8d023e02d3aa7e570e1759790f8d7c3f6a4fd9c86
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
5 changes: 3 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: MGNlbmFPenJydE1hNlB0Sw==
haSharedSecret: RjVjd3lFdXVJRllyTzlTNA==
proxyPassword: ""
proxyUsername: ""
kind: Secret
Expand Down Expand Up @@ -910,6 +910,7 @@ spec:
periodSeconds: 30
securityContext:
privileged: true
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
---
apiVersion: apps/v1
Expand All @@ -933,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 05cc4dc3bf13796e3a35e79da36baf286c81f80b207628f1f64197c80a04130c
checksum/secret: 048f8e7e067f646dce51250b72fe31919755c8fa62f7f4f4ccc054e20a719d9e
labels:
app: docker-registry
release: flyte-sandbox
Expand Down
16 changes: 11 additions & 5 deletions flyteplugins/go/tasks/pluginmachinery/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package k8s

import (
"net/http"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -40,9 +38,14 @@ type Options struct {
// NewKubeClient creates a new KubeClient that caches reads and falls back to
// make API calls on failure. Write calls are not cached.
func NewKubeClient(config *rest.Config, options Options) (core.KubeClient, error) {
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, err
}

if options.MapperProvider == nil {
options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
return apiutil.NewDynamicRESTMapper(config, http.DefaultClient)
return apiutil.NewDynamicRESTMapper(config, httpClient)
}
}

Expand All @@ -53,7 +56,7 @@ func NewKubeClient(config *rest.Config, options Options) (core.KubeClient, error

if options.CacheOptions == nil {
options.CacheOptions = &cache.Options{
HTTPClient: http.DefaultClient,
HTTPClient: httpClient,
Mapper: mapper,
}
}
Expand All @@ -64,7 +67,10 @@ func NewKubeClient(config *rest.Config, options Options) (core.KubeClient, error
}

if options.ClientOptions == nil {
options.ClientOptions = &client.Options{Mapper: mapper}
options.ClientOptions = &client.Options{
HTTPClient: httpClient,
Mapper: mapper,
}
}

client, err := client.New(config, *options.ClientOptions)
Expand Down
54 changes: 39 additions & 15 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package array
import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/protobuf/ptypes"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
)

type arrayEventRecorder interface {
interfaces.EventRecorder
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32)
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error
finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error
finalizeRequired(ctx context.Context) bool
}
Expand All @@ -39,8 +42,23 @@ func (e *externalResourcesEventRecorder) RecordTaskEvent(ctx context.Context, ev
return nil
}

func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
externalResourceID := fmt.Sprintf("%s-%d", buildSubNodeID(nCtx, index), retryAttempt)
func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
// generate externalResourceID
currentNodeUniqueID := nCtx.NodeID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
var err error
currentNodeUniqueID, err = common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
if err != nil {
return err
}
}

uniqueID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(retryAttempt))})
if err != nil {
return err
}

externalResourceID := fmt.Sprintf("%s-n%d-%d", uniqueID, index, retryAttempt)

// process events
cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
Expand Down Expand Up @@ -83,6 +101,8 @@ func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx inter
// clear nodeEvents and taskEvents
e.nodeEvents = e.nodeEvents[:0]
e.taskEvents = e.taskEvents[:0]

return nil
}

func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand All @@ -94,6 +114,17 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
return err
}

var taskID *idlcore.Identifier
subNode := nCtx.Node().GetArrayNode().GetSubNodeSpec()
if subNode != nil && subNode.Kind == v1alpha1.NodeKindTask {
executableTask, err := nCtx.ExecutionContext().GetTask(*subNode.GetTaskID())
if err != nil {
return err
}

taskID = executableTask.CoreTask().GetId()
}

nodeExecutionID := *nCtx.NodeExecutionMetadata().GetNodeExecutionID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId)
Expand All @@ -103,26 +134,18 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
nodeExecutionID.NodeId = currentNodeUniqueID
}

workflowExecutionID := nodeExecutionID.ExecutionId

taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Project: workflowExecutionID.Project,
Domain: workflowExecutionID.Domain,
Name: nCtx.NodeID(),
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
TaskId: taskID,
ParentNodeExecutionId: &nodeExecutionID,
RetryAttempt: 0, // ArrayNode will never retry
Phase: taskPhase,
PhaseVersion: taskPhaseVersion,
OccurredAt: occurredAt,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: e.externalResources,
PluginIdentifier: "container",
PluginIdentifier: "k8s-array",
},
TaskType: "k8s-array",
TaskType: "container_array",
EventVersion: 1,
}

Expand Down Expand Up @@ -165,7 +188,8 @@ type passThroughEventRecorder struct {
interfaces.EventRecorder
}

func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
return nil
}

func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand Down
14 changes: 10 additions & 4 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, retryAttempt)
if err := eventRecorder.process(ctx, nCtx, i, retryAttempt); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}
}
}
Expand Down Expand Up @@ -241,7 +243,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, 0)
if err := eventRecorder.process(ctx, nCtx, i, 0); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}

// transition ArrayNode to `ArrayNodePhaseExecuting`
Expand Down Expand Up @@ -331,7 +335,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
}
}
eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts())
if err := eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()); err != nil {
return handler.UnknownTransition, err
}

// update subNode state
arrayNodeState.SubNodePhases.SetItem(index, uint64(subNodeStatus.GetPhase()))
Expand Down Expand Up @@ -633,7 +639,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
// currently just mocking based on node phase -> which works for all k8s plugins
// we can not pre-allocated a bit array because max size is 256B and with 5k fanout node state = 1.28MB
pluginStateBytes := a.pluginStateBytesStarted
if taskPhase == int(core.PhaseUndefined) || taskPhase == int(core.PhaseRetryableFailure) {
if taskPhase == int(core.PhaseUndefined) || taskPhase == int(core.PhaseRetryableFailure) || taskPhase == int(core.PhaseWaitingForResources) {
pluginStateBytes = a.pluginStateBytesNotStarted
}

Expand Down
23 changes: 23 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/types"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -145,6 +146,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
Name: "name",
},
})
nodeExecutionMetadata.OnGetOwnerID().Return(types.NamespacedName{
Namespace: "wf-namespace",
Name: "wf-name",
})
nCtx.OnNodeExecutionMetadata().Return(nodeExecutionMetadata)

// NodeID
Expand Down Expand Up @@ -507,6 +512,24 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
},
{
name: "StartSubNodesNewAttempts",
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
},
subNodeTaskPhases: []core.Phase{
core.PhaseRetryableFailure,
core.PhaseWaitingForResources,
},
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
},
{
name: "AllSubNodesSuccedeed",
subNodePhases: []v1alpha1.NodePhase{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 662355c

Please sign in to comment.