Skip to content

Commit

Permalink
more examples and improve status handling (delete)
Browse files Browse the repository at this point in the history
  • Loading branch information
daanvinken committed Oct 18, 2024
1 parent c215dfa commit f4ac589
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 271 deletions.
2 changes: 1 addition & 1 deletion apis/orchestration/v1alpha1/entityworkflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type EntityWorkflowObservation struct {
// WorkflowID is the unique identifier of the Temporal workflow.
WorkflowID string `json:"workflowID,omitempty"`

// RunID is the unique identifier for the specific run of the Temporal workflow.
// RunID is the unique identifier for the last known run of the Temporal workflow.
RunID string `json:"runID,omitempty"`

// Status indicates the current state of the workflow, such as running, completed, or failed.
Expand Down
3 changes: 3 additions & 0 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"k8s.io/klog/v2"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -61,6 +62,8 @@ func main() {

zl := zap.New(zap.UseDevMode(*debug))
log := logging.NewLogrLogger(zl.WithName("provider-tempoplane"))
ctrl.SetLogger(zl)
klog.SetLogger(zl)
if *debug {
// The controller-runtime runs with a no-op logger by default. It is
// *very* verbose even at info level, so we only provide it a real
Expand Down
3 changes: 2 additions & 1 deletion examples/provider/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

apiVersion: v1
kind: Namespace
metadata:
Expand All @@ -17,7 +18,7 @@ kind: ProviderConfig
metadata:
name: default
spec:
hostname: xxx
hostname: temporal-poc-rpc.app01.k8s.adyen.com:7233
namespace: daanvi
credentials:
source: Secret
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
apiVersion: orchestration.tempoplane.crossplane.io/v1alpha1
kind: EntityWorkflow
metadata:
name: example-entityworkflow
name: example-entityworkflow-1
spec:
deletionPolicy: Delete
forProvider:
entityInput:
entityID: "entity-420"
kind: "HostLocalFile"
apiVersion: "0.0.1"
data: "SomeTestDataCouldBeJSON"
data: "/tmp/abcdf.txt"
requesterID: "CrossPlane"
dc: "EIN1"
env: "beta"
Expand Down
19 changes: 19 additions & 0 deletions examples/sample/entityworkflow2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: orchestration.tempoplane.crossplane.io/v1alpha1
kind: EntityWorkflow
metadata:
name: example-entityworkflow-2
spec:
deletionPolicy: Delete
forProvider:
entityInput:
entityID: "entity-420"
kind: "HostLocalFile"
apiVersion: "0.0.1"
data: "/tmp/ghijk.txt"
requesterID: "CrossPlane"
dc: "EIN1"
env: "beta"
timestamp: 420000
correlationID: "TempoPlane-CrossPlane"
providerConfigRef:
name: default
82 changes: 61 additions & 21 deletions internal/controller/entityworkflow/entityworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package entityworkflow
import (
"context"
"fmt"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/google/uuid"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -46,7 +47,8 @@ const (
errGetPC = "cannot get ProviderConfig"
errGetCreds = "cannot get credentials"
errTemporalConnection = "cannot connect to temporal"
errNewClient = "cannot create new Service"
errNewKubeClient = "cannot connect to KubeAPI"
errUpdateKubeObject = "cannot update Kubernetes object"
)

// Setup adds a controller that reconciles EntityWorkflow managed resources.
Expand Down Expand Up @@ -105,8 +107,6 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
return nil, errors.Wrap(err, errGetPC)
}

//cd := pc.Spec.Credentials

tc, err := temporalclient.Dial(temporalclient.Options{
HostPort: pc.Spec.Hostname,
Namespace: pc.Spec.Namespace,
Expand All @@ -122,16 +122,20 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
return &external{}, errors.Wrap(err, errTemporalConnection)
}

return &external{temporalClient: tc}, nil
return &external{
temporalClient: tc,
kubeClient: c.kube,
}, nil
}

// An ExternalClient observes, then either creates, updates, or deletes an
// external resource to ensure it reflects the managed resource's desired state.
type external struct {
temporalClient temporalclient.Client
kubeClient client.Client
}

func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput, workflowID string) entityworkflow.EntityInput {
func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput) entityworkflow.EntityInput {
return entityworkflow.EntityInput{
EntityID: input.EntityID,
Kind: input.Kind,
Expand All @@ -141,7 +145,7 @@ func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput, workflowID string)
DC: input.DC,
Env: input.Env,
Timestamp: input.Timestamp,
CorrelationID: workflowID,
CorrelationID: input.CorrelationID,
}
}

Expand All @@ -153,14 +157,20 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex

// Extract WorkflowID from CorrelationID in EntityInput
workflowID := cr.Status.WorkflowID
lastRunID := cr.Status.AtProvider.RunID

if workflowID == "" || lastRunID == "" {
return managed.ExternalObservation{ResourceExists: false}, nil
}

// Describe the workflow execution using the WorkflowID (CorrelationID)
resp, err := c.temporalClient.DescribeWorkflowExecution(ctx, workflowID, "")
resp, err := c.temporalClient.DescribeWorkflowExecution(ctx, workflowID, lastRunID)

// If we are deleting, we'll just check if the latest run was a delete and succeeded. Observe is always called after delete.

