Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix wo…
Browse files Browse the repository at this point in the history
…rkflow deletion hanging issue
  • Loading branch information
jianrongzhang89 committed Sep 11, 2024
1 parent 748810a commit 728b8ea
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 56 deletions.
1 change: 0 additions & 1 deletion controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (
JobServiceJobEventsPath = "/v2/jobs/events"
JobServiceLeaderCheckExpirationInSeconds = "kogito.jobs-service.management.leader-check.expiration-in-seconds"
DefaultJobServiceLeaderCheckExpirationInSeconds = "60"
JobServiceRequestEventsURL = "mp.messaging.outgoing.kogito-job-service-job-request-events.url"

KogitoProcessInstancesEventsConnector = "mp.messaging.outgoing.kogito-processinstances-events.connector"
KogitoProcessInstancesEventsMethod = "mp.messaging.outgoing.kogito-processinstances-events.method"
Expand Down
31 changes: 10 additions & 21 deletions controllers/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ const (
healthStartedFailureThreshold = 5
healthStartedPeriodSeconds = 15
healthStartedInitialDelaySeconds = 10
healthSuccessThreshold = 1
)

// DeploymentCreator is an objectCreator for a base Kubernetes Deployments for profiles that need to deploy the workflow on a vanilla deployment.
Expand Down Expand Up @@ -186,44 +185,34 @@ func defaultContainer(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataF
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathLive,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
Path: constants.QuarkusHealthPathLive,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
InitialDelaySeconds: healthStartedInitialDelaySeconds,
TimeoutSeconds: healthTimeoutSeconds,
FailureThreshold: healthStartedFailureThreshold,
PeriodSeconds: healthStartedPeriodSeconds,
SuccessThreshold: healthSuccessThreshold,
TimeoutSeconds: healthTimeoutSeconds,
PeriodSeconds: healthStartedPeriodSeconds,
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathReady,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
Path: constants.QuarkusHealthPathReady,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
InitialDelaySeconds: healthStartedInitialDelaySeconds,
TimeoutSeconds: healthTimeoutSeconds,
FailureThreshold: healthStartedFailureThreshold,
PeriodSeconds: healthStartedPeriodSeconds,
SuccessThreshold: healthSuccessThreshold,
TimeoutSeconds: healthTimeoutSeconds,
PeriodSeconds: healthStartedPeriodSeconds,
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathStarted,
Port: variables.DefaultHTTPWorkflowPortIntStr,
Scheme: corev1.URISchemeHTTP,
Path: constants.QuarkusHealthPathStarted,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
InitialDelaySeconds: healthStartedInitialDelaySeconds,
TimeoutSeconds: healthTimeoutSeconds,
FailureThreshold: healthStartedFailureThreshold,
PeriodSeconds: healthStartedPeriodSeconds,
SuccessThreshold: healthSuccessThreshold,
},
SecurityContext: kubeutil.SecurityDefaults(),
}
Expand Down
28 changes: 9 additions & 19 deletions controllers/sonataflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,15 @@ func (r *SonataFlowReconciler) setDefaults(workflow *operatorapi.SonataFlow) {
}

func (r *SonataFlowReconciler) cleanupTriggers(ctx context.Context, workflow *operatorapi.SonataFlow) error {
plf, _ := platform.GetActivePlatform(ctx, r.Client, workflow.Namespace)
if plf == nil || len(plf.Status.Triggers) == 0 {
return nil
}
avail, err := knative.GetKnativeAvailability(r.Config)
if err != nil {
return err
}
if avail.Eventing {
for _, triggerRef := range workflow.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil {
return err
}
for _, triggerRef := range workflow.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil && !errors.IsNotFound(err) {
return err
}
}
controllerutil.RemoveFinalizer(workflow, constants.TriggerFinalizer)
Expand Down
24 changes: 9 additions & 15 deletions controllers/sonataflowplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,15 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc
}

func (r *SonataFlowPlatformReconciler) cleanupTriggers(ctx context.Context, platform *operatorapi.SonataFlowPlatform) error {
avail, err := knative.GetKnativeAvailability(r.Config)
if err != nil {
return err
}
if avail.Eventing {
for _, triggerRef := range platform.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil {
return err
}
for _, triggerRef := range platform.Status.Triggers {
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: triggerRef.Name,
Namespace: triggerRef.Namespace,
},
}
if err := r.Client.Delete(ctx, trigger); err != nil && !errors.IsNotFound(err) {
return err
}
}
controllerutil.RemoveFinalizer(platform, constants.TriggerFinalizer)
Expand Down

0 comments on commit 728b8ea

Please sign in to comment.