diff --git a/apis/orchestration/v1alpha1/entityworkflow_types.go b/apis/orchestration/v1alpha1/entityworkflow_types.go index cbbeccb..cdef6c4 100644 --- a/apis/orchestration/v1alpha1/entityworkflow_types.go +++ b/apis/orchestration/v1alpha1/entityworkflow_types.go @@ -63,6 +63,9 @@ type EntityWorkflowStatus struct { // ResourceStatus includes common fields for tracking the status of external resources. xpv1.ResourceStatus `json:",inline"` + // WorkflowID `json:",inline"` + WorkflowID string `json:"workflowID,omitempty"` + // AtProvider contains the provider-specific observed state, such as workflow IDs and status. AtProvider EntityWorkflowObservation `json:"atProvider,omitempty"` } diff --git a/apis/orchestration/v1alpha1/zz_generated.deepcopy.go b/apis/orchestration/v1alpha1/zz_generated.deepcopy.go index 78a0a37..88776c0 100644 --- a/apis/orchestration/v1alpha1/zz_generated.deepcopy.go +++ b/apis/orchestration/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EntityInput) DeepCopyInto(out *EntityInput) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EntityInput. +func (in *EntityInput) DeepCopy() *EntityInput { + if in == nil { + return nil + } + out := new(EntityInput) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EntityWorkflow) DeepCopyInto(out *EntityWorkflow) { *out = *in diff --git a/apis/v1alpha1/providerconfig_types.go b/apis/v1alpha1/providerconfig_types.go index 461d083..2f0731f 100644 --- a/apis/v1alpha1/providerconfig_types.go +++ b/apis/v1alpha1/providerconfig_types.go @@ -27,6 +27,10 @@ import ( // A ProviderConfigSpec defines the desired state of a ProviderConfig. type ProviderConfigSpec struct { + // Hostname is the temporal hostname (rpc endpoint). + Hostname string `json:"hostname"` + // Hostname is the temporal namespace used for our workflows. + Namespace string `json:"namespace"` // Credentials required to authenticate to this provider. Credentials ProviderCredentials `json:"credentials"` } diff --git a/examples/provider/config.yaml b/examples/provider/config.yaml index b2f86d2..811b1b4 100644 --- a/examples/provider/config.yaml +++ b/examples/provider/config.yaml @@ -17,6 +17,8 @@ kind: ProviderConfig metadata: name: default spec: + hostname: xxx + namespace: daanvi credentials: source: Secret secretRef: diff --git a/go.mod b/go.mod index 39360cf..a0687ef 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,9 @@ require ( github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79 github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 github.com/pkg/errors v0.9.1 + go.temporal.io/api v1.38.0 go.temporal.io/sdk v1.29.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/apimachinery v0.29.2 @@ -43,7 +45,6 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/imdario/mergo v0.3.16 // indirect @@ -70,7 +71,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.9.0 // indirect - go.temporal.io/api v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect diff --git a/go.sum b/go.sum index 1d12e94..ddd7f31 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,6 @@ github.com/crossplane/crossplane-runtime v1.16.0 h1:lz+l0wEB3qowdTmN7t0PZkfuNSvf github.com/crossplane/crossplane-runtime v1.16.0/go.mod h1:Pz2tdGVMF6KDGzHZOkvKro0nKc8EzK0sb/nSA7pH4Dc= github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79 h1:HigXs5tEQxWz0fcj8hzbU2UAZgEM7wPe0XRFOsrtF8Y= github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79/go.mod h1:+e4OaFlOcmr0JvINHl/yvEYBrZawzTgj6pQumOH1SS0= -github.com/daanvinken/tempoplane v0.0.0-20241015122140-7ede1cdd040c h1:LEtqb3QP8HrpcWsb6xNaiwqsojnqOX13TKqlDHx9ans= -github.com/daanvinken/tempoplane v0.0.0-20241015122140-7ede1cdd040c/go.mod h1:ewmbXaPj6s3VvtdGCG/xuL+k0l0edywmodS1wQ6vwOc= github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a h1:LA3MHm88hZAFXN5hbo1+kaUJp6smjalWzonNUaLzOOE= github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a/go.mod h1:ewmbXaPj6s3VvtdGCG/xuL+k0l0edywmodS1wQ6vwOc= github.com/dave/jennifer v1.4.1 h1:XyqG6cn5RQsTj3qlWQTKlRGAyrTcsk1kUmWdZBzRjDw= diff --git a/internal/controller/entityworkflow/entityworkflow.go b/internal/controller/entityworkflow/entityworkflow.go index 95ad839..0fb9761 100644 --- a/internal/controller/entityworkflow/entityworkflow.go +++ b/internal/controller/entityworkflow/entityworkflow.go @@ -20,8 +20,6 @@ import ( "context" "fmt" "github.com/google/uuid" - "os" - "github.com/pkg/errors" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -43,19 +41,12 @@ import ( ) const ( - errNotEntityWorkflow = "managed resource is not a EntityWorkflow custom resource" - errTrackPCUsage = "cannot track ProviderConfig usage" - errGetPC = "cannot get ProviderConfig" - errGetCreds = "cannot get credentials" - - errNewClient = "cannot create new Service" -) - -// A NoOpService does nothing. -type NoOpService struct{} - -var ( - newNoOpService = func(_ []byte) (interface{}, error) { return &NoOpService{}, nil } + errNotEntityWorkflow = "managed resource is not a EntityWorkflow custom resource" + errTrackPCUsage = "cannot track ProviderConfig usage" + errGetPC = "cannot get ProviderConfig" + errGetCreds = "cannot get credentials" + errTemporalConnection = "cannot connect to temporal" + errNewClient = "cannot create new Service" ) // Setup adds a controller that reconciles EntityWorkflow managed resources. @@ -70,9 +61,9 @@ func Setup(mgr ctrl.Manager, o controller.Options) error { r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.EntityWorkflowGroupVersionKind), managed.WithExternalConnecter(&connector{ - kube: mgr.GetClient(), - usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), - newServiceFn: newNoOpService}), + kube: mgr.GetClient(), + usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}), + }), managed.WithLogger(o.Logger.WithValues("controller", name)), managed.WithPollInterval(o.PollInterval), managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))), @@ -89,9 +80,9 @@ func Setup(mgr ctrl.Manager, o controller.Options) error { // A connector is expected to produce an ExternalClient when its Connect method // is called. type connector struct { - kube client.Client - usage resource.Tracker - newServiceFn func(creds []byte) (interface{}, error) + kube client.Client + usage resource.Tracker + temporal client.Client } // Connect typically produces an ExternalClient by: @@ -114,29 +105,33 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E return nil, errors.Wrap(err, errGetPC) } - cd := pc.Spec.Credentials - data, err := resource.CommonCredentialExtractor(ctx, cd.Source, c.kube, cd.CommonCredentialSelectors) + //cd := pc.Spec.Credentials + + tc, err := temporalclient.Dial(temporalclient.Options{ + HostPort: pc.Spec.Hostname, + Namespace: pc.Spec.Namespace, + }) + if err != nil { - return nil, errors.Wrap(err, errGetCreds) + return &external{}, errors.Wrap(err, errTemporalConnection) } - svc, err := c.newServiceFn(data) + hreq := temporalclient.CheckHealthRequest{} + _, err = tc.CheckHealth(context.Background(), &hreq) if err != nil { - return nil, errors.Wrap(err, errNewClient) + return &external{}, errors.Wrap(err, errTemporalConnection) } - return &external{service: svc}, nil + return &external{temporalClient: tc}, nil } // An ExternalClient observes, then either creates, updates, or deletes an // external resource to ensure it reflects the managed resource's desired state. type external struct { - // A 'client' used to connect to the external resource API. In practice this - // would be something like an AWS SDK client. - service interface{} + temporalClient temporalclient.Client } -func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput) entityworkflow.EntityInput { +func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput, workflowID string) entityworkflow.EntityInput { return entityworkflow.EntityInput{ EntityID: input.EntityID, Kind: input.Kind, @@ -146,7 +141,7 @@ func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput) entityworkflow.Ent DC: input.DC, Env: input.Env, Timestamp: input.Timestamp, - CorrelationID: input.CorrelationID, + CorrelationID: workflowID, } } @@ -157,23 +152,14 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex } // Extract WorkflowID from CorrelationID in EntityInput - workflowID := cr.Spec.ForProvider.EntityInput.CorrelationID - - // Create a Temporal client to query workflow status - tc, err := temporalclient.Dial(temporalclient.Options{ - HostPort: os.Getenv("TEMPORAL_ADDRESS"), - Namespace: os.Getenv("TEMPORAL_NS"), - }) - defer tc.Close() - if err != nil { - return managed.ExternalObservation{}, errors.Wrap(err, "failed to create Temporal client for observation") - } + workflowID := cr.Status.WorkflowID // Describe the workflow execution using the WorkflowID (CorrelationID) - // TODO probably wanna use runID as identifier on k8s - resp, err := tc.DescribeWorkflowExecution(ctx, workflowID, "") + resp, err := c.temporalClient.DescribeWorkflowExecution(ctx, workflowID, "") + if err != nil { //TODO improve error check + fmt.Println("Resource doesn't exist, returning") if string(err.Error()) == "operation GetCurrentExecution encountered not found" { return managed.ExternalObservation{ResourceExists: false}, nil } @@ -190,6 +176,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex ResourceUpToDate: true, ConnectionDetails: managed.ConnectionDetails{ "WorkflowID": []byte(workflowID), + "runID": []byte(resp.WorkflowExecutionInfo.FirstRunId), "Status": []byte("completed"), }, }, nil @@ -206,6 +193,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex ResourceUpToDate: false, ConnectionDetails: managed.ConnectionDetails{ "WorkflowID": []byte(workflowID), + "runID": []byte(resp.WorkflowExecutionInfo.Execution.RunId), //TODO correct RunID? "Status": []byte(workflowStatus.String()), }, }, nil @@ -213,6 +201,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex } func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { + fmt.Println("creating...") cr, ok := mg.(*v1alpha1.EntityWorkflow) if !ok { return managed.ExternalCreation{}, errors.New(errNotEntityWorkflow) @@ -225,41 +214,22 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext taskQueue := "TempoPlane-" + entityInput.Kind //uniqueID := uuid.New().String() - //return fmt.Sprintf("%s-%s-%s-%s", operationType, entityID, requesterID, uniqueID) + //ID := "TempoPlane" + "-" + uniqueID + "-" + entityInput.RequesterID ID := "TempoPlane" + "-" + entityInput.RequesterID - // Set CorrelationID to workflow ID - // TODO uniqueness - entityInput.CorrelationID = ID cr.Spec.ForProvider.EntityInput.CorrelationID = ID - //TODO Now we need to update this - //err := c.kubeClient.Update(ctx, cr) - //if err != nil { - // return managed.ExternalCreation{}, err - //} - // Configure workflow execution options workflowOptions := temporalclient.StartWorkflowOptions{ TaskQueue: taskQueue, ID: ID, } - - // Create a Temporal client (assumes you have setup to obtain this client externally, adjust as needed) - tc, err := temporalclient.Dial(temporalclient.Options{ - HostPort: os.Getenv("TEMPORAL_ADDRESS"), - Namespace: os.Getenv("TEMPORAL_NS"), - }) - defer tc.Close() - if err != nil { - return managed.ExternalCreation{}, errors.Wrap(err, "failed to create Temporal client") - } - // Execute the CreateWorkflow - workflowExecution, err := tc.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput)) + workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput, ID)) if err != nil { return managed.ExternalCreation{}, errors.Wrap(err, "failed to start CreateWorkflow") } + fmt.Println("Executing workflow") // Log the WorkflowID and RunID for reference fmt.Printf("Started CreateWorkflow with WorkflowID: %s and RunID: %s\n", workflowExecution.GetID(), workflowExecution.GetRunID()) @@ -315,20 +285,8 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error { ID: ID, } - //TODO DRY this stuff - - // Create a Temporal client - tc, err := temporalclient.Dial(temporalclient.Options{ - HostPort: os.Getenv("TEMPORAL_ADDRESS"), - Namespace: os.Getenv("TEMPORAL_NS"), - }) - defer tc.Close() - if err != nil { - return errors.Wrap(err, "failed to create Temporal client") - } - // Execute the DeleteWorkflow - workflowExecution, err := tc.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.DeleteWorkflow, ConvertToEntityWorkflowInput(entityInput)) + workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.DeleteWorkflow, ConvertToEntityWorkflowInput(entityInput)) if err != nil { return errors.Wrap(err, "failed to start DeleteWorkflow") } diff --git a/internal/utils/temporal.go b/internal/utils/temporal.go new file mode 100644 index 0000000..da71447 --- /dev/null +++ b/internal/utils/temporal.go @@ -0,0 +1,19 @@ +package utils + +import ( + "github.com/pkg/errors" + "go.temporal.io/sdk/client" + "os" +) + +// NewTemporalClient initializes a Temporal client with configuration from environment variables. +func NewTemporalClient() (client.Client, error) { + tc, err := client.Dial(client.Options{ + HostPort: os.Getenv("TEMPORAL_ADDRESS"), + Namespace: os.Getenv("TEMPORAL_NS"), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create Temporal client") + } + return tc, nil +} diff --git a/package/crds/orchestration.tempoplane.crossplane.io_entityworkflows.yaml b/package/crds/orchestration.tempoplane.crossplane.io_entityworkflows.yaml index 5effd55..528a057 100644 --- a/package/crds/orchestration.tempoplane.crossplane.io_entityworkflows.yaml +++ b/package/crds/orchestration.tempoplane.crossplane.io_entityworkflows.yaml @@ -78,6 +78,51 @@ spec: entityInput: description: EntityInput captures the input data required for CRUD operations. + properties: + apiVersion: + description: APIVersion specifies the API version of the entity. + type: string + correlationID: + description: CorrelationID is used for tracking and correlating + requests across services. + type: string + data: + description: Data contains the main payload for the entity. + type: string + dc: + description: DC specifies the data center for deployment or + location context. + type: string + entityID: + description: EntityID is the unique identifier for the entity + in the workflow. + type: string + env: + description: Env specifies the environment (e.g., production, + development). + type: string + kind: + description: Kind specifies the kind of entity (e.g., MyType, + ResourceType). + type: string + requesterID: + description: RequesterID identifies the entity's requester. + type: string + timestamp: + description: Timestamp is the Unix timestamp of when the request + was made. + format: int64 + type: integer + required: + - apiVersion + - correlationID + - data + - dc + - entityID + - env + - kind + - requesterID + - timestamp type: object required: - entityInput @@ -331,6 +376,9 @@ spec: it can not recover from without human intervention. format: int64 type: integer + workflowID: + description: WorkflowID `json:",inline"` + type: string type: object required: - spec diff --git a/package/crds/tempoplane.crossplane.io_providerconfigs.yaml b/package/crds/tempoplane.crossplane.io_providerconfigs.yaml index 4aa9325..5b4fa79 100644 --- a/package/crds/tempoplane.crossplane.io_providerconfigs.yaml +++ b/package/crds/tempoplane.crossplane.io_providerconfigs.yaml @@ -103,8 +103,16 @@ spec: required: - source type: object + hostname: + description: Hostname is the temporal hostname (rpc endpoint). + type: string + namespace: + description: Hostname is the temporal namespace used for our workflows. + type: string required: - credentials + - hostname + - namespace type: object status: description: A ProviderConfigStatus reflects the observed state of a ProviderConfig. diff --git a/tmp/sa.kubeconfig b/tmp/sa.kubeconfig new file mode 100644 index 0000000..a5a65d0 --- /dev/null +++ b/tmp/sa.kubeconfig @@ -0,0 +1,20 @@ + +apiVersion: v1 +kind: Config +clusters: +- name: default-cluster + cluster: + certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIrZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWtNU0l3SUFZRFZRUUREQmx5YTJVeUxYTmwKY25abGNpMWpZVUF4Tmprd09EZzBOREE0TUI0WERUSXpNRGd3TVRFd01EWTBPRm9YRFRNek1EY3lPVEV3TURZMApPRm93SkRFaU1DQUdBMVVFQXd3WmNtdGxNaTF6WlhKMlpYSXRZMkZBTVRZNU1EZzRORFF3T0RCWk1CTUdCeXFHClNNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJLSnpmOXBUMEJWLzhaeE5QaHFJUm1MTEQ1MnlQMUZpODFxbUltR08KRmZlb3FReGpGTDFFZGYyQ3V1Mlp4dXhJSkxmbTFOMjJ4TzZ1NnBBNFJ3elVRR0tqUWpCQU1BNEdBMVVkRHdFQgovd1FFQXdJQ3BEQVBCZ05WSFJNQkFmOEVCVEFEQVFIL01CMEdBMVVkRGdRV0JCUVZuUDVpZEoyeDYxdlljSFVkClZIVjIxNzFlK0RBS0JnZ3Foa2pPUFFRREFnTkhBREJFQWlCdDRqSldKNFBuS0x3T3BVYjZPYnk5NDlaVXp3UEcKMkhmZDFUWGNudW9WVGdJZ1RlbkZxQ2Q5aWEyZVVxZlBJMy9zNWUwdTVpcXk1L0VxN0piZHhuWGExMzA9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + server: https://k8s-dev01.service.ams1o.consul:6443 +contexts: +- name: default-context@a + context: + cluster: default-cluster + namespace: crossplane-system + user: default-user +current-context: default-context@a +users: +- name: default-user + user: + token: eyJhbGciOiJSUzI1NiIsImtpZCI6IjFTTkJqQ25TREQwLVhBZ2toNlFrdzZLczY4OVJNUk8zQXBkTnJHT2l1cU0ifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJjcm9zc3BsYW5lLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi1zZXJ2aWNlYWNjb3VudC10b2tlbiIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJhZG1pbi1zZXJ2aWNlYWNjb3VudCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImU4ZGYzZmYzLWI5ZTctNGMzNi04ODU3LWM3Yjk2MWNhYzVhMyIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpjcm9zc3BsYW5lLXN5c3RlbTphZG1pbi1zZXJ2aWNlYWNjb3VudCJ9.mbH5JYQovl4qB2fm7TYHqY-Sr90CmN2kAWa_iNN0hNWUi0JeeIHesQUJq7rNZ3xY1w1GAqytd0p-4GMB7To3b4a85Em47LBEOTWup-kw9-8Ixx2T0Nfnt1EMstvAUaQ3xLuAWUa6lB76I0C35-bquS6P1q31geQgxnI20SQ9AiDL9B7_7tVlvEYanOBNkK_58N3w796no1k996Poxd99OEAXBiNz5vvOg5z_hLzbK9XQBch93LhUoVQ63Acn4e4qhveUNk5S32U-6l2hD47ehX528NTsZa8-oBw4LX9YgIzQi2Wi0hB8zln29L4sTyoE72cYRhtq2B2nCTzRJcXFIg +