Skip to content

Commit

Permalink
[KOGITO-8792] - Add events to build controllers and the ability to re…
Browse files Browse the repository at this point in the history
…start a build (#318)

* [KOGITO-8792] - Add events to build controllers and the ability to restart a build

Signed-off-by: Ricardo Zanini <[email protected]>

* Add the ability to restart builds and signal to workflows

Signed-off-by: Ricardo Zanini <[email protected]>

* Rollout deployment after a successful build

Signed-off-by: Ricardo Zanini <[email protected]>

* Fix rollout deployment once a build finishes

Signed-off-by: Ricardo Zanini <[email protected]>

---------

Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini authored Dec 20, 2023
1 parent 2120e30 commit 08c8854
Show file tree
Hide file tree
Showing 26 changed files with 346 additions and 208 deletions.
4 changes: 3 additions & 1 deletion api/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ const (
BuildFailedReason = "BuildFailedReason"
WaitingForBuildReason = "WaitingForBuild"
BuildIsRunningReason = "BuildIsRunning"
BuildSkipped = "BuildSkipped"
BuildSkippedReason = "BuildSkipped"
BuildSuccessfulReason = "BuildSuccessful"
BuildMarkedToRestartReason = "BuildMarkedToRestart"
)

// Condition describes the common structure for conditions in our types
Expand Down
21 changes: 6 additions & 15 deletions api/status_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,12 @@ func (s *conditionManager) MarkUnknown(t ConditionType, reason, messageFormat st

// MarkFalse sets the status of t and the ready condition to False.
func (s *conditionManager) MarkFalse(t ConditionType, reason, messageFormat string, messageA ...interface{}) {
types := []ConditionType{t}
for _, cond := range s.dependents {
if cond == t {
types = append(types, s.ready)
}
}

for _, t := range types {
s.setCondition(Condition{
Type: t,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageA...),
})
}
s.setCondition(Condition{
Type: t,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf(messageFormat, messageA...),
})
}

// InitializeConditions updates all Conditions in the ConditionSet to Unknown
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func (s *SonataFlowStatus) Manager() api.ConditionsManager {
}

func (s *SonataFlowStatus) IsWaitingForPlatform() bool {
cond := s.GetCondition(api.RunningConditionType)
cond := s.GetCondition(api.BuiltConditionType)
return cond.IsFalse() && cond.Reason == api.WaitingForPlatformReason
}

Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha08/sonataflowbuild_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package v1alpha08
import (
"encoding/json"

"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -50,6 +51,9 @@ const (
BuildPhaseError BuildPhase = "Error"
)

// BuildRestartAnnotation marks a SonataFlowBuild to restart
const BuildRestartAnnotation = metadata.Domain + "/restartBuild"

// BuildTemplate an abstraction over the actual build process performed by the platform.
// +k8s:openapi-gen=true
type BuildTemplate struct {
Expand Down
3 changes: 3 additions & 0 deletions controllers/builder/containerbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (c *containerBuilderManager) Schedule(build *operatorapi.SonataFlowBuild) e
return err
}
build.Status.BuildPhase = operatorapi.BuildPhase(containerBuilder.Status.Phase)
if len(build.Status.BuildPhase) == 0 {
build.Status.BuildPhase = operatorapi.BuildPhaseInitialization
}
build.Status.Error = containerBuilder.Status.Error
return nil
}
Expand Down
26 changes: 15 additions & 11 deletions controllers/platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package platform
import (
"context"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -60,24 +61,27 @@ func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.Sona
klog.V(log.I).InfoS("Maven Timeout set", "timeout", p.Spec.Build.Config.Timeout.Duration)
}

updatePlatform(ctx, c, p)

return nil
return createOrUpdatePlatform(ctx, c, p)
}

func updatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) {
func createOrUpdatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) error {
config := operatorapi.SonataFlowPlatform{}
errGet := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
if errGet != nil {
klog.V(log.E).ErrorS(errGet, "Error reading the Platform")
err := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
if errors.IsNotFound(err) {
klog.V(log.D).ErrorS(err, "Platform not found, creating it")
return c.Create(ctx, p)
} else if err != nil {
klog.V(log.E).ErrorS(err, "Error reading the Platform")
return err
}

config.Spec = p.Spec
config.Status.Cluster = p.Status.Cluster

updateErr := c.Update(ctx, &config)
if updateErr != nil {
klog.V(log.E).ErrorS(updateErr, "Error updating the BuildPlatform")
err = c.Update(ctx, &config)
if err != nil {
klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform")
}
return err
}

func newDefaultSonataFlowPlatform(namespace string) *operatorapi.SonataFlowPlatform {
Expand Down
30 changes: 24 additions & 6 deletions controllers/profiles/common/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,26 +37,43 @@ import (
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
)

var _ WorkflowDeploymentHandler = &deploymentHandler{}
var _ WorkflowDeploymentManager = &deploymentHandler{}

// WorkflowDeploymentHandler interface to handle workflow deployment features.
type WorkflowDeploymentHandler interface {
// WorkflowDeploymentManager interface to handle workflow deployment features.
type WorkflowDeploymentManager interface {
// SyncDeploymentStatus updates the workflow status aligned with the deployment counterpart.
// For example, if the deployment is in a failed state, it sets the status to
// Running `false` and the Message and Reason to human-readable format.
SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error)
// RolloutDeployment rolls out the underlying deployment object for the given workflow.
RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error
}

// DeploymentHandler creates a new WorkflowDeploymentHandler implementation based on the current profile.
func DeploymentHandler(c client.Client) WorkflowDeploymentHandler {
// DeploymentManager creates a new WorkflowDeploymentManager implementation based on the current profile.
func DeploymentManager(c client.Client) WorkflowDeploymentManager {
return &deploymentHandler{c: c}
}

type deploymentHandler struct {
c client.Client
}

func (d deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
func (d *deploymentHandler) RolloutDeployment(ctx context.Context, workflow *operatorapi.SonataFlow) error {
deployment := &appsv1.Deployment{}
if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
// Deployment not found, nothing to do.
if errors.IsNotFound(err) {
return nil
}
return err
}
if err := kubeutil.MarkDeploymentToRollout(deployment); err != nil {
return err
}
return d.c.Update(ctx, deployment)
}

func (d *deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
deployment := &appsv1.Deployment{}
if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
// we should have the deployment by this time, so even if the error above is not found, we should halt.
Expand Down
24 changes: 10 additions & 14 deletions controllers/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/tools/record"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -36,8 +37,9 @@ import (

// StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles
type StateSupport struct {
C client.Client
Catalog discovery.ServiceCatalog
C client.Client
Catalog discovery.ServiceCatalog
Recorder record.EventRecorder
}

// PerformStatusUpdate updates the SonataFlow Status conditions
Expand All @@ -51,29 +53,23 @@ func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operato
return true, err
}

// PostReconcile function to perform all the other operations required after the reconciliation - placeholder for null pattern usages
func (s StateSupport) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}

// BaseReconciler is the base structure used by every reconciliation profile.
// Use NewBaseProfileReconciler to build a new reference.
type BaseReconciler struct {
// Reconciler is the base structure used by every reconciliation profile.
// Use NewReconciler to build a new reference.
type Reconciler struct {
*StateSupport
reconciliationStateMachine *ReconciliationStateMachine
objects []client.Object
}

func NewBaseProfileReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) BaseReconciler {
return BaseReconciler{
func NewReconciler(support *StateSupport, stateMachine *ReconciliationStateMachine) Reconciler {
return Reconciler{
StateSupport: support,
reconciliationStateMachine: stateMachine,
}
}

// Reconcile does the actual reconciliation algorithm based on a set of ReconciliationState
func (b *BaseReconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
func (b *Reconciler) Reconcile(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
workflow.Status.Manager().InitializeConditions()
result, objects, err := b.reconciliationStateMachine.do(ctx, workflow)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions controllers/profiles/dev/profile_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dev

import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -34,17 +35,18 @@ import (
var _ profiles.ProfileReconciler = &developmentProfile{}

type developmentProfile struct {
common.BaseReconciler
common.Reconciler
}

func (d developmentProfile) GetProfile() metadata.ProfileType {
return metadata.DevProfile
}

func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
func NewProfileReconciler(client client.Client, recorder record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
Catalog: discovery.NewServiceCatalog(client),
C: client,
Catalog: discovery.NewServiceCatalog(client),
Recorder: recorder,
}

var ensurers *objectEnsurers
Expand All @@ -63,7 +65,7 @@ func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
&recoverFromFailureState{StateSupport: support})

profile := &developmentProfile{
BaseReconciler: common.NewBaseProfileReconciler(support, stateMachine),
Reconciler: common.NewReconciler(support, stateMachine),
}

klog.V(log.I).InfoS("Reconciling in", "profile", profile.GetProfile())
Expand Down
18 changes: 9 additions & 9 deletions controllers/profiles/dev/profile_dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_OverrideStartupProbe(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -82,7 +82,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentFailureReason, "")
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

reconciler := NewProfileReconciler(client)
reconciler := NewProfileReconciler(client, test.NewFakeRecorder())

// we are in failed state and have no objects
result, err := reconciler.Reconcile(context.TODO(), workflow)
Expand Down Expand Up @@ -123,7 +123,7 @@ func Test_newDevProfile(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func Test_newDevProfile(t *testing.T) {
func Test_devProfileImageDefaultsNoPlatform(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -213,7 +213,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t *testing.T) {
platform := test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -230,7 +230,7 @@ func Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing.
platform := test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -247,7 +247,7 @@ func Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin
platform := test.GetBasePlatformInReadyPhase(workflow.Namespace)

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, platform).WithStatusSubresource(workflow, platform).Build()
devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand All @@ -266,7 +266,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

camelXmlRouteFileName := "camelroute-xml"
xmlRoute := `<route routeConfigurationId="xmlError">
Expand Down Expand Up @@ -380,7 +380,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, configMap).WithStatusSubresource(workflow, configMap).Build()

devReconciler := NewProfileReconciler(client)
devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())

result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
Expand Down
12 changes: 11 additions & 1 deletion controllers/profiles/dev/states_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora
return ctrl.Result{RequeueAfter: constants.RequeueAfterIsRunning}, objs, nil
}

func (e *ensureRunningWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}

type followWorkflowDeploymentState struct {
*common.StateSupport
enrichers *statusEnrichers
Expand All @@ -145,7 +150,7 @@ func (f *followWorkflowDeploymentState) CanReconcile(workflow *operatorapi.Sonat
}

func (f *followWorkflowDeploymentState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
result, err := common.DeploymentHandler(f.C).SyncDeploymentStatus(ctx, workflow)
result, err := common.DeploymentManager(f.C).SyncDeploymentStatus(ctx, workflow)
if err != nil {
return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err
}
Expand Down Expand Up @@ -247,3 +252,8 @@ func (r *recoverFromFailureState) Do(ctx context.Context, workflow *operatorapi.
}
return ctrl.Result{RequeueAfter: constants.RequeueRecoverDeploymentErrorInterval}, nil, nil
}

func (r *recoverFromFailureState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
return nil
}
Loading

0 comments on commit 08c8854

Please sign in to comment.