From 4aa13f898c7395b43e105c58552ba9f23152b8d8 Mon Sep 17 00:00:00 2001 From: Tommy Hughes Date: Mon, 4 Dec 2023 12:41:23 -0600 Subject: [PATCH 1/3] SonataFlowClusterPlatform CRD & Controller Signed-off-by: Tommy Hughes --- PROJECT | 8 + .../sonataflowclusterplatform_types.go | 101 +++++++++ ...sonataflowclusterplatform_types_support.go | 34 +++ .../sonataflowplatform_services_types.go | 8 +- api/v1alpha08/sonataflowplatform_types.go | 37 +++- api/v1alpha08/zz_generated.deepcopy.go | 178 ++++++++++++++- ...c.authorization.k8s.io_v1_clusterrole.yaml | 27 +++ ...taflow-operator.clusterserviceversion.yaml | 56 +++++ ...taflow.org_sonataflowclusterplatforms.yaml | 120 ++++++++++ .../sonataflow.org_sonataflowplatforms.yaml | 68 +++++- ...taflow.org_sonataflowclusterplatforms.yaml | 115 ++++++++++ .../sonataflow.org_sonataflowplatforms.yaml | 68 +++++- config/crd/kustomization.yaml | 3 + ...jection_in_sonataflowclusterplatforms.yaml | 7 + ...webhook_in_sonataflowclusterplatforms.yaml | 16 ++ ...taflow-operator.clusterserviceversion.yaml | 17 ++ config/rbac/role.yaml | 26 +++ ...sonataflowclusterplatform_editor_role.yaml | 31 +++ ...rplatform_viewer_cluster_role_binding.yaml | 13 ++ ...sonataflowclusterplatform_viewer_role.yaml | 27 +++ config/samples/kustomization.yaml | 1 + ...g_v1alpha08_sonataflowclusterplatform.yaml | 8 + controllers/clusterplatform/action.go | 50 +++++ .../clusterplatform/clusterplatform.go | 117 ++++++++++ controllers/clusterplatform/initialize.go | 118 ++++++++++ controllers/platform/k8s.go | 39 ++-- controllers/platform/platformutils.go | 16 +- controllers/platform/services/properties.go | 7 +- .../services/properties_services_test.go | 18 ++ controllers/platform/services/services.go | 159 +++++++++++-- .../common/constants/platform_services.go | 8 +- .../common/properties/application_test.go | 11 +- .../sonataflowclusterplatform_controller.go | 175 +++++++++++++++ controllers/sonataflowplatform_controller.go | 120 +++++++++- .../sonataflowplatform_controller_test.go | 143 +++++++++++- main.go | 10 + operator.yaml | 209 +++++++++++++++++- ...g_v1alpha08_sonataflowclusterplatform.yaml | 25 +++ test/yaml.go | 48 +++- 39 files changed, 2122 insertions(+), 120 deletions(-) create mode 100644 api/v1alpha08/sonataflowclusterplatform_types.go create mode 100644 api/v1alpha08/sonataflowclusterplatform_types_support.go create mode 100644 bundle/manifests/sonataflow-operator-sonataflowclusterplatform-viewer-role_rbac.authorization.k8s.io_v1_clusterrole.yaml create mode 100644 bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml create mode 100644 config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml create mode 100644 config/crd/patches/cainjection_in_sonataflowclusterplatforms.yaml create mode 100644 config/crd/patches/webhook_in_sonataflowclusterplatforms.yaml create mode 100644 config/rbac/sonataflowclusterplatform_editor_role.yaml create mode 100644 config/rbac/sonataflowclusterplatform_viewer_cluster_role_binding.yaml create mode 100644 config/rbac/sonataflowclusterplatform_viewer_role.yaml create mode 100644 config/samples/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml create mode 100644 controllers/clusterplatform/action.go create mode 100644 controllers/clusterplatform/clusterplatform.go create mode 100644 controllers/clusterplatform/initialize.go create mode 100644 controllers/sonataflowclusterplatform_controller.go create mode 100644 test/testdata/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml diff --git a/PROJECT b/PROJECT index f32f2b3cc..6ef3543eb 100644 --- a/PROJECT +++ b/PROJECT @@ -34,4 +34,12 @@ resources: kind: SonataFlowPlatform path: github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08 version: v1alpha08 +- api: + crdVersion: v1 + controller: true + domain: org + group: sonataflow + kind: SonataFlowClusterPlatform + path: github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08 + version: v1alpha08 version: "3" diff --git a/api/v1alpha08/sonataflowclusterplatform_types.go b/api/v1alpha08/sonataflowclusterplatform_types.go new file mode 100644 index 000000000..a67f5ecdc --- /dev/null +++ b/api/v1alpha08/sonataflowclusterplatform_types.go @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v1alpha08 + +import ( + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // SonataFlowClusterPlatformKind is the Kind name of the SonataFlowClusterPlatform CR + SonataFlowClusterPlatformKind string = "SonataFlowClusterPlatform" + PlatformNotFoundReason string = "PlatformNotFound" +) + +// SonataFlowClusterPlatformSpec defines the desired state of SonataFlowClusterPlatform +type SonataFlowClusterPlatformSpec struct { + // PlatformRef defines which existing SonataFlowPlatform's services should be used cluster-wide. + PlatformRef SonataFlowPlatformRef `json:"platformRef"` +} + +// SonataFlowPlatformRef defines which existing SonataFlowPlatform should be used cluster-wide +type SonataFlowPlatformRef struct { + // Name of the SonataFlowPlatform + //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Platform_Name" + Name string `json:"name"` + // Namespace of the SonataFlowPlatform + //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Platform_NS" + Namespace string `json:"namespace"` +} + +// SonataFlowClusterPlatformStatus defines the observed state of SonataFlowClusterPlatform +type SonataFlowClusterPlatformStatus struct { + api.Status `json:",inline"` + // Version the operator version controlling this ClusterPlatform + //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="version" + Version string `json:"version,omitempty"` +} + +func (in *SonataFlowClusterPlatformStatus) GetTopLevelConditionType() api.ConditionType { + return api.SucceedConditionType +} + +func (in *SonataFlowClusterPlatformStatus) IsReady() bool { + return in.GetTopLevelCondition().IsTrue() +} + +func (in *SonataFlowClusterPlatformStatus) GetTopLevelCondition() *api.Condition { + return in.GetCondition(in.GetTopLevelConditionType()) +} + +func (in *SonataFlowClusterPlatformStatus) Manager() api.ConditionsManager { + return api.NewConditionManager(in, api.SucceedConditionType) +} + +func (in *SonataFlowClusterPlatformStatus) IsDuplicated() bool { + cond := in.GetTopLevelCondition() + return cond.IsFalse() && cond.Reason == PlatformDuplicatedReason +} + +// SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms API +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:printcolumn:name="Platform_Name",type=string,JSONPath=`.spec.platformRef.name` +// +kubebuilder:printcolumn:name="Platform_NS",type=string,JSONPath=`.spec.platformRef.namespace` +// +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=='Succeed')].status` +// +kubebuilder:printcolumn:name="Reason",type=string,JSONPath=`.status.conditions[?(@.type=='Succeed')].reason` +type SonataFlowClusterPlatform struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec SonataFlowClusterPlatformSpec `json:"spec,omitempty"` + Status SonataFlowClusterPlatformStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// SonataFlowClusterPlatformList contains a list of SonataFlowClusterPlatform +type SonataFlowClusterPlatformList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []SonataFlowClusterPlatform `json:"items"` +} + +func init() { + SchemeBuilder.Register(&SonataFlowClusterPlatform{}, &SonataFlowClusterPlatformList{}) +} diff --git a/api/v1alpha08/sonataflowclusterplatform_types_support.go b/api/v1alpha08/sonataflowclusterplatform_types_support.go new file mode 100644 index 000000000..8c126555d --- /dev/null +++ b/api/v1alpha08/sonataflowclusterplatform_types_support.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package v1alpha08 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// NewSonataFlowClusterPlatformList returns an empty list of ClusterPlatform objects +func NewSonataFlowClusterPlatformList() SonataFlowClusterPlatformList { + return SonataFlowClusterPlatformList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: GroupVersion.String(), + Kind: SonataFlowClusterPlatformKind, + }, + } +} diff --git a/api/v1alpha08/sonataflowplatform_services_types.go b/api/v1alpha08/sonataflowplatform_services_types.go index 4c3a66108..9ecad3de9 100644 --- a/api/v1alpha08/sonataflowplatform_services_types.go +++ b/api/v1alpha08/sonataflowplatform_services_types.go @@ -14,12 +14,12 @@ package v1alpha08 -// ServicesPlatformSpec describes the desired service configuration for "prod" workflows. +// ServicesPlatformSpec describes the desired service configuration for workflows without the `sonataflow.org/profile: dev` annotation. type ServicesPlatformSpec struct { - // Deploys the Data Index service for use by "prod" profile workflows. + // Deploys the Data Index service for use by workflows without the `sonataflow.org/profile: dev` annotation. // +optional DataIndex *ServiceSpec `json:"dataIndex,omitempty"` - // Deploys the Job service for use by "prod" profile workflows. + // Deploys the Job service for use by workflows without the `sonataflow.org/profile: dev` annotation. // +optional JobService *ServiceSpec `json:"jobService,omitempty"` } @@ -27,7 +27,7 @@ type ServicesPlatformSpec struct { // ServiceSpec defines the desired state of a platform service // +k8s:openapi-gen=true type ServiceSpec struct { - // Determines whether "prod" profile workflows should be configured to use this service + // Determines whether workflows without the `sonataflow.org/profile: dev` annotation should be configured to use this service // +optional Enabled *bool `json:"enabled,omitempty"` // Persists service to a datasource of choice. Ephemeral by default. diff --git a/api/v1alpha08/sonataflowplatform_types.go b/api/v1alpha08/sonataflowplatform_types.go index fd2ba0137..0ee7e9ab7 100644 --- a/api/v1alpha08/sonataflowplatform_types.go +++ b/api/v1alpha08/sonataflowplatform_types.go @@ -41,11 +41,11 @@ type SonataFlowPlatformSpec struct { // +optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="DevMode" DevMode DevModePlatformSpec `json:"devMode,omitempty"` - // Services attributes for deploying supporting applications like Data Index. - // Only workflows with the proper annotation will be configured to use these service(s). - // `sonataflow.org/profile: prod` + // Services attributes for deploying supporting applications like Data Index & Job Service. + // Only workflows without the `sonataflow.org/profile: dev` annotation will be configured to use these service(s). + // Setting this will override the use of any cluster-scoped services that might be defined via `SonataFlowClusterPlatform`. // +optional - Services ServicesPlatformSpec `json:"services,omitempty"` + Services *ServicesPlatformSpec `json:"services,omitempty"` } // PlatformCluster is the kind of orchestration cluster the platform is installed into @@ -79,6 +79,35 @@ type SonataFlowPlatformStatus struct { // Info generic information related to the build //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="info" Info map[string]string `json:"info,omitempty"` + // ClusterPlatformRef information related to the (optional) active SonataFlowClusterPlatform + ClusterPlatformRef *SonataFlowClusterPlatformRefStatus `json:"clusterPlatformRef,omitempty"` +} + +// SonataFlowClusterPlatformRefStatus information related to the (optional) active SonataFlowClusterPlatform +// +k8s:openapi-gen=true +type SonataFlowClusterPlatformRefStatus struct { + // Name of the active SonataFlowClusterPlatform + Name string `json:"name,omitempty"` + // PlatformRef displays which SonataFlowPlatform has been referenced by the active SonataFlowClusterPlatform + PlatformRef SonataFlowPlatformRef `json:"platformRef,omitempty"` + // Services displays which cluster-wide services are being used by this SonataFlowPlatform + Services *PlatformServicesStatus `json:"services,omitempty"` +} + +// PlatformServicesStatus displays which cluster-wide services are being used by a SonataFlowPlatform +// +k8s:openapi-gen=true +type PlatformServicesStatus struct { + // DataIndexRef displays information on the cluster-wide Data Index service + DataIndexRef *PlatformServiceRefStatus `json:"dataIndexRef,omitempty"` + // JobServiceRef displays information on the cluster-wide Job Service + JobServiceRef *PlatformServiceRefStatus `json:"jobServiceRef,omitempty"` +} + +// PlatformServiceRefStatus displays information on a cluster-wide service +// +k8s:openapi-gen=true +type PlatformServiceRefStatus struct { + // Url displays the base url of a cluster-wide service + Url string `json:"url,omitempty"` } func (in *SonataFlowPlatformStatus) GetTopLevelConditionType() api.ConditionType { diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go index 3c24b8c61..d4449494f 100644 --- a/api/v1alpha08/zz_generated.deepcopy.go +++ b/api/v1alpha08/zz_generated.deepcopy.go @@ -360,6 +360,46 @@ func (in *PersistencePostgreSql) DeepCopy() *PersistencePostgreSql { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PlatformServiceRefStatus) DeepCopyInto(out *PlatformServiceRefStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PlatformServiceRefStatus. +func (in *PlatformServiceRefStatus) DeepCopy() *PlatformServiceRefStatus { + if in == nil { + return nil + } + out := new(PlatformServiceRefStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PlatformServicesStatus) DeepCopyInto(out *PlatformServicesStatus) { + *out = *in + if in.DataIndexRef != nil { + in, out := &in.DataIndexRef, &out.DataIndexRef + *out = new(PlatformServiceRefStatus) + **out = **in + } + if in.JobServiceRef != nil { + in, out := &in.JobServiceRef, &out.JobServiceRef + *out = new(PlatformServiceRefStatus) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PlatformServicesStatus. +func (in *PlatformServicesStatus) DeepCopy() *PlatformServicesStatus { + if in == nil { + return nil + } + out := new(PlatformServicesStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodSpec) DeepCopyInto(out *PodSpec) { *out = *in @@ -764,6 +804,118 @@ func (in *SonataFlowBuildStatus) DeepCopy() *SonataFlowBuildStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowClusterPlatform) DeepCopyInto(out *SonataFlowClusterPlatform) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowClusterPlatform. +func (in *SonataFlowClusterPlatform) DeepCopy() *SonataFlowClusterPlatform { + if in == nil { + return nil + } + out := new(SonataFlowClusterPlatform) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SonataFlowClusterPlatform) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowClusterPlatformList) DeepCopyInto(out *SonataFlowClusterPlatformList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]SonataFlowClusterPlatform, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowClusterPlatformList. +func (in *SonataFlowClusterPlatformList) DeepCopy() *SonataFlowClusterPlatformList { + if in == nil { + return nil + } + out := new(SonataFlowClusterPlatformList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *SonataFlowClusterPlatformList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowClusterPlatformRefStatus) DeepCopyInto(out *SonataFlowClusterPlatformRefStatus) { + *out = *in + out.PlatformRef = in.PlatformRef + if in.Services != nil { + in, out := &in.Services, &out.Services + *out = new(PlatformServicesStatus) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowClusterPlatformRefStatus. +func (in *SonataFlowClusterPlatformRefStatus) DeepCopy() *SonataFlowClusterPlatformRefStatus { + if in == nil { + return nil + } + out := new(SonataFlowClusterPlatformRefStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowClusterPlatformSpec) DeepCopyInto(out *SonataFlowClusterPlatformSpec) { + *out = *in + out.PlatformRef = in.PlatformRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowClusterPlatformSpec. +func (in *SonataFlowClusterPlatformSpec) DeepCopy() *SonataFlowClusterPlatformSpec { + if in == nil { + return nil + } + out := new(SonataFlowClusterPlatformSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowClusterPlatformStatus) DeepCopyInto(out *SonataFlowClusterPlatformStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowClusterPlatformStatus. +func (in *SonataFlowClusterPlatformStatus) DeepCopy() *SonataFlowClusterPlatformStatus { + if in == nil { + return nil + } + out := new(SonataFlowClusterPlatformStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SonataFlowList) DeepCopyInto(out *SonataFlowList) { *out = *in @@ -855,12 +1007,31 @@ func (in *SonataFlowPlatformList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowPlatformRef) DeepCopyInto(out *SonataFlowPlatformRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformRef. +func (in *SonataFlowPlatformRef) DeepCopy() *SonataFlowPlatformRef { + if in == nil { + return nil + } + out := new(SonataFlowPlatformRef) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SonataFlowPlatformSpec) DeepCopyInto(out *SonataFlowPlatformSpec) { *out = *in in.Build.DeepCopyInto(&out.Build) out.DevMode = in.DevMode - in.Services.DeepCopyInto(&out.Services) + if in.Services != nil { + in, out := &in.Services, &out.Services + *out = new(ServicesPlatformSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformSpec. @@ -884,6 +1055,11 @@ func (in *SonataFlowPlatformStatus) DeepCopyInto(out *SonataFlowPlatformStatus) (*out)[key] = val } } + if in.ClusterPlatformRef != nil { + in, out := &in.ClusterPlatformRef, &out.ClusterPlatformRef + *out = new(SonataFlowClusterPlatformRefStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformStatus. diff --git a/bundle/manifests/sonataflow-operator-sonataflowclusterplatform-viewer-role_rbac.authorization.k8s.io_v1_clusterrole.yaml b/bundle/manifests/sonataflow-operator-sonataflowclusterplatform-viewer-role_rbac.authorization.k8s.io_v1_clusterrole.yaml new file mode 100644 index 000000000..4b7af794b --- /dev/null +++ b/bundle/manifests/sonataflow-operator-sonataflowclusterplatform-viewer-role_rbac.authorization.k8s.io_v1_clusterrole.yaml @@ -0,0 +1,27 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + creationTimestamp: null + labels: + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: sonataflow-operator + app.kubernetes.io/instance: sonataflowclusterplatform-viewer-role + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/name: clusterrole + app.kubernetes.io/part-of: sonataflow-operator + name: sonataflow-operator-sonataflowclusterplatform-viewer-role +rules: +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - get + - list + - watch +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get diff --git a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml index 94baa4f11..9dced8cd4 100644 --- a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml +++ b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml @@ -86,6 +86,19 @@ metadata: "timeout": "360s" } }, + { + "apiVersion": "sonataflow.org/v1alpha08", + "kind": "SonataFlowClusterPlatform", + "metadata": { + "name": "sonataflow-clusterplatform" + }, + "spec": { + "platformRef": { + "name": "sonataflow-platform", + "namespace": "sonataflow-operator-system" + } + } + }, { "apiVersion": "sonataflow.org/v1alpha08", "kind": "SonataFlowPlatform", @@ -167,6 +180,23 @@ spec: displayName: InnerBuild path: innerBuild version: v1alpha08 + - description: SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms + API + displayName: Sonata Flow Cluster Platform + kind: SonataFlowClusterPlatform + name: sonataflowclusterplatforms.sonataflow.org + specDescriptors: + - description: Name of the SonataFlowPlatform + displayName: Platform_Name + path: platformRef.name + - description: Namespace of the SonataFlowPlatform + displayName: Platform_NS + path: platformRef.namespace + statusDescriptors: + - description: Version the operator version controlling this ClusterPlatform + displayName: version + path: version + version: v1alpha08 - description: SonataFlowPlatform is the descriptor for the workflow platform infrastructure. displayName: Sonata Flow Platform @@ -386,6 +416,32 @@ spec: - get - patch - update + - apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/finalizers + verbs: + - update + - apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get + - patch + - update - apiGroups: - sonataflow.org resources: diff --git a/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml b/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml new file mode 100644 index 000000000..170acace6 --- /dev/null +++ b/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml @@ -0,0 +1,120 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.2 + creationTimestamp: null + name: sonataflowclusterplatforms.sonataflow.org +spec: + group: sonataflow.org + names: + kind: SonataFlowClusterPlatform + listKind: SonataFlowClusterPlatformList + plural: sonataflowclusterplatforms + singular: sonataflowclusterplatform + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.platformRef.name + name: Platform_Name + type: string + - jsonPath: .spec.platformRef.namespace + name: Platform_NS + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].status + name: Ready + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].reason + name: Reason + type: string + name: v1alpha08 + schema: + openAPIV3Schema: + description: SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms + API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SonataFlowClusterPlatformSpec defines the desired state of + SonataFlowClusterPlatform + properties: + platformRef: + description: PlatformRef defines which existing SonataFlowPlatform's + services should be used cluster-wide. + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + required: + - platformRef + type: object + status: + description: SonataFlowClusterPlatformStatus defines the observed state + of SonataFlowClusterPlatform + properties: + conditions: + description: The latest available observations of a resource's current + state. + items: + description: Condition describes the common structure for conditions + in our types + properties: + lastUpdateTime: + description: The last time this condition was updated. + format: date-time + type: string + message: + description: A human-readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type condition for the given object + type: string + required: + - status + - type + type: object + type: array + observedGeneration: + description: The generation observed by the deployment controller. + format: int64 + type: integer + version: + description: Version the operator version controlling this ClusterPlatform + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: null + storedVersions: null diff --git a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml index a8437baca..5910e8b78 100644 --- a/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml +++ b/bundle/manifests/sonataflow.org_sonataflowplatforms.yaml @@ -422,17 +422,18 @@ spec: type: object services: description: 'Services attributes for deploying supporting applications - like Data Index. Only workflows with the proper annotation will - be configured to use these service(s). `sonataflow.org/profile: - prod`' + like Data Index & Job Service. Only workflows without the `sonataflow.org/profile: + dev` annotation will be configured to use these service(s). Setting + this will override the use of any cluster-scoped services that might + be defined via `SonataFlowClusterPlatform`.' properties: dataIndex: - description: Deploys the Data Index service for use by "prod" - profile workflows. + description: 'Deploys the Data Index service for use by workflows + without the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -8299,12 +8300,12 @@ spec: type: object type: object jobService: - description: Deploys the Job service for use by "prod" profile - workflows. + description: 'Deploys the Job service for use by workflows without + the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -16182,6 +16183,51 @@ spec: - kubernetes - openshift type: string + clusterPlatformRef: + description: ClusterPlatformRef information related to the (optional) + active SonataFlowClusterPlatform + properties: + name: + description: Name of the active SonataFlowClusterPlatform + type: string + platformRef: + description: PlatformRef displays which SonataFlowPlatform has + been referenced by the active SonataFlowClusterPlatform + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + services: + description: Services displays which cluster-wide services are + being used by this SonataFlowPlatform + properties: + dataIndexRef: + description: DataIndexRef displays information on the cluster-wide + Data Index service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + jobServiceRef: + description: JobServiceRef displays information on the cluster-wide + Job Service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + type: object + type: object conditions: description: The latest available observations of a resource's current state. diff --git a/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml b/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml new file mode 100644 index 000000000..133647aae --- /dev/null +++ b/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml @@ -0,0 +1,115 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.2 + creationTimestamp: null + name: sonataflowclusterplatforms.sonataflow.org +spec: + group: sonataflow.org + names: + kind: SonataFlowClusterPlatform + listKind: SonataFlowClusterPlatformList + plural: sonataflowclusterplatforms + singular: sonataflowclusterplatform + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.platformRef.name + name: Platform_Name + type: string + - jsonPath: .spec.platformRef.namespace + name: Platform_NS + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].status + name: Ready + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].reason + name: Reason + type: string + name: v1alpha08 + schema: + openAPIV3Schema: + description: SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms + API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SonataFlowClusterPlatformSpec defines the desired state of + SonataFlowClusterPlatform + properties: + platformRef: + description: PlatformRef defines which existing SonataFlowPlatform's + services should be used cluster-wide. + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + required: + - platformRef + type: object + status: + description: SonataFlowClusterPlatformStatus defines the observed state + of SonataFlowClusterPlatform + properties: + conditions: + description: The latest available observations of a resource's current + state. + items: + description: Condition describes the common structure for conditions + in our types + properties: + lastUpdateTime: + description: The last time this condition was updated. + format: date-time + type: string + message: + description: A human-readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type condition for the given object + type: string + required: + - status + - type + type: object + type: array + observedGeneration: + description: The generation observed by the deployment controller. + format: int64 + type: integer + version: + description: Version the operator version controlling this ClusterPlatform + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml index 6fd5a9b58..9db303538 100644 --- a/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml +++ b/config/crd/bases/sonataflow.org_sonataflowplatforms.yaml @@ -423,17 +423,18 @@ spec: type: object services: description: 'Services attributes for deploying supporting applications - like Data Index. Only workflows with the proper annotation will - be configured to use these service(s). `sonataflow.org/profile: - prod`' + like Data Index & Job Service. Only workflows without the `sonataflow.org/profile: + dev` annotation will be configured to use these service(s). Setting + this will override the use of any cluster-scoped services that might + be defined via `SonataFlowClusterPlatform`.' properties: dataIndex: - description: Deploys the Data Index service for use by "prod" - profile workflows. + description: 'Deploys the Data Index service for use by workflows + without the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -8300,12 +8301,12 @@ spec: type: object type: object jobService: - description: Deploys the Job service for use by "prod" profile - workflows. + description: 'Deploys the Job service for use by workflows without + the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -16183,6 +16184,51 @@ spec: - kubernetes - openshift type: string + clusterPlatformRef: + description: ClusterPlatformRef information related to the (optional) + active SonataFlowClusterPlatform + properties: + name: + description: Name of the active SonataFlowClusterPlatform + type: string + platformRef: + description: PlatformRef displays which SonataFlowPlatform has + been referenced by the active SonataFlowClusterPlatform + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + services: + description: Services displays which cluster-wide services are + being used by this SonataFlowPlatform + properties: + dataIndexRef: + description: DataIndexRef displays information on the cluster-wide + Data Index service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + jobServiceRef: + description: JobServiceRef displays information on the cluster-wide + Job Service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + type: object + type: object conditions: description: The latest available observations of a resource's current state. diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index a07d24bb8..492412313 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -5,6 +5,7 @@ resources: - bases/sonataflow.org_sonataflows.yaml - bases/sonataflow.org_sonataflowbuilds.yaml - bases/sonataflow.org_sonataflowplatforms.yaml +- bases/sonataflow.org_sonataflowclusterplatforms.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -13,6 +14,7 @@ patchesStrategicMerge: #- patches/webhook_in_workflows.yaml #- patches/webhook_in_sonataflows.yaml #- patches/webhook_in_sonataflowplatforms.yaml +#- patches/webhook_in_sonataflowclusterplatforms.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. @@ -20,6 +22,7 @@ patchesStrategicMerge: #- patches/cainjection_in_workflows.yaml #- patches/cainjection_in_sonataflowworkflows.yaml #- patches/cainjection_in_sonataflowplatforms.yaml +#- patches/cainjection_in_sonataflowclusterplatforms.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_sonataflowclusterplatforms.yaml b/config/crd/patches/cainjection_in_sonataflowclusterplatforms.yaml new file mode 100644 index 000000000..dc0274cd7 --- /dev/null +++ b/config/crd/patches/cainjection_in_sonataflowclusterplatforms.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: sonataflowclusterplatforms.sonataflow.org diff --git a/config/crd/patches/webhook_in_sonataflowclusterplatforms.yaml b/config/crd/patches/webhook_in_sonataflowclusterplatforms.yaml new file mode 100644 index 000000000..ed7b0f41e --- /dev/null +++ b/config/crd/patches/webhook_in_sonataflowclusterplatforms.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sonataflowclusterplatforms.sonataflow.org +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml index dc1b051e1..fcd5460ea 100644 --- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml @@ -64,6 +64,23 @@ spec: displayName: InnerBuild path: innerBuild version: v1alpha08 + - description: SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms + API + displayName: Sonata Flow Cluster Platform + kind: SonataFlowClusterPlatform + name: sonataflowclusterplatforms.sonataflow.org + specDescriptors: + - description: Name of the SonataFlowPlatform + displayName: Platform_Name + path: platformRef.name + - description: Namespace of the SonataFlowPlatform + displayName: Platform_NS + path: platformRef.namespace + statusDescriptors: + - description: Version the operator version controlling this ClusterPlatform + displayName: version + path: version + version: v1alpha08 - description: SonataFlowPlatform is the descriptor for the workflow platform infrastructure. displayName: Sonata Flow Platform diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 49e76885f..e09a00c21 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -31,6 +31,32 @@ rules: - get - patch - update +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/finalizers + verbs: + - update +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get + - patch + - update - apiGroups: - sonataflow.org resources: diff --git a/config/rbac/sonataflowclusterplatform_editor_role.yaml b/config/rbac/sonataflowclusterplatform_editor_role.yaml new file mode 100644 index 000000000..26f5aa2cc --- /dev/null +++ b/config/rbac/sonataflowclusterplatform_editor_role.yaml @@ -0,0 +1,31 @@ +# permissions for end users to edit sonataflowclusterplatforms. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: sonataflowclusterplatform-editor-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: sonataflow-operator + app.kubernetes.io/part-of: sonataflow-operator + app.kubernetes.io/managed-by: kustomize + name: sonataflowclusterplatform-editor-role +rules: +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get diff --git a/config/rbac/sonataflowclusterplatform_viewer_cluster_role_binding.yaml b/config/rbac/sonataflowclusterplatform_viewer_cluster_role_binding.yaml new file mode 100644 index 000000000..d54f94282 --- /dev/null +++ b/config/rbac/sonataflowclusterplatform_viewer_cluster_role_binding.yaml @@ -0,0 +1,13 @@ +# allow users to view SonataFlowClusterPlatforms cluster-wide +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: sonataflowclusterplatform-viewer +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: sonataflowclusterplatform-viewer-role +subjects: + - apiGroup: rbac.authorization.k8s.io + kind: Group + name: system:authenticated \ No newline at end of file diff --git a/config/rbac/sonataflowclusterplatform_viewer_role.yaml b/config/rbac/sonataflowclusterplatform_viewer_role.yaml new file mode 100644 index 000000000..4a64a8811 --- /dev/null +++ b/config/rbac/sonataflowclusterplatform_viewer_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to view sonataflowclusterplatforms. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: sonataflowclusterplatform-viewer-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: sonataflow-operator + app.kubernetes.io/part-of: sonataflow-operator + app.kubernetes.io/managed-by: kustomize + name: sonataflowclusterplatform-viewer-role +rules: +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - get + - list + - watch +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index d4ca147ba..642faef13 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -3,4 +3,5 @@ resources: - sonataflow.org_v1alpha08_sonataflow.yaml - sonataflow.org_v1alpha08_sonataflowplatform.yaml - sonataflow.org_v1alpha08_sonataflowbuild.yaml +- sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/samples/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml b/config/samples/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml new file mode 100644 index 000000000..2117a7a07 --- /dev/null +++ b/config/samples/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml @@ -0,0 +1,8 @@ +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowClusterPlatform +metadata: + name: sonataflow-clusterplatform +spec: + platformRef: + name: sonataflow-platform + namespace: sonataflow-operator-system diff --git a/controllers/clusterplatform/action.go b/controllers/clusterplatform/action.go new file mode 100644 index 000000000..20ba9dcf0 --- /dev/null +++ b/controllers/clusterplatform/action.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + + v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" +) + +// Action --. +type Action interface { + client.Injectable + + // a user friendly name for the action + Name() string + + // returns true if the action can handle the cluster platform + CanHandle(ctx context.Context, cPlatform *v08.SonataFlowClusterPlatform) bool + + // executes the handling function + Handle(ctx context.Context, cPlatform *v08.SonataFlowClusterPlatform) error +} + +type baseAction struct { + client client.Client +} + +func (action *baseAction) InjectClient(client client.Client) { + action.client = client +} diff --git a/controllers/clusterplatform/clusterplatform.go b/controllers/clusterplatform/clusterplatform.go new file mode 100644 index 000000000..6c1ffe45e --- /dev/null +++ b/controllers/clusterplatform/clusterplatform.go @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently active cluster platform or any cluster platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("No cluster platform found") + return nil, k8serrors.NewNotFound(operatorapi.Resource(operatorapi.SonataFlowClusterPlatformKind), "") +} + +// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active). +func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + filtered := &operatorapi.SonataFlowClusterPlatformList{} + for i := range lst.Items { + cPl := lst.Items[i] + if !IsSecondary(&cPl) { + filtered.Items = append(filtered.Items, cPl) + } + } + return filtered, nil +} + +// allDuplicatedClusterPlatforms returns true if every cluster platform has a "Duplicated" status set +func allDuplicatedClusterPlatforms(ctx context.Context, c ctrl.Reader) bool { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return false + } + + for i := range lst.Items { + if !lst.Items[i].Status.IsDuplicated() { + return false + } + } + + return true +} + +// listAllClusterPlatforms returns all clusterplatforms installed. +func listAllClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst := operatorapi.NewSonataFlowClusterPlatformList() + if err := c.List(ctx, &lst); err != nil { + return nil, err + } + return &lst, nil +} + +// IsActive determines if the given cluster platform is being used. +func IsActive(p *operatorapi.SonataFlowClusterPlatform) bool { + return p.Status.IsReady() && !p.Status.IsDuplicated() +} + +// IsSecondary determines if the given cluster platform is marked as secondary. +func IsSecondary(p *operatorapi.SonataFlowClusterPlatform) bool { + if l, ok := p.Annotations[metadata.SecondaryPlatformAnnotation]; ok && l == "true" { + return true + } + return false +} diff --git a/controllers/clusterplatform/initialize.go b/controllers/clusterplatform/initialize.go new file mode 100644 index 000000000..15a9b9f82 --- /dev/null +++ b/controllers/clusterplatform/initialize.go @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + "fmt" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// NewInitializeAction returns an action that initializes the platform configuration when not provided by the user. +func NewInitializeAction() Action { + return &initializeAction{} +} + +type initializeAction struct { + baseAction +} + +func (action *initializeAction) Name() string { + return "initialize" +} + +func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool { + return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client) +} + +func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error { + duplicate, err := action.isPrimaryDuplicate(ctx, cPlatform) + if err != nil { + return err + } + if duplicate { + // another cluster platform already present + if !cPlatform.Status.IsDuplicated() { + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") + } + return nil + } + cPlatform.Status.Version = metadata.SpecVersion + platformRef := cPlatform.Spec.PlatformRef + + // Check referenced platform status + platform := &operatorapi.SonataFlowPlatform{} + err = action.client.Get(ctx, types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name}, platform) + if err != nil { + if k8serrors.IsNotFound(err) { + klog.V(log.D).InfoS("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformNotFoundReason, + fmt.Sprintf("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace)) + return nil + } + return err + } + + if platform != nil { + condition := platform.Status.GetTopLevelCondition() + if condition.IsTrue() { + klog.V(log.D).InfoS("Referenced SonataFlowPlatform '%s/%s' is ready", platformRef.Namespace, platformRef.Name) + cPlatform.Status.Manager().MarkTrueWithReason(api.SucceedConditionType, "", + "Referenced SonataFlowPlatform '%s/%s' is ready", platformRef.Namespace, platformRef.Name) + } else if condition.IsFalse() { + klog.V(log.D).InfoS("Referenced SonataFlowPlatform '%s/%s' not ready", platformRef.Namespace, platformRef.Name) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, + "Referenced SonataFlowPlatform '%s/%s' not ready", platformRef.Namespace, platformRef.Name) + } else { + klog.V(log.D).InfoS("Waiting for referenced SonataFlowPlatform '%s/%s' to be ready", platformRef.Namespace, platformRef.Name) + cPlatform.Status.Manager().MarkUnknown(api.SucceedConditionType, operatorapi.PlatformWarmingReason, + "Waiting for referenced SonataFlowPlatform '%s/%s' to be ready", platformRef.Namespace, platformRef.Name) + } + } + + return nil +} + +// Function to double-check if there is already an active cluster platform +func (action *initializeAction) isPrimaryDuplicate(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) (bool, error) { + if IsSecondary(cPlatform) { + // Always reconcile secondary cluster platforms + return false, nil + } + platforms, err := listPrimaryClusterPlatforms(ctx, action.client) + if err != nil { + return false, err + } + for _, p := range platforms.Items { + p := p // pin + if p.Name != cPlatform.Name && IsActive(&p) { + return true, nil + } + } + + return false, nil +} diff --git a/controllers/platform/k8s.go b/controllers/platform/k8s.go index 8360f1bc9..5a853f6a7 100644 --- a/controllers/platform/k8s.go +++ b/controllers/platform/k8s.go @@ -29,14 +29,13 @@ import ( "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" "github.com/apache/incubator-kie-kogito-serverless-operator/log" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" + kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" "github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" ) // NewServiceAction returns an action that deploys the services. @@ -62,32 +61,36 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S return nil, err } - if platform.Spec.Services.DataIndex != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexHandler(platform)); err != nil { - return nil, err + if platform.Spec.Services != nil { + psDI := services.NewDataIndexHandler(platform) + if psDI.IsServiceSetInSpec() { + if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil { + return nil, err + } } - } - if platform.Spec.Services.JobService != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewJobServiceHandler(platform)); err != nil { - return nil, err + psJS := services.NewJobServiceHandler(platform) + if psJS.IsServiceSetInSpec() { + if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil { + return nil, err + } } } return platform, nil } -func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { - if err := createConfigMap(ctx, client, platform, psh); err != nil { +func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil { return err } - if err := createDeployment(ctx, client, platform, psh); err != nil { + if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil { return err } - return createService(ctx, client, platform, psh) + return createOrUpdateService(ctx, client, platform, psh) } -func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { +func createOrUpdateDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { readyProbe := &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ @@ -193,7 +196,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera return nil } -func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { +func createOrUpdateService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { lbl, selectorLbl := getLabels(platform, psh) dataSvcSpec := corev1.ServiceSpec{ Ports: []corev1.ServicePort{ @@ -241,7 +244,7 @@ func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformSe return lbl, selectorLbl } -func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { +func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { handler, err := services.NewServiceAppPropertyHandler(psh) if err != nil { return err diff --git a/controllers/platform/platformutils.go b/controllers/platform/platformutils.go index 9249bed01..ae14c40e9 100644 --- a/controllers/platform/platformutils.go +++ b/controllers/platform/platformutils.go @@ -111,13 +111,15 @@ func setPlatformDefaults(p *operatorapi.SonataFlowPlatform, verbose bool) error } // When dataIndex object set, default to enabled if bool not set - var enable = true - if p.Spec.Services.DataIndex != nil && p.Spec.Services.DataIndex.Enabled == nil { - p.Spec.Services.DataIndex.Enabled = &enable - } - // When the JobService field has a value, default to enabled if the `Enabled` field's value is nil - if p.Spec.Services.JobService != nil && p.Spec.Services.JobService.Enabled == nil { - p.Spec.Services.JobService.Enabled = &enable + if p.Spec.Services != nil { + var enable = true + if p.Spec.Services.DataIndex != nil && p.Spec.Services.DataIndex.Enabled == nil { + p.Spec.Services.DataIndex.Enabled = &enable + } + // When the JobService field has a value, default to enabled if the `Enabled` field's value is nil + if p.Spec.Services.JobService != nil && p.Spec.Services.JobService.Enabled == nil { + p.Spec.Services.JobService.Enabled = &enable + } } setStatusAdditionalInfo(p) diff --git a/controllers/platform/services/properties.go b/controllers/platform/services/properties.go index 5c411f38c..dbe6dc3f8 100644 --- a/controllers/platform/services/properties.go +++ b/controllers/platform/services/properties.go @@ -160,7 +160,8 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf props := properties.NewProperties() props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false") props.Set(constants.KogitoProcessInstancesEventsEnabled, "false") - if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) { + di := NewDataIndexHandler(platform) + if workflow != nil && !profiles.IsDevProfile(workflow) && di.IsServiceEnabled() { props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true") props.Set(constants.KogitoProcessInstancesEventsEnabled, "true") di := NewDataIndexHandler(platform) @@ -183,8 +184,8 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat props := properties.NewProperties() props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP) props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol)) - if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) { - js := NewJobServiceHandler(platform) + js := NewJobServiceHandler(platform) + if workflow != nil && !profiles.IsDevProfile(workflow) && js.IsServiceEnabled() { p, err := js.GenerateWorkflowProperties() if err != nil { return nil, err diff --git a/controllers/platform/services/properties_services_test.go b/controllers/platform/services/properties_services_test.go index 90da80837..9cdae2e9f 100644 --- a/controllers/platform/services/properties_services_test.go +++ b/controllers/platform/services/properties_services_test.go @@ -173,6 +173,9 @@ func generatePlatform(opts ...plfmOptionFn) *operatorapi.SonataFlowPlatform { func setJobServiceEnabledValue(v *bool) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.JobService == nil { p.Spec.Services.JobService = &operatorapi.ServiceSpec{} } @@ -182,6 +185,9 @@ func setJobServiceEnabledValue(v *bool) plfmOptionFn { func setDataIndexEnabledValue(v *bool) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.DataIndex == nil { p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} } @@ -191,6 +197,9 @@ func setDataIndexEnabledValue(v *bool) plfmOptionFn { func emptyDataIndexServiceSpec() plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.DataIndex == nil { p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} } @@ -199,6 +208,9 @@ func emptyDataIndexServiceSpec() plfmOptionFn { func emptyJobServiceSpec() plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.JobService == nil { p.Spec.Services.JobService = &operatorapi.ServiceSpec{} } @@ -219,6 +231,9 @@ func setPlatformName(name string) plfmOptionFn { func setJobServiceJDBC(jdbc string) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.JobService == nil { p.Spec.Services.JobService = &operatorapi.ServiceSpec{} } @@ -234,6 +249,9 @@ func setJobServiceJDBC(jdbc string) plfmOptionFn { func setDataIndexJDBC(jdbc string) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.DataIndex == nil { p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} } diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index c04b4eb32..835264365 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -72,6 +72,22 @@ type PlatformServiceHandler interface { GenerateWorkflowProperties() (*properties.Properties, error) // GenerateServiceProperties returns a property object that contains the application properties required by the service deployment GenerateServiceProperties() (*properties.Properties, error) + + // IsServiceSetInSpec returns true if the service is set in the spec. + IsServiceSetInSpec() bool + // IsServiceEnabledInSpec returns true if the service is enabled in the spec. + IsServiceEnabledInSpec() bool + // GetLocalServiceBaseUrl returns the base url of the local service + GetLocalServiceBaseUrl() string + // GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service. + GetServiceBaseUrl() string + // GetServiceUrl returns the service url, based on whether using local or cluster-scoped service. + GetServiceUrl() string + // IsServiceEnabled returns true if the service is enabled in either the spec or the status.clusterPlatformRef. + IsServiceEnabled() bool + // SetServiceUrlInStatus sets the service url in status. if reconciled instance does not have service set in spec AND + // if cluster referenced platform has said service enabled, use the cluster platform's service + SetServiceUrlInStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) } type DataIndexHandler struct { @@ -102,6 +118,56 @@ func (d DataIndexHandler) GetServiceName() string { return fmt.Sprintf("%s-%s", d.platform.Name, constants.DataIndexServiceName) } +func (d DataIndexHandler) SetServiceUrlInStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) { + psDI := NewDataIndexHandler(clusterRefPlatform) + if !isServicesSet(d.platform) && psDI.IsServiceEnabledInSpec() { + if d.platform.Status.ClusterPlatformRef != nil { + if d.platform.Status.ClusterPlatformRef.Services == nil { + d.platform.Status.ClusterPlatformRef.Services = &operatorapi.PlatformServicesStatus{} + } + d.platform.Status.ClusterPlatformRef.Services.DataIndexRef = &operatorapi.PlatformServiceRefStatus{ + Url: psDI.GetLocalServiceBaseUrl(), + } + } + } +} + +func (d DataIndexHandler) IsServiceSetInSpec() bool { + return isDataIndexSet(d.platform) +} + +func (d DataIndexHandler) IsServiceEnabledInSpec() bool { + return isDataIndexEnabled(d.platform) +} + +func (d DataIndexHandler) isServiceEnabledInStatus() bool { + return d.platform != nil && d.platform.Status.ClusterPlatformRef != nil && + d.platform.Status.ClusterPlatformRef.Services != nil && d.platform.Status.ClusterPlatformRef.Services.DataIndexRef != nil && + !isServicesSet(d.platform) +} + +func (d DataIndexHandler) IsServiceEnabled() bool { + return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus() +} + +func (d DataIndexHandler) GetServiceUrl() string { + return d.GetServiceBaseUrl() + constants.DataIndexServiceURLPath +} + +func (d DataIndexHandler) GetServiceBaseUrl() string { + if d.IsServiceEnabledInSpec() { + return d.GetLocalServiceBaseUrl() + } + if d.isServiceEnabledInStatus() { + return d.platform.Status.ClusterPlatformRef.Services.DataIndexRef.Url + } + return "" +} + +func (d DataIndexHandler) GetLocalServiceBaseUrl() string { + return generateServiceURL(constants.KogitoServiceURLProtocol, d.platform.Namespace, d.GetServiceName()) +} + func (d DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { @@ -169,17 +235,16 @@ func (d DataIndexHandler) GetServiceCmName() string { func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() - if d.platform.Spec.Services.DataIndex != nil { - dataIndexUrl := generateServiceURL(constants.KogitoProcessEventsProtocol, d.platform.Namespace, d.GetServiceName()) - props.Set(constants.KogitoProcessDefinitionsEventsURL, fmt.Sprintf("%s/definitions", dataIndexUrl)) - props.Set(constants.KogitoProcessInstancesEventsURL, fmt.Sprintf("%s/processes", dataIndexUrl)) + if d.IsServiceEnabled() { + props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+"/definitions") + props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceUrl()) } return props, nil } func (d DataIndexHandler) GenerateServiceProperties() (*properties.Properties, error) { props := properties.NewProperties() - props.Set(constants.KogitoServiceURLProperty, generateServiceURL(constants.KogitoServiceURLProtocol, d.platform.Namespace, d.GetServiceName())) + props.Set(constants.KogitoServiceURLProperty, d.GetLocalServiceBaseUrl()) props.Set(constants.DataIndexKafkaSmallRyeHealthProperty, "false") return props, nil } @@ -216,6 +281,56 @@ func (j JobServiceHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", j.GetServiceName()) } +func (j JobServiceHandler) SetServiceUrlInStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) { + psJS := NewJobServiceHandler(clusterRefPlatform) + if !isServicesSet(j.platform) && psJS.IsServiceEnabledInSpec() { + if j.platform.Status.ClusterPlatformRef != nil { + if j.platform.Status.ClusterPlatformRef.Services == nil { + j.platform.Status.ClusterPlatformRef.Services = &operatorapi.PlatformServicesStatus{} + } + j.platform.Status.ClusterPlatformRef.Services.JobServiceRef = &operatorapi.PlatformServiceRefStatus{ + Url: psJS.GetLocalServiceBaseUrl(), + } + } + } +} + +func (j JobServiceHandler) IsServiceSetInSpec() bool { + return isJobServiceSet(j.platform) +} + +func (j JobServiceHandler) IsServiceEnabledInSpec() bool { + return isJobServiceEnabled(j.platform) +} + +func (j JobServiceHandler) isServiceEnabledInStatus() bool { + return j.platform != nil && j.platform.Status.ClusterPlatformRef != nil && + j.platform.Status.ClusterPlatformRef.Services != nil && j.platform.Status.ClusterPlatformRef.Services.JobServiceRef != nil && + !isServicesSet(j.platform) +} + +func (j JobServiceHandler) IsServiceEnabled() bool { + return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus() +} + +func (j JobServiceHandler) GetServiceUrl() string { + return j.GetServiceBaseUrl() + constants.JobServiceURLPath +} + +func (j JobServiceHandler) GetServiceBaseUrl() string { + if j.IsServiceEnabledInSpec() { + return j.GetLocalServiceBaseUrl() + } + if j.isServiceEnabledInStatus() { + return j.platform.Status.ClusterPlatformRef.Services.JobServiceRef.Url + } + return "" +} + +func (j JobServiceHandler) GetLocalServiceBaseUrl() string { + return generateServiceURL(constants.JobServiceURLProtocol, j.platform.Namespace, j.GetServiceName()) +} + func (j JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { @@ -277,17 +392,17 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false") // add data source reactive URL jspec := j.platform.Spec.Services.JobService - if jspec != nil && jspec.Persistence != nil && jspec.Persistence.PostgreSql != nil { + if j.IsServiceSetInSpec() && jspec.Persistence != nil && jspec.Persistence.PostgreSql != nil { dataSourceReactiveURL, err := generateReactiveURL(jspec.Persistence.PostgreSql, j.GetServiceName(), j.platform.Namespace, constants.DefaultDatabaseName, constants.DefaultPostgreSQLPort) if err != nil { return nil, err } props.Set(constants.JobServiceDataSourceReactiveURL, dataSourceReactiveURL) } - if dataIndexEnabled(j.platform) { + if isDataIndexEnabled(j.platform) { di := NewDataIndexHandler(j.platform) props.Set(constants.JobServiceStatusChangeEvents, "true") - props.Set(constants.JobServiceStatusChangeEventsURL, fmt.Sprintf("%s/jobs", generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, di.GetServiceName()))) + props.Set(constants.JobServiceStatusChangeEventsURL, di.GetLocalServiceBaseUrl()+"/jobs") } props.Sort() return props, nil @@ -295,17 +410,33 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() - props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s/v2/jobs/events", generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, j.GetServiceName()))) + if j.IsServiceEnabled() { + // add data source reactive URL + props.Set(constants.JobServiceRequestEventsURL, j.GetServiceUrl()) + } return props, nil } -func dataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool { - return platform != nil && platform.Spec.Services.DataIndex != nil && - platform.Spec.Services.DataIndex.Enabled != nil && *platform.Spec.Services.DataIndex.Enabled +func isDataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool { + return isDataIndexSet(platform) && platform.Spec.Services.DataIndex.Enabled != nil && + *platform.Spec.Services.DataIndex.Enabled +} + +func isJobServiceEnabled(platform *operatorapi.SonataFlowPlatform) bool { + return isJobServiceSet(platform) && platform.Spec.Services.JobService.Enabled != nil && + *platform.Spec.Services.JobService.Enabled +} + +func isDataIndexSet(platform *operatorapi.SonataFlowPlatform) bool { + return isServicesSet(platform) && platform.Spec.Services.DataIndex != nil +} + +func isJobServiceSet(platform *operatorapi.SonataFlowPlatform) bool { + return isServicesSet(platform) && platform.Spec.Services.JobService != nil } -func jobServiceEnabled(platform *operatorapi.SonataFlowPlatform) bool { - return platform != nil && platform.Spec.Services.JobService != nil && platform.Spec.Services.JobService.Enabled != nil && *platform.Spec.Services.JobService.Enabled +func isServicesSet(platform *operatorapi.SonataFlowPlatform) bool { + return platform != nil && platform.Spec.Services != nil } func generateServiceURL(protocol string, namespace string, name string) string { diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index 99d295633..b3af717fa 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -32,6 +32,7 @@ const ( JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url" JobServiceURLProtocol = "http" JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url" + JobServiceURLPath = "/v2/jobs/events" KogitoProcessEventsProtocol = "http" KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" @@ -45,9 +46,10 @@ const ( DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled` JobServiceKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."org.kie.kogito.jobs.service.messaging.http.health.knative.KSinkInjectionHealthCheck".enabled` - DataIndexServiceName = "data-index-service" - JobServiceName = "jobs-service" - ImageNamePrefix = "quay.io/kiegroup/kogito" + DataIndexServiceName = "data-index-service" + DataIndexServiceURLPath = "/processes" + JobServiceName = "jobs-service" + ImageNamePrefix = "quay.io/kiegroup/kogito" //TODO, the usage of this constant was temporary introduced since only the nightly images are being updated for the //data-index and jobs-service. And this is causing issues at the time of using the workflows integrated with these, etc. //This will be removed when the CI is fixed. diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go index a4c5dcf33..2a95e9674 100644 --- a/controllers/profiles/common/properties/application_test.go +++ b/controllers/profiles/common/properties/application_test.go @@ -200,7 +200,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { platform := test.GetBasePlatform() platform.Namespace = ns platform.Spec = operatorapi.SonataFlowPlatformSpec{ - Services: operatorapi.ServicesPlatformSpec{ + Services: &operatorapi.ServicesPlatformSpec{ DataIndex: &operatorapi.ServiceSpec{ Enabled: &enabled, }, @@ -604,6 +604,9 @@ func generatePlatform(opts ...plfmOptionFn) *operatorapi.SonataFlowPlatform { func setJobServiceEnabledValue(v *bool) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.JobService == nil { p.Spec.Services.JobService = &operatorapi.ServiceSpec{} } @@ -613,6 +616,9 @@ func setJobServiceEnabledValue(v *bool) plfmOptionFn { func setDataIndexEnabledValue(v *bool) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.DataIndex == nil { p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{} } @@ -634,6 +640,9 @@ func setPlatformName(name string) plfmOptionFn { func setJobServiceJDBC(jdbc string) plfmOptionFn { return func(p *operatorapi.SonataFlowPlatform) { + if p.Spec.Services == nil { + p.Spec.Services = &operatorapi.ServicesPlatformSpec{} + } if p.Spec.Services.JobService == nil { p.Spec.Services.JobService = &operatorapi.ServiceSpec{} } diff --git a/controllers/sonataflowclusterplatform_controller.go b/controllers/sonataflowclusterplatform_controller.go new file mode 100644 index 000000000..a511bbafb --- /dev/null +++ b/controllers/sonataflowclusterplatform_controller.go @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. +func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return nil + } + + if sfcPlatform != nil { + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + if client.ObjectKeyFromObject(object) == sfpcRefNsName { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(sfcPlatform)}} + } + } + return nil +} + +// if active sonataflowclusterplatform is changed, reconcile other SonataFlowClusterPlatforms. +func (r *SonataFlowClusterPlatformReconciler) mapClusterPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform := object.(*operatorapi.SonataFlowClusterPlatform) + var requests []reconcile.Request + if sfcPlatform != nil && clusterplatform.IsActive(sfcPlatform) { + var scpList operatorapi.SonataFlowClusterPlatformList + if err := r.List(ctx, &scpList); err != nil { + klog.V(log.E).ErrorS(err, "Could not list SonataFlowClusterPlatforms. "+ + "SonataFlowClusterPlatforms affected by changes to the active SonataFlowClusterPlatform %s will not be reconciled.", + sfcPlatform.Name) + return nil + } + + scpNamespacedName := client.ObjectKeyFromObject(sfcPlatform) + for _, cPlatform := range scpList.Items { + namespacedName := client.ObjectKeyFromObject(&cPlatform) + // this check is required so that the active clusterplatform object doesn't reconcile + if scpNamespacedName != namespacedName { + requests = append(requests, reconcile.Request{NamespacedName: namespacedName}) + } + } + } + return requests +} diff --git a/controllers/sonataflowplatform_controller.go b/controllers/sonataflowplatform_controller.go index 595b8339b..c0ccd3991 100644 --- a/controllers/sonataflowplatform_controller.go +++ b/controllers/sonataflowplatform_controller.go @@ -24,26 +24,26 @@ import ( "fmt" "time" + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/apache/incubator-kie-kogito-serverless-operator/api" - - clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" - - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform" - ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ctrl "sigs.k8s.io/controller-runtime/pkg/client" - - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // SonataFlowPlatformReconciler reconciles a SonataFlowPlatform object @@ -114,6 +114,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc target := instance.DeepCopy() + if err = r.SonataFlowPlatformUpdateStatus(ctx, req, target); err != nil { + return reconcile.Result{}, err + } + for _, a := range actions { cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) a.InjectClient(cli) @@ -166,6 +170,51 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc } +// If an active cluster platform exists, update platform.Status accordingly +func (r *SonataFlowPlatformReconciler) SonataFlowPlatformUpdateStatus(ctx context.Context, req reconcile.Request, target *operatorapi.SonataFlowPlatform) error { + // Fetch the active SonataFlowClusterPlatform instance + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return err + } + + if sfcPlatform != nil { + sfPlatform := &operatorapi.SonataFlowPlatform{} + + platformRef := sfcPlatform.Spec.PlatformRef + namespacedName := types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name} + if req.NamespacedName == namespacedName { + sfPlatform = target.DeepCopy() + } else { + // retrieve referenced platform object + err := r.Reader.Get(ctx, namespacedName, sfPlatform) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get referenced SonataFlowPlatform", namespacedName) + return err + } + } + + target.Status.ClusterPlatformRef = &operatorapi.SonataFlowClusterPlatformRefStatus{ + Name: sfcPlatform.Name, + PlatformRef: operatorapi.SonataFlowPlatformRef{ + Name: platformRef.Name, + Namespace: platformRef.Namespace, + }, + } + + tpsDI := services.NewDataIndexHandler(target) + tpsDI.SetServiceUrlInStatus(sfPlatform) + + tpsJS := services.NewJobServiceHandler(target) + tpsJS.SetServiceUrlInStatus(sfPlatform) + } else { + target.Status.ClusterPlatformRef = nil + } + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { return ctrlrun.NewControllerManagedBy(mgr). @@ -173,5 +222,54 @@ func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) err Owns(&appsv1.Deployment{}). Owns(&corev1.Service{}). Owns(&corev1.ConfigMap{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToPlatformRequests)). Complete(r) } + +// if active clusterplatform object is changed, reconcile all SonataFlowPlatforms in the cluster. +func (r *SonataFlowPlatformReconciler) mapClusterPlatformToPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform := object.(*operatorapi.SonataFlowClusterPlatform) + if sfcPlatform != nil && clusterplatform.IsActive(sfcPlatform) { + return r.platformRequests(ctx, sfcPlatform, true) + } + return nil +} + +// if actively referenced sonataflowplatform is changed, reconcile other SonataFlowPlatforms in the cluster. +func (r *SonataFlowPlatformReconciler) mapPlatformToPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return nil + } + + if sfcPlatform != nil { + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + if client.ObjectKeyFromObject(platform) == sfpcRefNsName { + return r.platformRequests(ctx, sfcPlatform, false) + } + } + return nil +} + +func (r *SonataFlowPlatformReconciler) platformRequests(ctx context.Context, sfcPlatform *operatorapi.SonataFlowClusterPlatform, allPlatforms bool) []reconcile.Request { + var plList operatorapi.SonataFlowPlatformList + if err := r.List(ctx, &plList, client.InNamespace("")); err != nil { + klog.V(log.E).ErrorS(err, "could not list SonataFlowPlatforms. "+ + "SonataFlowPlatforms affected by changes to the active SonataFlowPlatform or SonataFlowClusterPlatform object will not be reconciled.") + return nil + } + + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + var requests []reconcile.Request + for _, platform := range plList.Items { + sfpNsName := client.ObjectKeyFromObject(&platform) + // this check is required so that the cluster-referenced platform object doesn't infinitely reconcile + if sfpNsName != sfpcRefNsName || allPlatforms { + requests = append(requests, reconcile.Request{NamespacedName: sfpNsName}) + } + } + return requests +} diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index db0ba0805..afea81a36 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -23,6 +23,10 @@ import ( "context" "testing" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" + "github.com/apache/incubator-kie-kogito-serverless-operator/test" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -30,12 +34,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/platform/services" - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" - "github.com/apache/incubator-kie-kogito-serverless-operator/test" - - "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" ) var ( @@ -79,7 +77,7 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.Equal(t, "quay.io/kiegroup", ksp.Spec.Build.Config.Registry.Address) assert.Equal(t, "regcred", ksp.Spec.Build.Config.Registry.Secret) assert.Equal(t, v1alpha08.OperatorBuildStrategy, ksp.Spec.Build.Config.BuildStrategy) - assert.Nil(t, ksp.Spec.Services.DataIndex) + assert.Nil(t, ksp.Spec.Services) assert.Equal(t, v1alpha08.PlatformClusterKubernetes, ksp.Status.Cluster) assert.Equal(t, v1alpha08.PlatformCreatingReason, ksp.Status.GetTopLevelCondition().Reason) @@ -89,7 +87,7 @@ func TestSonataFlowPlatformController(t *testing.T) { namespace := t.Name() // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) - ksp.Spec.Services = v1alpha08.ServicesPlatformSpec{ + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ DataIndex: &v1alpha08.ServiceSpec{}, } @@ -168,7 +166,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) var replicas int32 = 2 - ksp.Spec.Services = v1alpha08.ServicesPlatformSpec{ + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ DataIndex: &v1alpha08.ServiceSpec{ PodTemplate: v1alpha08.PodTemplateSpec{ Replicas: &replicas, @@ -257,7 +255,7 @@ func TestSonataFlowPlatformController(t *testing.T) { namespace := t.Name() // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) - ksp.Spec.Services = v1alpha08.ServicesPlatformSpec{ + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ JobService: &v1alpha08.ServiceSpec{}, } @@ -334,7 +332,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) var replicas int32 = 2 - ksp.Spec.Services = v1alpha08.ServicesPlatformSpec{ + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ JobService: &v1alpha08.ServiceSpec{ PodTemplate: v1alpha08.PodTemplateSpec{ Replicas: &replicas, @@ -415,7 +413,7 @@ func TestSonataFlowPlatformController(t *testing.T) { namespace := t.Name() // Create a SonataFlowPlatform object with metadata and spec. ksp := test.GetBasePlatformInReadyPhase(namespace) - ksp.Spec.Services = v1alpha08.ServicesPlatformSpec{ + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ DataIndex: &v1alpha08.ServiceSpec{}, JobService: &v1alpha08.ServiceSpec{}, } @@ -475,4 +473,125 @@ func TestSonataFlowPlatformController(t *testing.T) { assert.NotContains(t, dep.Spec.Template.Spec.Containers[0].Env, envDataIndex) }) + t.Run("verify that a basic reconcile of a cluster platform is performed without error", func(t *testing.T) { + namespace := t.Name() + + // Create a SonataFlowClusterPlatform object with metadata and spec. + kscp := test.GetBaseClusterPlatformInReadyPhase(namespace) + + // Create a SonataFlowPlatform object with metadata and spec. + ksp := test.GetBasePlatformInReadyPhase(namespace) + ksp.Spec.Services = &v1alpha08.ServicesPlatformSpec{ + DataIndex: &v1alpha08.ServiceSpec{}, + JobService: &v1alpha08.ServiceSpec{}, + } + ksp2 := test.GetBasePlatformInReadyPhase(namespace) + ksp2.Name = "ksp2" + + // Create a fake client to mock API calls. + cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(kscp, ksp, ksp2).WithStatusSubresource(kscp, ksp, ksp2).Build() + + // Create a SonataFlowPlatformReconciler object with the scheme and fake client. + r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} + + // Mock request to simulate Reconcile() being called on an event for a + // watched resource . + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ksp.Name, + Namespace: ksp.Namespace, + }, + } + _, err := r.Reconcile(context.TODO(), req) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp.Name, Namespace: ksp.Namespace}, ksp)) + assert.Greater(t, len(ksp2.Status.Conditions), 0) + assert.Nil(t, ksp2.Status.ClusterPlatformRef) + + // Create a SonataFlowClusterPlatformReconciler object with the scheme and fake client. + cr := &SonataFlowClusterPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}} + + // Mock request to simulate Reconcile() being called on an event for a + // watched resource . + cReq := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: kscp.Name, + }, + } + _, err = cr.Reconcile(context.TODO(), cReq) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: kscp.Name}, kscp)) + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp.Name, Namespace: ksp.Namespace}, ksp)) + + // Perform some checks on the created CR + assert.True(t, ksp.Status.IsReady()) + assert.True(t, kscp.Status.IsReady()) + assert.Equal(t, "quay.io/kiegroup", ksp.Spec.Build.Config.Registry.Address) + assert.Equal(t, "regcred", ksp.Spec.Build.Config.Registry.Secret) + assert.Equal(t, v1alpha08.OperatorBuildStrategy, ksp.Spec.Build.Config.BuildStrategy) + assert.NotNil(t, ksp.Spec.Services.DataIndex) + assert.NotNil(t, ksp.Spec.Services.DataIndex.Enabled) + assert.Equal(t, true, *ksp.Spec.Services.DataIndex.Enabled) + assert.Equal(t, v1alpha08.PlatformClusterKubernetes, ksp.Status.Cluster) + assert.Equal(t, "", ksp.Status.GetTopLevelCondition().Reason) + assert.Equal(t, kscp.Name, ksp.Status.ClusterPlatformRef.Name) + assert.Equal(t, kscp.Spec.PlatformRef.Name, ksp.Status.ClusterPlatformRef.PlatformRef.Name) + assert.Equal(t, kscp.Spec.PlatformRef.Namespace, ksp.Status.ClusterPlatformRef.PlatformRef.Namespace) + + assert.NotNil(t, ksp.Status.ClusterPlatformRef) + assert.Nil(t, ksp.Status.ClusterPlatformRef.Services) + + req2 := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ksp2.Name, + Namespace: ksp2.Namespace, + }, + } + _, err = r.Reconcile(context.TODO(), req2) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp2.Name, Namespace: ksp2.Namespace}, ksp2)) + assert.True(t, ksp2.Status.IsReady()) + assert.NotNil(t, ksp2.Status.ClusterPlatformRef) + assert.Equal(t, kscp.Name, ksp2.Status.ClusterPlatformRef.Name) + assert.Equal(t, kscp.Spec.PlatformRef.Name, ksp2.Status.ClusterPlatformRef.PlatformRef.Name) + assert.Equal(t, kscp.Spec.PlatformRef.Namespace, ksp2.Status.ClusterPlatformRef.PlatformRef.Namespace) + assert.NotNil(t, ksp2.Status.ClusterPlatformRef.Services) + assert.NotNil(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef) + assert.NotEmpty(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url) + assert.NotNil(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef) + assert.NotEmpty(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url) + + psDi := services.NewDataIndexHandler(ksp) + psDi2 := services.NewDataIndexHandler(ksp2) + assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url, psDi.GetLocalServiceBaseUrl()) + assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.DataIndexServiceURLPath, psDi2.GetServiceUrl()) + psJs := services.NewJobServiceHandler(ksp) + psJs2 := services.NewJobServiceHandler(ksp2) + assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url, psJs.GetLocalServiceBaseUrl()) + assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceURLPath, psJs2.GetServiceUrl()) + + ksp2.Spec.Services = &v1alpha08.ServicesPlatformSpec{} + + assert.NoError(t, cl.Update(context.TODO(), ksp2)) + _, err = r.Reconcile(context.TODO(), req2) + if err != nil { + t.Fatalf("reconcile: (%v)", err) + } + + assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: ksp2.Name, Namespace: ksp2.Namespace}, ksp2)) + assert.True(t, ksp2.Status.IsReady()) + assert.NotNil(t, ksp2.Status.ClusterPlatformRef) + assert.Equal(t, kscp.Spec.PlatformRef.Name, ksp2.Status.ClusterPlatformRef.PlatformRef.Name) + assert.Equal(t, kscp.Spec.PlatformRef.Namespace, ksp2.Status.ClusterPlatformRef.PlatformRef.Namespace) + assert.Nil(t, ksp2.Status.ClusterPlatformRef.Services) + }) } diff --git a/main.go b/main.go index 645cfb880..a1040f7bf 100644 --- a/main.go +++ b/main.go @@ -115,6 +115,16 @@ func main() { klog.V(log.E).ErrorS(err, "unable to create controller", "controller", "SonataFlowPlatform") os.Exit(1) } + if err = (&controllers.SonataFlowClusterPlatformReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Reader: mgr.GetAPIReader(), + Config: mgr.GetConfig(), + Recorder: mgr.GetEventRecorderFor("cluster-platform-controller"), + }).SetupWithManager(mgr); err != nil { + klog.V(log.E).ErrorS(err, "unable to create controller", "controller", "SonataFlowClusterPlatform") + os.Exit(1) + } //+kubebuilder:scaffold:builder if utils.IsOpenShift() { diff --git a/operator.yaml b/operator.yaml index 973b5cacc..7c031b075 100644 --- a/operator.yaml +++ b/operator.yaml @@ -364,6 +364,121 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.9.2 + creationTimestamp: null + name: sonataflowclusterplatforms.sonataflow.org +spec: + group: sonataflow.org + names: + kind: SonataFlowClusterPlatform + listKind: SonataFlowClusterPlatformList + plural: sonataflowclusterplatforms + singular: sonataflowclusterplatform + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.platformRef.name + name: Platform_Name + type: string + - jsonPath: .spec.platformRef.namespace + name: Platform_NS + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].status + name: Ready + type: string + - jsonPath: .status.conditions[?(@.type=='Succeed')].reason + name: Reason + type: string + name: v1alpha08 + schema: + openAPIV3Schema: + description: SonataFlowClusterPlatform is the Schema for the sonataflowclusterplatforms + API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: SonataFlowClusterPlatformSpec defines the desired state of + SonataFlowClusterPlatform + properties: + platformRef: + description: PlatformRef defines which existing SonataFlowPlatform's + services should be used cluster-wide. + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + required: + - platformRef + type: object + status: + description: SonataFlowClusterPlatformStatus defines the observed state + of SonataFlowClusterPlatform + properties: + conditions: + description: The latest available observations of a resource's current + state. + items: + description: Condition describes the common structure for conditions + in our types + properties: + lastUpdateTime: + description: The last time this condition was updated. + format: date-time + type: string + message: + description: A human-readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type condition for the given object + type: string + required: + - status + - type + type: object + type: array + observedGeneration: + description: The generation observed by the deployment controller. + format: int64 + type: integer + version: + description: Version the operator version controlling this ClusterPlatform + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.9.2 @@ -786,17 +901,18 @@ spec: type: object services: description: 'Services attributes for deploying supporting applications - like Data Index. Only workflows with the proper annotation will - be configured to use these service(s). `sonataflow.org/profile: - prod`' + like Data Index & Job Service. Only workflows without the `sonataflow.org/profile: + dev` annotation will be configured to use these service(s). Setting + this will override the use of any cluster-scoped services that might + be defined via `SonataFlowClusterPlatform`.' properties: dataIndex: - description: Deploys the Data Index service for use by "prod" - profile workflows. + description: 'Deploys the Data Index service for use by workflows + without the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -8663,12 +8779,12 @@ spec: type: object type: object jobService: - description: Deploys the Job service for use by "prod" profile - workflows. + description: 'Deploys the Job service for use by workflows without + the `sonataflow.org/profile: dev` annotation.' properties: enabled: - description: Determines whether "prod" profile workflows should - be configured to use this service + description: 'Determines whether workflows without the `sonataflow.org/profile: + dev` annotation should be configured to use this service' type: boolean persistence: description: Persists service to a datasource of choice. Ephemeral @@ -16546,6 +16662,51 @@ spec: - kubernetes - openshift type: string + clusterPlatformRef: + description: ClusterPlatformRef information related to the (optional) + active SonataFlowClusterPlatform + properties: + name: + description: Name of the active SonataFlowClusterPlatform + type: string + platformRef: + description: PlatformRef displays which SonataFlowPlatform has + been referenced by the active SonataFlowClusterPlatform + properties: + name: + description: Name of the SonataFlowPlatform + type: string + namespace: + description: Namespace of the SonataFlowPlatform + type: string + required: + - name + - namespace + type: object + services: + description: Services displays which cluster-wide services are + being used by this SonataFlowPlatform + properties: + dataIndexRef: + description: DataIndexRef displays information on the cluster-wide + Data Index service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + jobServiceRef: + description: JobServiceRef displays information on the cluster-wide + Job Service + properties: + url: + description: Url displays the base url of a cluster-wide + service + type: string + type: object + type: object + type: object conditions: description: The latest available observations of a resource's current state. @@ -26175,6 +26336,32 @@ rules: - get - patch - update +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/finalizers + verbs: + - update +- apiGroups: + - sonataflow.org + resources: + - sonataflowclusterplatforms/status + verbs: + - get + - patch + - update - apiGroups: - sonataflow.org resources: diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml new file mode 100644 index 000000000..9759c15ae --- /dev/null +++ b/test/testdata/sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowClusterPlatform +metadata: + name: cluster +spec: + platformRef: + name: sonataflow-platform + namespace: test-ns \ No newline at end of file diff --git a/test/yaml.go b/test/yaml.go index 37d28220d..d1c4006b9 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -27,19 +27,15 @@ import ( "runtime" "strings" - "github.com/davecgh/go-spew/spew" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/apache/incubator-kie-kogito-serverless-operator/api" - + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "github.com/davecgh/go-spew/spew" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/yaml" - - "github.com/apache/incubator-kie-kogito-serverless-operator/log" - - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -53,6 +49,7 @@ const ( sonataFlowPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform.yaml" sonataFlowPlatformWithCacheMinikubeYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform_withCache_minikube.yaml" sonataFlowPlatformForOpenshift = "sonataflow.org_v1alpha08_sonataflowplatform_openshift.yaml" + sonataFlowClusterPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowclusterplatform.yaml" sonataFlowBuilderConfig = "sonataflow-operator-builder-config_v1_configmap.yaml" sonataFlowBuildSucceed = "sonataflow.org_v1alpha08_sonataflowbuild.yaml" @@ -86,6 +83,31 @@ func GetKubernetesResource(testFile string, resource client.Object) { } } +func getSonataFlowClusterPlatform(testFile string) *operatorapi.SonataFlowClusterPlatform { + kscp := &operatorapi.SonataFlowClusterPlatform{} + yamlFile, err := os.ReadFile(path.Join(getTestDataDir(), testFile)) + if err != nil { + klog.V(log.E).ErrorS(err, "yamlFile.Get") + panic(err) + } + // Important: Here we are reading the CR deployment file from a given path and creating a &operatorapi.SonataFlowPlatform struct + err = yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlFile), 100).Decode(kscp) + if err != nil { + klog.V(log.E).ErrorS(err, "Unmarshal") + panic(err) + } + klog.V(log.D).InfoS("Successfully read KSCP", "kscp", kscp) + kscp.Status.Manager().InitializeConditions() + return kscp +} + +func GetSonataFlowClusterPlatformInReadyPhase(path string, namespace string) *operatorapi.SonataFlowClusterPlatform { + kscp := getSonataFlowClusterPlatform(path) + kscp.Spec.PlatformRef.Namespace = namespace + kscp.Status.Manager().MarkTrue(api.SucceedConditionType) + return kscp +} + func getSonataFlowPlatform(testFile string) *operatorapi.SonataFlowPlatform { ksp := &operatorapi.SonataFlowPlatform{} yamlFile, err := os.ReadFile(path.Join(getTestDataDir(), testFile)) @@ -193,6 +215,10 @@ func GetBaseSonataFlowWithProdOpsProfile(namespace string) *operatorapi.SonataFl return NewSonataFlow(SonataFlowSimpleOpsYamlCR, namespace) } +func GetBaseClusterPlatformInReadyPhase(namespace string) *operatorapi.SonataFlowClusterPlatform { + return GetSonataFlowClusterPlatformInReadyPhase(sonataFlowClusterPlatformYamlCR, namespace) +} + func GetBasePlatformInReadyPhase(namespace string) *operatorapi.SonataFlowPlatform { return GetSonataFlowPlatformInReadyPhase(sonataFlowPlatformYamlCR, namespace) } @@ -213,6 +239,10 @@ func GetBasePlatformWithDevBaseImageInReadyPhase(namespace string) *operatorapi. return platform } +func GetBaseClusterPlatform() *operatorapi.SonataFlowClusterPlatform { + return getSonataFlowClusterPlatform(sonataFlowClusterPlatformYamlCR) +} + func GetBasePlatform() *operatorapi.SonataFlowPlatform { return getSonataFlowPlatform(sonataFlowPlatformYamlCR) } From 544178826b1604e3e17ac6fb079b4ef695c5d375 Mon Sep 17 00:00:00 2001 From: Tommy Hughes Date: Tue, 23 Jan 2024 11:25:09 -0600 Subject: [PATCH 2/3] service constants cleanup Signed-off-by: Tommy Hughes --- controllers/platform/services/services.go | 4 ++-- .../profiles/common/constants/platform_services.go | 9 +++++---- controllers/sonataflowplatform_controller_test.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index 835264365..e49679e5b 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -151,7 +151,7 @@ func (d DataIndexHandler) IsServiceEnabled() bool { } func (d DataIndexHandler) GetServiceUrl() string { - return d.GetServiceBaseUrl() + constants.DataIndexServiceURLPath + return d.GetServiceBaseUrl() + constants.KogitoProcessInstancesEventsPath } func (d DataIndexHandler) GetServiceBaseUrl() string { @@ -236,7 +236,7 @@ func (d DataIndexHandler) GetServiceCmName() string { func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() if d.IsServiceEnabled() { - props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+"/definitions") + props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessDefinitionsEventsPath) props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceUrl()) } return props, nil diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index b3af717fa..3db52e16c 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -37,8 +37,10 @@ const ( KogitoProcessEventsProtocol = "http" KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" KogitoProcessInstancesEventsEnabled = "kogito.events.processinstances.enabled" + KogitoProcessInstancesEventsPath = "/processes" KogitoProcessDefinitionsEventsURL = "mp.messaging.outgoing.kogito-processdefinitions-events.url" KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled" + KogitoProcessDefinitionsEventsPath = "/definitions" KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled" KogitoEventsVariablesEnabled = "kogito.events.variables.enabled" KogitoServiceURLProperty = "kogito.service.url" @@ -46,10 +48,9 @@ const ( DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled` JobServiceKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."org.kie.kogito.jobs.service.messaging.http.health.knative.KSinkInjectionHealthCheck".enabled` - DataIndexServiceName = "data-index-service" - DataIndexServiceURLPath = "/processes" - JobServiceName = "jobs-service" - ImageNamePrefix = "quay.io/kiegroup/kogito" + DataIndexServiceName = "data-index-service" + JobServiceName = "jobs-service" + ImageNamePrefix = "quay.io/kiegroup/kogito" //TODO, the usage of this constant was temporary introduced since only the nightly images are being updated for the //data-index and jobs-service. And this is causing issues at the time of using the workflows integrated with these, etc. //This will be removed when the CI is fixed. diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index afea81a36..5594d6449 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -573,7 +573,7 @@ func TestSonataFlowPlatformController(t *testing.T) { psDi := services.NewDataIndexHandler(ksp) psDi2 := services.NewDataIndexHandler(ksp2) assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url, psDi.GetLocalServiceBaseUrl()) - assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.DataIndexServiceURLPath, psDi2.GetServiceUrl()) + assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceUrl()) psJs := services.NewJobServiceHandler(ksp) psJs2 := services.NewJobServiceHandler(ksp2) assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url, psJs.GetLocalServiceBaseUrl()) From 8c07c6007e0d7523a417de42516c3bd7d284bc13 Mon Sep 17 00:00:00 2001 From: Tommy Hughes Date: Tue, 30 Jan 2024 08:36:55 -0600 Subject: [PATCH 3/3] clusterplatform api descrip change Signed-off-by: Tommy Hughes --- api/v1alpha08/sonataflowclusterplatform_types.go | 3 +-- .../manifests/sonataflow.org_sonataflowclusterplatforms.yaml | 4 ++-- .../crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml | 4 ++-- operator.yaml | 4 ++-- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/api/v1alpha08/sonataflowclusterplatform_types.go b/api/v1alpha08/sonataflowclusterplatform_types.go index a67f5ecdc..4016e5072 100644 --- a/api/v1alpha08/sonataflowclusterplatform_types.go +++ b/api/v1alpha08/sonataflowclusterplatform_types.go @@ -28,11 +28,10 @@ const ( // SonataFlowClusterPlatformSpec defines the desired state of SonataFlowClusterPlatform type SonataFlowClusterPlatformSpec struct { - // PlatformRef defines which existing SonataFlowPlatform's services should be used cluster-wide. PlatformRef SonataFlowPlatformRef `json:"platformRef"` } -// SonataFlowPlatformRef defines which existing SonataFlowPlatform should be used cluster-wide +// SonataFlowPlatformRef defines which existing SonataFlowPlatform's supporting services should be used cluster-wide. type SonataFlowPlatformRef struct { // Name of the SonataFlowPlatform //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Platform_Name" diff --git a/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml b/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml index 170acace6..faa83264e 100644 --- a/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml +++ b/bundle/manifests/sonataflow.org_sonataflowclusterplatforms.yaml @@ -50,8 +50,8 @@ spec: SonataFlowClusterPlatform properties: platformRef: - description: PlatformRef defines which existing SonataFlowPlatform's - services should be used cluster-wide. + description: SonataFlowPlatformRef defines which existing SonataFlowPlatform's + supporting services should be used cluster-wide. properties: name: description: Name of the SonataFlowPlatform diff --git a/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml b/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml index 133647aae..a740ddea9 100644 --- a/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml +++ b/config/crd/bases/sonataflow.org_sonataflowclusterplatforms.yaml @@ -51,8 +51,8 @@ spec: SonataFlowClusterPlatform properties: platformRef: - description: PlatformRef defines which existing SonataFlowPlatform's - services should be used cluster-wide. + description: SonataFlowPlatformRef defines which existing SonataFlowPlatform's + supporting services should be used cluster-wide. properties: name: description: Name of the SonataFlowPlatform diff --git a/operator.yaml b/operator.yaml index 7c031b075..a016cf252 100644 --- a/operator.yaml +++ b/operator.yaml @@ -414,8 +414,8 @@ spec: SonataFlowClusterPlatform properties: platformRef: - description: PlatformRef defines which existing SonataFlowPlatform's - services should be used cluster-wide. + description: SonataFlowPlatformRef defines which existing SonataFlowPlatform's + supporting services should be used cluster-wide. properties: name: description: Name of the SonataFlowPlatform