Skip to content

Commit

Permalink
enable JointInferenceService controller to update related pods when m…
Browse files Browse the repository at this point in the history
…odifying CRD, and add test files to ensure functionality stability and correctness

Signed-off-by: SherlockShemol <[email protected]>
  • Loading branch information
SherlockShemol committed Sep 29, 2024
1 parent c15e8cc commit 3c5644d
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -94,6 +96,10 @@ type Controller struct {
cfg *config.ControllerConfig

sendToEdgeFunc runtime.DownstreamSendFunc

bigModelHost string

selector labels.Selector
}

// Run starts the main goroutine responsible for watching and syncing services.
Expand Down Expand Up @@ -278,9 +284,12 @@ func (c *Controller) sync(key string) (bool, error) {
// more details at https://github.com/kubernetes/kubernetes/issues/3030
service.SetGroupVersionKind(Kind)

selector, _ := runtime.GenerateSelector(&service)
pods, err := c.podStore.Pods(service.Namespace).List(selector)
deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selector)
c.selector, _ = runtime.GenerateSelector(&service)
pods, err := c.podStore.Pods(service.Namespace).List(c.selector)
if err != nil {
return false, err
}
deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(c.selector)

if err != nil {
return false, err
Expand Down Expand Up @@ -422,31 +431,35 @@ func isServiceFinished(j *sednav1.JointInferenceService) bool {
// createWorkers creates the cloud worker, the edgemesh service for the cloud
// worker, and the edge worker, in that order. After each worker is created
// successfully, the matching active* flags are set to true and the
// activePods/activeDeployments counters are incremented. On the first failure
// it returns the counters accumulated so far together with a wrapped error.
//
// NOTE(review): this text comes from a rendered diff, so removed and added
// lines appear back to back (the two createCloudWorker calls, the two
// CreateEdgeMeshService assignments and the two createEdgeWorker calls).
// Presumably only the second line of each pair exists in the committed
// file - confirm against the repository before relying on this listing.
func (c *Controller) createWorkers(service *sednav1.JointInferenceService, activeCloudPod *bool, activeCloudDeployment *bool, activeEdgePod *bool, activeEdgeDeployment *bool) (activePods, activeDeployments int32, err error) {
var bigModelPort int32 = BigModelPort
// create cloud worker
err = c.createCloudWorker(service, bigModelPort, activeCloudPod, activeCloudDeployment)
err = c.createCloudWorker(service, bigModelPort)
if err != nil {
return activePods, activeDeployments, fmt.Errorf("failed to create cloudWorker: %w", err)
}
// the cloud worker exists now: report it through the caller's flags and the counters
*activeCloudPod = true
*activeCloudDeployment = true
activePods++
activeDeployments++

// create k8s service for cloudPod
// the returned host is stored on the controller (c.bigModelHost) in the second form below
bigModelHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, jointInferenceForCloud, bigModelPort)
c.bigModelHost, err = runtime.CreateEdgeMeshService(c.kubeClient, service, jointInferenceForCloud, bigModelPort)
if err != nil {
return activePods, activeDeployments, fmt.Errorf("failed to create edgemesh service: %w", err)
}

// create edge worker
// the edge worker is given the big model host and port obtained above
err = c.createEdgeWorker(service, bigModelHost, bigModelPort, activeEdgePod, activeEdgeDeployment)
err = c.createEdgeWorker(service, c.bigModelHost, bigModelPort)
if err != nil {
return activePods, activeDeployments, fmt.Errorf("failed to create edgeWorker: %w", err)
}
// the edge worker exists now: report it through the caller's flags and the counters
*activeEdgePod = true
*activeEdgeDeployment = true
activePods++
activeDeployments++

return activePods, activeDeployments, err
}

func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32, activeCloudPod *bool, activeCloudDeployment *bool) error {
func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32) error {
// deliver deployment for cloudworker
cloudModelName := service.Spec.CloudWorker.Model.Name
cloudModel, err := c.client.Models(service.Namespace).Get(context.Background(), cloudModelName, metav1.GetOptions{})
Expand Down Expand Up @@ -494,14 +507,10 @@ func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, b
if err != nil {
return fmt.Errorf("failed to create cloudWorker deployment: %w", err)
}

*activeCloudDeployment = true
*activeCloudPod = true

return nil
}

