Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KOGITO-8792] - Add events to build controllers and the ability to restart a build #318

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ const (
BuildFailedReason = "BuildFailedReason"
WaitingForBuildReason = "WaitingForBuild"
BuildIsRunningReason = "BuildIsRunning"
BuildSkipped = "BuildSkipped"
BuildSkippedReason = "BuildSkipped"
BuildSuccessfulReason = "BuildSuccessful"
BuildMarkedToRestartReason = "BuildMarkedToRestart"
)

// Condition describes the common structure for conditions in our types
Expand Down
21 changes: 6 additions & 15 deletions api/status_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,12 @@ func (s *conditionManager) MarkUnknown(t ConditionType, reason, messageFormat st

// MarkFalse sets the status of t and the ready condition to False.
func (s *conditionManager) MarkFalse(t ConditionType, reason, messageFormat string, messageA ...interface{}) {
types := []ConditionType{t}
for _, cond := range s.dependents {
if cond == t {
types = append(types, s.ready)
}
}

for _, t := range types {
s.setCondition(Condition{
Type: t,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageA...),
})
}
s.setCondition(Condition{
Type: t,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageA...),
})
}

// InitializeConditions updates all Conditions in the ConditionSet to Unknown
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func (s *SonataFlowStatus) Manager() api.ConditionsManager {
}

func (s *SonataFlowStatus) IsWaitingForPlatform() bool {
cond := s.GetCondition(api.RunningConditionType)
cond := s.GetCondition(api.BuiltConditionType)
return cond.IsFalse() && cond.Reason == api.WaitingForPlatformReason
}

Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha08/sonataflowbuild_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package v1alpha08
import (
"encoding/json"

"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -50,6 +51,9 @@ const (
BuildPhaseError BuildPhase = "Error"
)

// BuildRestartAnnotation marks a SonataFlowBuild to restart
const BuildRestartAnnotation = metadata.Domain + "/restartBuild"

// BuildTemplate an abstraction over the actual build process performed by the platform.
// +k8s:openapi-gen=true
type BuildTemplate struct {
Expand Down
3 changes: 3 additions & 0 deletions controllers/builder/containerbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (c *containerBuilderManager) Schedule(build *operatorapi.SonataFlowBuild) e
return err
}
build.Status.BuildPhase = operatorapi.BuildPhase(containerBuilder.Status.Phase)
if len(build.Status.BuildPhase) == 0 {
build.Status.BuildPhase = operatorapi.BuildPhaseInitialization
}
build.Status.Error = containerBuilder.Status.Error
return nil
}
Expand Down
26 changes: 15 additions & 11 deletions controllers/platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package platform
import (
"context"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -60,24 +61,27 @@ func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.Sona
klog.V(log.I).InfoS("Maven Timeout set", "timeout", p.Spec.Build.Config.Timeout.Duration)
}

updatePlatform(ctx, c, p)

return nil
return createOrUpdatePlatform(ctx, c, p)
}

func updatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) {
func createOrUpdatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) error {
config := operatorapi.SonataFlowPlatform{}
errGet := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
if errGet != nil {
klog.V(log.E).ErrorS(errGet, "Error reading the Platform")
err := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
if errors.IsNotFound(err) {
klog.V(log.D).ErrorS(err, "Platform not found, creating it")
return c.Create(ctx, p)
} else if err != nil {
klog.V(log.E).ErrorS(err, "Error reading the Platform")
return err
}

config.Spec = p.Spec
config.Status.Cluster = p.Status.Cluster

updateErr := c.Update(ctx, &config)
if updateErr != nil {
klog.V(log.E).ErrorS(updateErr, "Error updating the BuildPlatform")
err = c.Update(ctx, &config)
if err != nil {
klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform")
}
return err
}

func newDefaultSonataFlowPlatform(namespace string) *operatorapi.SonataFlowPlatform {
Expand Down
30 changes: 24 additions & 6 deletions controllers/profiles/common/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,26 +37,43 @@ import (
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
)

var _ WorkflowDeploymentHandler = &deploymentHandler{}
var _ WorkflowDeploymentManager = &deploymentHandler{}

// WorkflowDeploymentHandler interface to handle workflow deployment features.
type WorkflowDeploymentHandler interface {
// WorkflowDeploymentManager interface to handle workflow deployment features.
type WorkflowDeploymentManager interface {
// SyncDeploymentStatus updates the workflow status aligned with the deployment counterpart.
// For example, if the deployment is in a failed state, it sets the status to
// Running `false` and the Message and Reason to human-readable format.
SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error)
// RolloutDeployment rolls out the underlying deployment object for the given workflow.
RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error
}

// DeploymentHandler creates a new WorkflowDeploymentHandler implementation based on the current profile.
func DeploymentHandler(c client.Client) WorkflowDeploymentHandler {
// DeploymentManager creates a new WorkflowDeploymentManager implementation based on the current profile.
func DeploymentManager(c client.Client) WorkflowDeploymentManager {
return &deploymentHandler{c: c}
}

type deploymentHandler struct {
c client.Client
}

func (d deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
func (d *deploymentHandler) RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error {
deployment := &appsv1.Deployment{}
if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
// Deployment not found, nothing to do.
if errors.IsNotFound(err) {
return nil
}
return err
}
if err := kubeutil.MarkDeploymentToRollout(deployment); err != nil {
return err
}
return d.c.Update(ctx, deployment)
}

func (d *deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
deployment := &appsv1.Deployment{}
if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
// we should have the deployment by this time, so even if the error above is not found, we should halt.
Expand Down
24 changes: 10 additions & 14 deletions controllers/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/tools/record"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,8 +37,9 @@ import (

// StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles
type StateSupport struct {
C client.Client
Catalog discovery.ServiceCatalog
C client.Client
Catalog discovery.ServiceCatalog
Recorder record.EventRecorder
}

// PerformStatusUpdate updates the SonataFlow Status conditions
Expand All @@ -51,29 +53,23 @@ func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operato
return true, err
}

// PostReconcile function to perform all the other operations required after the reconciliation - placeholder for null pattern usages
func (s StateSupport) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}

// BaseReconciler is the base structure used by every reconciliation profile.
// Use NewBaseProfileReconciler to build a new reference.
type BaseReconciler struct {
// Reconciler is the base structure used by every reconciliation profile.
// Use NewReconciler to build a new reference.
type Reconciler struct {
*StateSupport
reconciliationStateMachine *ReconciliationStateMachine
objects []client.Object
}

func NewBaseProfileReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) BaseReconciler {
return BaseReconciler{
func NewReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) Reconciler {
return Reconciler{
StateSupport: support,
reconciliationStateMachine: stateMachine,
}
}

// Reconcile does the actual reconciliation algorithm based on a set of ReconciliationState
func (b *BaseReconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
func (b *Reconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
workflow.Status.Manager().InitializeConditions()
result, objects, err := b.reconciliationStateMachine.do(ctx, workflow)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions controllers/profiles/dev/profile_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dev

import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -34,17 +35,18 @@ import (
var _ profiles.ProfileReconciler = &developmentProfile{}

type developmentProfile struct {
common.BaseReconciler
common.Reconciler
}

func (d developmentProfile) GetProfile() metadata.ProfileType {
return metadata.DevProfile
}

func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
func NewProfileReconciler(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Catalog: discovery.NewServiceCatalog(client),
C: client,
Catalog: discovery.NewServiceCatalog(client),
Recorder: recorder,
}

var ensurers *objectEnsurers
Expand All @@ -63,7 +65,7 @@ func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
&recoverFromFailureState{StateSupport: support})

profile := &developmentProfile{
BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine),
Reconciler: common.NewReconciler(support, stateMachine),
}

klog.V(log.I).InfoS("Reconciling in", "profile", profile.GetProfile())
Expand Down
18 changes: 9 additions & 9 deletions controllers/profiles/dev/profile_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_OverrideStartupProbe(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -82,7 +82,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "")
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

reconciler := NewProfileReconciler(client)
reconciler := NewProfileReconciler(client, test.NewFakeRecorder())

// we are in failed state and have no objects
result, err := reconciler.Reconcile(context.TODO(), workflow)
Expand Down Expand Up @@ -123,7 +123,7 @@ func Test_newDevProfile(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func Test_newDevProfile(t *testing.T) {
func Test_devProfileImageDefaultsNoPlatform(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -213,7 +213,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) {
platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -230,7 +230,7 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing.
platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -247,7 +247,7 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin
platform := test.GetBasePlatformInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -266,7 +266,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

camelXmlRouteFileName := "camelroute-xml"
xmlRoute := `<route routeConfigurationId="xmlError">
Expand Down Expand Up @@ -380,7 +380,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down
12 changes: 11 additions & 1 deletion controllers/profiles/dev/states_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora
return ctrl.Result{RequeueAfter: constants.RequeueAfterIsRunning}, objs, nil
}

func (e *ensureRunningWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}

type followWorkflowDeploymentState struct {
*common.StateSupport
enrichers *statusEnrichers
Expand All @@ -145,7 +150,7 @@ func (f *followWorkflowDeploymentState) CanReconcile(workflow *operatorapi.Sonat
}

func (f *followWorkflowDeploymentState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
result, err := common.DeploymentHandler(f.C).SyncDeploymentStatus(ctx, workflow)
result, err := common.DeploymentManager(f.C).SyncDeploymentStatus(ctx, workflow)
if err != nil {
return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err
}
Expand Down Expand Up @@ -247,3 +252,8 @@ func (r *recoverFromFailureState) Do(ctx context.Context, workflow *operatorapi.
}
return ctrl.Result{RequeueAfter: constants.RequeueRecoverDeploymentErrorInterval}, nil, nil
}

func (r *recoverFromFailureState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}
Loading