Skip to content

Commit

Permalink
feat: resolver tested
Browse files Browse the repository at this point in the history
  • Loading branch information
ramantehlan committed Jul 3, 2024
1 parent 23c8926 commit 5694c91
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 103 deletions.
1 change: 1 addition & 0 deletions install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ rules:
resources:
- services
- pods
- endpoints
verbs:
- get
- list
Expand Down
2 changes: 1 addition & 1 deletion operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ docker-push: ## Push docker image with the manager.
# - have enabled BuildKit. More info: https://docs.docker.com/develop/develop-images/build_enhancements/
# - be able to push the image to your registry (i.e. if you do not set a valid value via IMG=<myregistry/image:<tag>> then the export will fail)
# To adequately provide solutions that are compatible with multiple platforms, you should consider using this option.
PLATFORMS ?= linux/arm64,linux/amd64 #,linux/s390x,linux/ppc64le
PLATFORMS ?= linux/amd64 #,linux/s390x,linux/ppc64le,linux/arm64,
.PHONY: docker-buildx
docker-buildx: ## Build and push docker image for the manager for cross-platform support
# copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile
Expand Down
8 changes: 4 additions & 4 deletions operator/internal/elastiServer/elastiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package elastiServer
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"

"k8s.io/client-go/rest"

"github.com/go-errors/errors"
"github.com/truefoundry/elasti/pkg/k8sHelper"
"github.com/truefoundry/elasti/pkg/messages"
"go.uber.org/zap"
Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
s.logger.Error("Failed to close Body", zap.Error(err))
}
}(req.Body)
s.logger.Info("Received request from Resolver", zap.Any("body", body))
s.logger.Info("-- Received request from Resolver", zap.Any("body", body))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
response := Response{
Expand All @@ -96,14 +96,14 @@ func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
s.logger.Error("Failed to compare and scale deployment", zap.Error(err))
return
}
s.logger.Info("Received fulfilled from Resolver", zap.Any("body", body))
s.logger.Info("-- Received fulfilled from Resolver", zap.Any("body", body))
}

func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
crd, found := crdDirectory.CRDDirectory.GetCRD(serviceName)
if !found {
s.logger.Error("Failed to get CRD details from directory")
return errors.New("Failed to get CRD details from directory")
return errors.New("failed to get CRD details from directory")
}
if err := s.k8sHelper.ScaleTargetWhenAtZero(namespace, crd.Spec.ScaleTargetRef.Name, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
s.logger.Error("Failed to scale TargetRef", zap.Any("TargetRef", crd.Spec.ScaleTargetRef), zap.Error(err))
Expand Down
63 changes: 20 additions & 43 deletions pkg/k8sHelper/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package k8sHelper

import (
"context"
"fmt"
"strings"

"github.com/truefoundry/elasti/pkg/values"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -38,50 +36,22 @@ func NewOps(logger *zap.Logger, config *rest.Config) *Ops {
}
}

// CheckIfServicePodActive returns true if even a single pod for a service is active
func (k *Ops) CheckIfServicePodActive(ns, svc string) (bool, error) {
selectors, err := k.getServiceSelectorStr(ns, svc)
// CheckIfServiceEnpointActive returns true if endpoint for a service is active
func (k *Ops) CheckIfServiceEnpointActive(ns, svc string) (bool, error) {
endpoint, err := k.kClient.CoreV1().Endpoints(ns).Get(context.TODO(), svc, metav1.GetOptions{})
if err != nil {
return false, err
}
pods, err := k.kClient.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: selectors,
})
if err != nil {
return false, err
}
if len(pods.Items) == 0 {
return false, ErrNoPodFound
}
podActive := false
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning {
podActive = true
break
}
}
if !podActive {
return podActive, ErrNoActivePodFound
}

return podActive, nil
}

