Skip to content

Commit

Permalink
fix maestro server resync after reconnect.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 20, 2024
1 parent d69cb43 commit e3339e1
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 57 deletions.
16 changes: 16 additions & 0 deletions pkg/client/cloudevents/source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,29 @@ type SourceClientImpl struct {

func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resourceService services.ResourceService) (SourceClient, error) {
ctx := context.Background()
log := logger.NewOCMLogger(ctx)
codec, bundleCodec := &Codec{sourceID: sourceOptions.SourceID}, &BundleCodec{sourceID: sourceOptions.SourceID}
ceSourceClient, err := cegeneric.NewCloudEventSourceClient[*api.Resource](ctx, sourceOptions,
resourceService, ResourceStatusHashGetter, codec, bundleCodec)
if err != nil {
return nil, err
}

// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-ceSourceClient.ReconnectedChan():
// when receiving a client reconnected signal, we resync all clusters for this source
if err := ceSourceClient.Resync(ctx, cetypes.ClusterAll); err != nil {
log.Error(fmt.Sprintf("failed to send resync request, %v", err))
}
}
}
}()

return &SourceClientImpl{
Codec: codec,
BundleCodec: bundleCodec,
Expand Down
57 changes: 1 addition & 56 deletions test/e2e/pkg/spec_resync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
var _ = Describe("Spec resync", Ordered, Label("e2e-tests-spec-resync"), func() {

var resource1, resource2, resource3 *openapi.Resource
var mqttReplicas, maestroServerReplicas, maestroAgentReplicas int
var mqttReplicas, maestroAgentReplicas int

Context("Resource resync resource spec after maestro agent restarts", func() {

Expand Down Expand Up @@ -380,61 +380,6 @@ var _ = Describe("Spec resync", Ordered, Label("e2e-tests-spec-resync"), func()
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("Rollout the maestro-server", func() {

deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
maestroServerReplicas = int(*deploy.Spec.Replicas)
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(0)))

// ensure no running maestro pods
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "app=maestro",
})
if err != nil {
return err
}
if len(pods.Items) > 0 {
return fmt.Errorf("maestro pods still running")
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

// patch maestro replicas to maestroServerReplicas
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroServerReplicas)))

// ensure maestro pod is up and running
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "app=maestro",
})
if err != nil {
return err
}
if len(pods.Items) != maestroServerReplicas {
return fmt.Errorf("unexpected maestro pod count, expected %d, got %d", maestroServerReplicas, len(pods.Items))
}
for _, pod := range pods.Items {
if pod.Status.Phase != "Running" {
return fmt.Errorf("maestro pod not in running state")
}
if pod.Status.ContainerStatuses[0].State.Running == nil {
return fmt.Errorf("maestro server container not in running state")
}
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("patch the nginx-1 resource", func() {

newRes := helper.NewAPIResourceWithIndex(consumer_name, 2, 1)
Expand Down
194 changes: 193 additions & 1 deletion test/e2e/pkg/status_resync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), func() {

var resource *openapi.Resource
var maestroServerReplicas int
var mqttReplicas, maestroServerReplicas int

Context("Resource resync resource status after maestro server restarts", func() {

Expand Down Expand Up @@ -184,4 +185,195 @@ var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), fun
})

})

Context("Resource resync resource status after maestro server reconnects", func() {

It("post the nginx resource with non-default service account to the maestro api", func() {

res := helper.NewAPIResourceWithSA(consumer_name, 1, "nginx")
var resp *http.Response
var err error
resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusCreated))
Expect(*resource.Id).ShouldNot(BeEmpty())

Eventually(func() error {
deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
return err
}
if *deploy.Spec.Replicas != 1 {
return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas)
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(*gotResource.Id).To(Equal(*resource.Id))
Expect(*gotResource.Version).To(Equal(*resource.Version))

Eventually(func() error {
gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
if err != nil {
return err
}
statusJSON, err := json.Marshal(gotResource.Status)
if err != nil {
return err
}
if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON))
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("delete the mqtt-broker service for server", func() {

err := kubeClient.CoreV1().Services("maestro").Delete(context.Background(), "maestro-mqtt-server", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("Rollout the mqtt-broker", func() {

deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro-mqtt", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
mqttReplicas = int(*deploy.Spec.Replicas)
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(0)))

// ensure no running mqtt-broker pods
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "name=maestro-mqtt",
})
if err != nil {
return err
}
if len(pods.Items) > 0 {
return fmt.Errorf("maestro-mqtt pods still running")
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

// patch mqtt-broker replicas to mqttReplicas
deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{
FieldManager: "testKubeClient",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas)))

// ensure mqtt-broker pod is up and running
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
LabelSelector: "name=maestro-mqtt",
})
if err != nil {
return err
}
if len(pods.Items) != mqttReplicas {
return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items))
}
for _, pod := range pods.Items {
if pod.Status.Phase != "Running" {
return fmt.Errorf("maestro-mqtt pod not in running state")
}
if pod.Status.ContainerStatuses[0].State.Running == nil {
return fmt.Errorf("maestro-mqtt container not in running state")
}
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("create default/nginx serviceaccount", func() {

_, err := kubeClient.CoreV1().ServiceAccounts("default").Create(context.Background(), &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
},
}, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

// delete the nginx deployment to tigger recreating
err = kubeClient.AppsV1().Deployments("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("recreate the mqtt-broker service for server", func() {

mqttServerService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "maestro-mqtt-server",
Namespace: "maestro",
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"name": "maestro-mqtt",
},
Ports: []corev1.ServicePort{
{
Name: "mosquitto",
Protocol: corev1.ProtocolTCP,
Port: 1883,
TargetPort: intstr.FromInt(1883),
},
},
Type: corev1.ServiceTypeClusterIP,
},
}

_, err := kubeClient.CoreV1().Services("maestro").Create(context.Background(), mqttServerService, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("ensure the resource status is resynced", func() {

Eventually(func() error {
gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
if err != nil {
return err
}
if _, ok := gotResource.Status["ContentStatus"]; !ok {
return fmt.Errorf("unexpected status, expected contains ContentStatus, got %v", gotResource.Status)
}
statusJSON, err := json.Marshal(gotResource.Status)
if err != nil {
return err
}
if strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON))
}
return nil
}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
})

It("delete the nginx resource", func() {

resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNoContent))

Eventually(func() error {
_, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return fmt.Errorf("nginx deployment still exists")
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())

err = kubeClient.CoreV1().ServiceAccounts("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

})
})

0 comments on commit e3339e1

Please sign in to comment.