if err != nil {
//TODO improve error check
fmt.Println("Resource doesn't exist, returning")
if string(err.Error()) == "operation GetCurrentExecution encountered not found" {
//TODO (daanvi) improve error check
if string(err.Error()) == "operation GetCurrentExecution encountered not found" || string(err.Error()) == "operation GetWorkflowExecution encountered not found" {
return managed.ExternalObservation{ResourceExists: false}, nil
}
return managed.ExternalObservation{}, errors.Wrap(err, "failed to describe workflow execution")
Expand All @@ -170,6 +180,15 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
workflowStatus := resp.WorkflowExecutionInfo.GetStatus()
switch workflowStatus {
case temporalpb.WORKFLOW_EXECUTION_STATUS_COMPLETED:
if resp.WorkflowExecutionInfo.Type.Name == "DeleteWorkflow" {
fmt.Println("Now we done")
// Indicate that we're about to delete the instance.
cr.SetConditions(xpv1.Deleting())
return managed.ExternalObservation{
ResourceExists: false,
}, nil
}
cr.SetConditions(xpv1.Available())
// If the workflow has completed successfully
return managed.ExternalObservation{
ResourceExists: true,
Expand Down Expand Up @@ -201,7 +220,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}

func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) {
fmt.Println("creating...")
mg.SetConditions(xpv1.Creating())
cr, ok := mg.(*v1alpha1.EntityWorkflow)
if !ok {
return managed.ExternalCreation{}, errors.New(errNotEntityWorkflow)
Expand All @@ -213,9 +232,8 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
// Define the task queue that the workflow worker is listening on
taskQueue := "TempoPlane-" + entityInput.Kind

//uniqueID := uuid.New().String()
//ID := "TempoPlane" + "-" + uniqueID + "-" + entityInput.RequesterID
ID := "TempoPlane" + "-" + entityInput.RequesterID
uniqueID := uuid.New().String()
ID := "TempoPlane" + "-" + uniqueID + "-" + entityInput.RequesterID

cr.Spec.ForProvider.EntityInput.CorrelationID = ID

Expand All @@ -224,12 +242,24 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
TaskQueue: taskQueue,
ID: ID,
}

cr.Status.WorkflowID = ID

if err := c.kubeClient.Status().Update(ctx, cr); err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, errUpdateKubeObject)
}

// Execute the CreateWorkflow
workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput, ID))
workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput))
if err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, "failed to start CreateWorkflow")
}
fmt.Println("Executing workflow")

cr.Status.AtProvider.RunID = workflowExecution.GetRunID()

if err := c.kubeClient.Status().Update(ctx, cr); err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, errUpdateKubeObject)
}

// Log the WorkflowID and RunID for reference
fmt.Printf("Started CreateWorkflow with WorkflowID: %s and RunID: %s\n", workflowExecution.GetID(), workflowExecution.GetRunID())
Expand Down Expand Up @@ -270,19 +300,18 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
return errors.New(errNotEntityWorkflow)
}

//TODO (daanvi) Multiple calls seem to happen before delete

// Parse EntityInput from the managed resource's spec
entityInput := cr.Spec.ForProvider.EntityInput

// Define the task queue that the workflow worker is listening on
taskQueue := "TempoPlane-" + entityInput.Kind

uniqueID := uuid.New().String()
ID := "TempoPlane-Delete-" + uniqueID + "-" + entityInput.RequesterID

// Configure workflow execution options
workflowOptions := temporalclient.StartWorkflowOptions{
TaskQueue: taskQueue,
ID: ID,
ID: cr.Status.WorkflowID,
}

// Execute the DeleteWorkflow
Expand All @@ -291,6 +320,11 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
return errors.Wrap(err, "failed to start DeleteWorkflow")
}

cr.Status.AtProvider.RunID = workflowExecution.GetRunID()
if err := c.kubeClient.Status().Update(ctx, cr); err != nil {
return errors.Wrap(err, errUpdateKubeObject)
}

// Log the WorkflowID and RunID for reference
fmt.Printf("Started DeleteWorkflow with WorkflowID: %s and RunID: %s\n", workflowExecution.GetID(), workflowExecution.GetRunID())

Expand All @@ -302,6 +336,12 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
}
fmt.Printf("DeleteWorkflow completed successfully with result: %s\n", deleteResult.Message)

// Return nil as there's no ExternalCreation required for deletions
//TODO (daanvi) Implement deletion workflowID in state, prior to trigger. This way we can fetch in here the intial workflow that triggered deletion.
// If deletion is failing or already happened, we do not have to retry I think

// Note that we use resource.Ignore to squash any error that indicates the
// external resource does not exist. Delete implementations must not return
// an error when asked to delete a non-existent external resource.
//return errors.Wrap(resource.Ignore(database.IsNotFound, err), "cannot delete instance")
return nil
}
1 change: 0 additions & 1 deletion tmp/.gitignore

This file was deleted.

26 changes: 0 additions & 26 deletions tmp/clusteradmin.yaml

This file was deleted.

30 changes: 0 additions & 30 deletions tmp/create_kubeconfig.sh

This file was deleted.

Loading

0 comments on commit f4ac589

Please sign in to comment.