Skip to content

Commit

Permalink
finish changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleader committed Dec 5, 2023
1 parent 3ed1440 commit c2f203a
Showing 1 changed file with 59 additions and 49 deletions.
108 changes: 59 additions & 49 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,66 +550,76 @@ func (e *executor) buildOutputProperties(
err,
)
}

stageOutputsLen := len(stage.Outputs)
if stageOutputsLen == 0 {
continue
}
stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen)

for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)

if stageOutputsLen > 0 {
// only add stages with outputs
stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen)
m, err2 := e.addOutputProperties(
stage, stepID, runnableStep, dag, stepNode,
stageOutputProperties, outputProperties)
if err2 != nil {
return m, err2
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
nil,
nil,
nil,
nil,
nil,
)
}
}
return outputProperties, nil
}

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)

outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
func (e *executor) addOutputProperties(stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], stageOutputProperties map[string]*schema.PropertySchema, outputProperties map[string]*schema.PropertySchema) (map[string]*schema.PropertySchema, error) {
for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
nil,
nil,
nil,
nil,
nil,
)
}
return outputProperties, nil

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)

outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
nil,
nil,
nil,
nil,
nil,
)
return nil, nil
}

func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) {
Expand Down

0 comments on commit c2f203a

Please sign in to comment.