Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix e2…
Browse files Browse the repository at this point in the history
…e test errors and use temp images for DI and JS
  • Loading branch information
jianrongzhang89 committed Aug 9, 2024
1 parent 6bc17ba commit 4480a80
Show file tree
Hide file tree
Showing 30 changed files with 948 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ generate-all: generate generate-deploy bundle addheaders vet fmt

.PHONY: test-e2e # You will need to have a Minikube/Kind cluster up in running to run this target, and run container-builder before the test
test-e2e:
go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.junit-report=./e2e-test-report.xml -timeout 75m
go test ./test/e2e/* -v -ginkgo.v -ginkgo.no-color -ginkgo.timeout=90m -ginkgo.junit-report=./e2e-test-report.xml -timeout 90m

.PHONY: before-pr
before-pr: test generate-all
Expand Down
22 changes: 22 additions & 0 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -270,3 +271,24 @@ func MapTriggerToPlatformRequests(ctx context.Context, object client.Object) []r
}
return nil
}

// Does the deployment have K_SINK injected?
func CheckKSinkInjected(name, namespace string) (bool, error) {
deployment := &appsv1.Deployment{}
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, deployment); err != nil {
if errors.IsNotFound(err) {
return false, nil // deployment hasn't been created yet
}
return false, err
}
injected := false
visitContainers(&deployment.Spec.Template.Spec, func(container *corev1.Container) {
for _, e := range container.Env {
if e.Name == "K_SINK" {
injected = true
break
}
}
})
return injected, nil // K_SINK has not been injected yet
}
16 changes: 15 additions & 1 deletion controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package platform

import (
"context"
"fmt"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
Expand Down Expand Up @@ -144,6 +145,13 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
serviceContainer.Name = psh.GetContainerName()

replicas := psh.GetReplicaCount()
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return nil
}
if !kSinkInjected {
replicas = 0 // Wait for K_SINK injection
}
lbl, selectorLbl := getLabels(platform, psh)
serviceDeploymentSpec := appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Expand Down Expand Up @@ -200,7 +208,6 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
} else {
klog.V(log.I).InfoS("Deployment successfully reconciled", "operation", op)
}

return nil
}

Expand Down Expand Up @@ -341,6 +348,13 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
if err != nil {
return err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return err
}
if !kSinkInjected {
return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
}
}
}
return nil
Expand Down
31 changes: 21 additions & 10 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ import (
const (
quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START"
pathProcesses string = "/processes"
pathDefinitions string = "/definitions"
pathJobs string = "/jobs"
)

type PlatformServiceHandler interface {
Expand Down Expand Up @@ -105,6 +102,9 @@ type PlatformServiceHandler interface {
SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)

GetServiceSource() *duckv1.Destination

// Check if K_SINK has injected for Job Service. No Op for Data Index
CheckKSinkInjected() (bool, error)
}

type DataIndexHandler struct {
Expand Down Expand Up @@ -276,6 +276,10 @@ func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties,
return props, nil
}

func (d *DataIndexHandler) CheckKSinkInjected() (bool, error) {
return true, nil // No op
}

type JobServiceHandler struct {
platform *operatorapi.SonataFlowPlatform
}
Expand Down Expand Up @@ -599,13 +603,13 @@ func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.Sonata
}
serviceName := d.GetServiceName()
return []client.Object{
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", pathProcesses, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", pathJobs, platform)}, nil
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-sla", "ProcessInstanceSLADataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil
}

func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
Expand Down Expand Up @@ -717,6 +721,13 @@ func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.Sonat
return resultObjs, nil
}

func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
if j.GetSink() != nil { //job services has sink configured
return knative.CheckKSinkInjected(j.GetServiceName(), j.platform.Namespace)
}
return true, nil
}

func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
if plf.Spec.Services != nil {
if plf.Spec.Services.DataIndex != nil {
Expand Down
1 change: 1 addition & 0 deletions controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
KogitoProcessDefinitionsEventsErrorsEnabled = "kogito.events.processdefinitions.errors.propagate"
KogitoProcessDefinitionsEventsPath = "/definitions"
KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled"
KogitoJobsPath = "/jobs"
// KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part
// of its start health check.
KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
Expand Down
12 changes: 10 additions & 2 deletions controllers/sonataflowplatform_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -190,6 +191,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)

di := services.NewDataIndexHandler(ksp)

Expand Down Expand Up @@ -289,6 +291,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -397,6 +400,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -488,6 +492,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}

Expand Down Expand Up @@ -574,6 +579,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
js := services.NewJobServiceHandler(ksp)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}
Expand Down Expand Up @@ -648,6 +654,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build()
utils.SetClient(cl)
di := services.NewDataIndexHandler(ksp)
js := services.NewJobServiceHandler(ksp)
// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
Expand Down Expand Up @@ -718,6 +725,7 @@ func TestSonataFlowPlatformController(t *testing.T) {

// Create a fake client to mock API calls.
cl := test.NewSonataFlowClientBuilder().WithRuntimeObjects(kscp, ksp, ksp2).WithStatusSubresource(kscp, ksp, ksp2).Build()
utils.SetClient(cl)

// Create a SonataFlowPlatformReconciler object with the scheme and fake client.
r := &SonataFlowPlatformReconciler{cl, cl, cl.Scheme(), &rest.Config{}, &record.FakeRecorder{}}
Expand Down Expand Up @@ -867,7 +875,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
if err != nil {
if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}

Expand Down Expand Up @@ -967,7 +975,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
if err != nil {
if err != nil && err.Error() != "waiting for K_SINK injection for sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}

Expand Down
4 changes: 2 additions & 2 deletions hack/ci/create-kind-cluster-with-registry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ nodes:
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=1Gi
system-reserved: memory=2Gi
- role: worker
kubeadmConfigPatches:
- |
kind: InitConfiguration
nodeRegistration:
kubeletExtraArgs:
system-reserved: memory=2Gi
system-reserved: memory=4Gi
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".registry]
Expand Down
114 changes: 113 additions & 1 deletion test/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"fmt"
"net/url"
"os/exec"
"regexp"
"strconv"
"strings"
"time"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/test/utils"

//nolint:golint
Expand Down Expand Up @@ -139,7 +142,7 @@ func verifyWorkflowIsInRunningState(workflowName string, targetNamespace string)
}

func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bool {
cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-o", "jsonpath={.status.address.url}")
cmd := exec.Command("kubectl", "get", "workflow", workflowName, "-n", targetNamespace, "-ojsonpath={.status.address.url}")
if response, err := utils.Run(cmd); err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to check if greeting workflow is running: %v", err))
return false
Expand All @@ -157,3 +160,112 @@ func verifyWorkflowIsAddressable(workflowName string, targetNamespace string) bo
return false
}
}

func verifySchemaMigration(data, name string) bool {
matched1, err := regexp.MatchString(fmt.Sprintf("Successfully applied \\d migrations to schema \"%s\"", name), data)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("string match error:%v", err))
return false
}
matched2, err := regexp.MatchString("Successfully validated \\d (migration|migrations)", data)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("string match error:%v", err))
return false
}
GinkgoWriter.Println(fmt.Sprintf("verifying schemaMigration, logs=%v", data))
return (matched1 && strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Migrating schema \"%s\" to version", name))) ||
(matched2 && strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name))) ||
(strings.Contains(data, fmt.Sprintf("Creating schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Current version of schema \"%s\"", name)) &&
strings.Contains(data, fmt.Sprintf("Schema \"%s\" is up to date. No migration necessary", name)))
}

func verifyKSinkInjection(label, ns string) bool {
GinkgoWriter.Println(fmt.Sprintf("failed to get pod for label: %v, ns=%s", label, ns))
cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err))
return false
}
podNames := strings.Fields(string(out))
if len(podNames) == 0 {
GinkgoWriter.Println("no pods found to check K_SINK")
return false // pods haven't created yet
}
GinkgoWriter.Println(fmt.Sprintf("pods found: %s", podNames))
for _, pod := range podNames {
cmd = exec.Command("kubectl", "get", "pod", pod, "-n", ns, "-o", "json")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err))
return false
}
GinkgoWriter.Println(string(out))
if !strings.Contains(string(out), "K_SINK") { // The pod does not have K_SINK injected
GinkgoWriter.Println(fmt.Sprintf("Pod does not have K_SINK injected: %s", string(out)))
return false
}
}
return true
}

func waitForPodRestartCompletion(label, ns string) {
EventuallyWithOffset(1, func() bool {
GinkgoWriter.Println(fmt.Sprintf("failed to get pod for label: %v, ns=%s", label, ns))
cmd := exec.Command("kubectl", "get", "pod", "-n", ns, "-l", label, "-o", "jsonpath={.items[*].metadata.name}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pods: %v", err))
return false
}
podNames := strings.Fields(string(out))
if len(podNames) == 0 {
GinkgoWriter.Println("no pods found")
return false // pods haven't created yet
} else if len(podNames) > 1 {
GinkgoWriter.Println("multiple pods found")
return false // multiple pods found, wait for other pods to terminate
}
return true
}, 1*time.Minute, 5).Should(BeTrue())
}

func verifyTrigger(triggers []operatorapi.SonataFlowPlatformTriggerRef, namePrefix, path, ns, broker string) error {
for _, ref := range triggers {
if strings.HasPrefix(ref.Name, namePrefix) && ref.Namespace == ns {
return verifyTriggerData(ref.Name, ns, path, broker)
}
}
return fmt.Errorf("failed to find trigger to verify with prefix: %v, namespace: %v", namePrefix, ns)
}

func verifyTriggerData(name, ns, path, broker string) error {
cmd := exec.Command("kubectl", "get", "trigger", name, "-n", ns, "-ojsonpath={.spec.broker} {.status.subscriberUri} {.status.conditions[?(@.type=='Ready')].status}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err))
return err
}
data := strings.Fields(string(out))
if len(data) == 3 && broker == data[0] && strings.HasSuffix(data[1], path) && data[2] == "True" {
return nil
}
return fmt.Errorf("failed to verify trigger %v, data=%s", name, string(out))
}

func verifySinkBinding(name, ns, broker string) error {
cmd := exec.Command("kubectl", "get", "sinkbinding", name, "-n", ns, "-ojsonpath={.status.sinkUri} {.status.conditions[?(@.type=='Ready')].status}")
out, err := utils.Run(cmd)
if err != nil {
GinkgoWriter.Println(fmt.Errorf("failed to get pod: %v", err))
return err
}
data := strings.Fields(string(out))
if len(data) == 2 && strings.HasSuffix(data[0], broker) && data[1] == "True" {
return nil
}
return fmt.Errorf("failed to verify sinkbinding %v, data=%s", name, string(out))
}
Loading

0 comments on commit 4480a80

Please sign in to comment.