// GetServiceSelectorStr is to generate a k8s acceptable selector string for the provided service
func (k *Ops) getServiceSelectorStr(ns, svc string) (string, error) {
service, err := k.kClient.CoreV1().Services(ns).Get(context.TODO(), svc, metav1.GetOptions{})
if err != nil {
return "", err
}
selectorString := ""
for key, value := range service.Spec.Selector {
if selectorString != "" {
selectorString += ","
if endpoint.Subsets != nil && len(endpoint.Subsets) > 0 {
if endpoint.Subsets[0].Addresses != nil && len(endpoint.Subsets[0].Addresses) != 0 {
k.logger.Debug("Checking if service endpoint is active", zap.String("service", svc), zap.String("namespace", ns), zap.Any("endpoint", endpoint.Subsets[0].Addresses))
k.logger.Debug("Service endpoint is active", zap.String("service", svc), zap.String("namespace", ns))
return true, nil
}
selectorString += fmt.Sprintf("%s=%s", key, value)
}

return selectorString, nil
k.logger.Debug("Service endpoint is not active", zap.String("service", svc), zap.String("namespace", ns), zap.Any("endpoint", endpoint.Subsets))
return false, nil
}

// ScaleTargetWhenAtZero scales the TargetRef to the provided replicas when it's at 0
Expand Down Expand Up @@ -123,18 +93,25 @@ func (k *Ops) ScaleDeployment(ns, targetName string, replicas int32) error {
}

func (k *Ops) ScaleArgoRollout(ns, targetName string, replicas int32) error {
k.logger.Debug("Scaling Rollout yet to be implimented", zap.String("rollout", targetName), zap.Int32("replicas", replicas))
k.logger.Debug("Scaling Rollout", zap.String("rollout", targetName), zap.Int32("replicas", replicas))

rollout, err := k.kDynamicClient.Resource(values.RolloutGVR).Namespace(ns).Get(context.TODO(), targetName, metav1.GetOptions{})
if err != nil {
k.logger.Error("Error getting rollout", zap.Error(err), zap.String("rollout", targetName))
return err
}

currentReplicas := rollout.Object["spec"].(map[string]interface{})["replicas"]
currentReplicas := rollout.Object["spec"].(map[string]interface{})["replicas"].(int64)
k.logger.Debug("Rollout found", zap.String("rollout", targetName), zap.Int64("current replicas", currentReplicas), zap.Int32("desired replicas", replicas))
if currentReplicas == 0 {
rollout.Object["spec"].(map[string]interface{})["replicas"] = replicas
_, err = k.kDynamicClient.Resource(values.RolloutGVR).Namespace(ns).Update(context.TODO(), rollout, metav1.UpdateOptions{})
return err
if err != nil {
k.logger.Error("Error updating rollout", zap.Error(err), zap.String("rollout", targetName))
return err
}
k.logger.Info("Rollout scaled", zap.String("rollout", targetName), zap.Int32("replicas", replicas))
}

return nil
}
35 changes: 0 additions & 35 deletions resolver/Dockerfile.cross

This file was deleted.

8 changes: 4 additions & 4 deletions resolver/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ docker-build: ## Build docker image for the resolver
docker-push: ## Publish docker image for the resolver
docker push ${IMG}

PLATFORMS ?= linux/arm64,linux/amd64 #,linux/s390x,linux/ppc64le
PLATFORMS ?= linux/amd64 #,linux/s390x,linux/ppc64le,linux/arm64,
.PHONY: docker-buildx
docker-buildx: ## Build and push docker image for cross-platform support
# copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile
sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
# sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
- $(CONTAINER_TOOL) buildx create --name project-resolver-v3-builder
$(CONTAINER_TOOL) buildx use project-resolver-v3-builder
- $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross ../
- $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile ../
- $(CONTAINER_TOOL) buildx rm project-resolver-v3-builder
rm Dockerfile.cross
# rm Dockerfile.cross


.PHONY: deploy
Expand Down
2 changes: 1 addition & 1 deletion resolver/config/clusterRole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ rules:
resources: ["deployments"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["services", "pods"]
resources: ["services", "pods", "endpoints"]
verbs: ["get", "list", "watch", "update", "patch", "delete", "create"]
23 changes: 12 additions & 11 deletions resolver/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package handler
import (
"context"
"errors"
"github.com/truefoundry/elasti/pkg/k8sHelper"
"github.com/truefoundry/elasti/pkg/messages"
"go.uber.org/zap"
"net/http"
"net/http/httputil"
"net/url"
Expand All @@ -14,6 +11,10 @@ import (
"sync"
"time"
"truefoundry/resolver/internal/throttler"

"github.com/truefoundry/elasti/pkg/k8sHelper"
"github.com/truefoundry/elasti/pkg/messages"
"go.uber.org/zap"
)

type (
Expand Down Expand Up @@ -92,19 +93,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
go h.operatorRPC.SendIncomingRequestInfo(host.Namespace, host.SourceService)
targetURL, err := url.Parse(host.TargetHost + req.RequestURI)
if err != nil {
h.logger.Error("Error parsing target URL", zap.Error(err))
http.Error(w, "Error parsing target URL", http.StatusInternalServerError)
return
}
select {
case <-ctx.Done():
h.logger.Error("Request timeout", zap.Error(ctx.Err()))
w.WriteHeader(http.StatusInternalServerError)
default:
if tryErr := h.throttler.Try(ctx, host, func(count int) error {
err := h.ProxyRequest(w, req, targetURL, count)
err := h.ProxyRequest(w, req, host.TargetHost, count)
if err != nil {
return err
}
Expand All @@ -121,7 +116,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

func (h *Handler) ProxyRequest(w http.ResponseWriter, req *http.Request, targetURL *url.URL, count int) (rErr error) {
func (h *Handler) ProxyRequest(w http.ResponseWriter, req *http.Request, host string, count int) (rErr error) {
defer func() {
if r := recover(); r != nil {
h.logger.Error("Recovered from panic", zap.Any("panic", r))
Expand All @@ -131,6 +126,12 @@ func (h *Handler) ProxyRequest(w http.ResponseWriter, req *http.Request, targetU
rErr = r.(error)
}
}()
targetURL, err := url.Parse(host)
if err != nil {
h.logger.Error("Error parsing target URL", zap.Error(err))
http.Error(w, "Error parsing target URL", http.StatusInternalServerError)
return
}
proxy := httputil.NewSingleHostReverseProxy(targetURL)
proxy.BufferPool = h.bufferPool
proxy.Transport = h.transport
Expand Down
9 changes: 5 additions & 4 deletions resolver/internal/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package throttler

import (
"context"
"time"

"github.com/truefoundry/elasti/pkg/k8sHelper"
"github.com/truefoundry/elasti/pkg/messages"
"go.uber.org/zap"
"time"
)

type Throttler struct {
Expand Down Expand Up @@ -37,11 +38,11 @@ func (t *Throttler) Try(ctx context.Context, host *messages.Host, resolve func(i
reenqueue = false
var tryErr error
if tryErr = t.breaker.Maybe(ctx, func() {
if isPodActive, err := t.k8sUtil.CheckIfServicePodActive(host.Namespace, host.TargetService); err != nil {
t.logger.Info("Unable to get target active pod", zap.Error(err), zap.Int("retryCount", retryCount))
if isPodActive, err := t.k8sUtil.CheckIfServiceEnpointActive(host.Namespace, host.TargetService); err != nil {
t.logger.Info("Unable to get target active endpoints", zap.Error(err), zap.Int("retryCount", retryCount))
reenqueue = true
} else if !isPodActive {
t.logger.Info("No active pods", zap.Any("host", host), zap.Int("retryCount", retryCount))
t.logger.Info("No active endpoints", zap.Any("host", host), zap.Int("retryCount", retryCount))
reenqueue = true
} else {
if res := resolve(retryCount); res != nil {
Expand Down

0 comments on commit 5694c91

Please sign in to comment.