func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32, activeEdgePod *bool, activeEdgeDeployment *bool) error {
func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32) error {
// deliver edge deployment for edgeworker
ctx := context.Background()
edgeModelName := service.Spec.EdgeWorker.Model.Name
Expand Down Expand Up @@ -565,8 +574,6 @@ func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bi
return fmt.Errorf("failed to create edgeWorker deployment: %w", err)
}

*activeEdgeDeployment = true
*activeEdgePod = true
return nil
}

Expand Down Expand Up @@ -599,10 +606,7 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
jc.syncToEdge(watch.Added, obj)
},

UpdateFunc: func(old, cur interface{}) {
jc.enqueueController(cur, true)
jc.syncToEdge(watch.Added, cur)
},
UpdateFunc: jc.updateService,

DeleteFunc: func(obj interface{}) {
jc.enqueueController(obj, true)
Expand Down Expand Up @@ -692,3 +696,45 @@ func (c *Controller) enqueueByDeployment(deployment *appsv1.Deployment) {

c.enqueueController(service, true)
}

// updateService is the informer UpdateFunc for JointInferenceService objects.
//
// It ignores events whose old and current objects are the same pointer or
// whose Spec is unchanged. When the Spec differs and the Generation has
// changed, it deletes the deployments selected by the service's own label
// selector, re-creates the edge and cloud workers from the new spec and
// pushes the service status. Finally the service is enqueued for a regular
// sync and forwarded to the edge.
//
// Failures of the individual API calls are logged rather than returned,
// because an informer handler has no error return; the subsequent sync of the
// enqueued key is expected to reconcile whatever is left.
func (c *Controller) updateService(old, cur interface{}) {
	oldService, ok := old.(*sednav1.JointInferenceService)
	if !ok {
		return
	}
	curService, ok := cur.(*sednav1.JointInferenceService)
	if !ok {
		return
	}

	// Nothing to do when the very same object is delivered twice or when the
	// spec did not change.
	if oldService == curService || reflect.DeepEqual(oldService.Spec, curService.Spec) {
		return
	}

	// Work on a private copy so that setting the GroupVersionKind does not
	// modify the object that was handed to this handler by the informer.
	service := curService.DeepCopy()
	service.SetGroupVersionKind(Kind)

	// if the service.Generation is changed, update deployment settings
	if oldService.Generation != service.Generation {
		klog.Infof("Service is updated, delete previous deployments")
		ctx := context.TODO()

		// Build the selector from this service instead of reusing c.selector:
		// that field is overwritten by sync for whichever service was
		// processed last, so it may be nil or select another service's
		// deployments.
		selector, err := runtime.GenerateSelector(service)
		if err != nil {
			klog.Errorf("Failed to generate selector for service %s/%s: %v", service.Namespace, service.Name, err)
			return
		}

		// delete previous deployments
		deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selector)
		if err != nil {
			klog.Errorf("Failed to list deployments: %v", err)
			return
		}
		for _, deployment := range deployments {
			err := c.kubeClient.AppsV1().Deployments(service.Namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
			// A deployment that is already gone is the desired end state.
			if err != nil && !errors.IsNotFound(err) {
				klog.Errorf("Failed to delete deployment %s/%s: %v", service.Namespace, deployment.Name, err)
			}
		}

		// NOTE(review): c.bigModelHost is only assigned in createWorkers, so
		// it may still be empty here if this handler runs first - confirm.
		if err := c.createEdgeWorker(service, c.bigModelHost, BigModelPort); err != nil {
			klog.Errorf("Failed to recreate edge worker for service %s/%s: %v", service.Namespace, service.Name, err)
		}
		if err := c.createCloudWorker(service, BigModelPort); err != nil {
			klog.Errorf("Failed to recreate cloud worker for service %s/%s: %v", service.Namespace, service.Name, err)
		}

		// update the service status
		if _, err := c.client.JointInferenceServices(service.Namespace).UpdateStatus(ctx, service, metav1.UpdateOptions{}); err != nil {
			klog.Errorf("Failed to update status of service %s/%s: %v", service.Namespace, service.Name, err)
		}
	}

	c.enqueueController(service, true)
	c.syncToEdge(watch.Added, service)
}
Loading

0 comments on commit 3c5644d

Please sign in to comment.