Skip to content

Commit

Permalink
Switch jinja2 to grpc based expansion.
Browse files Browse the repository at this point in the history
- rename gjinja2 -> jinja2 (making jinja2 grpc based)
- rename existing pod based jinja2 -> pjinja2
- Add `image` field to ExpanderVersion
  - allow specifying an image that doesn't follow the
    removePrefix(expanderversion.name , "composition-") pattern.
  - provides more flexibility
  - In our case the type is pjinja2 and the image is expander-jinja2
- Add a custom ratelimiter for the facade reconciler
  - Due to a bug, the reconciler is not restarted across tests.
  - Old failures add to rate limiting causing random failures
  - make ratelimiter max backoff 120s
- Add a small sleep between tests
- Move all ExistTimeout to CompositionReconcileTimeout
- Check if a plan was updated and if so check if the applier sees the
  latest plan object. If not retry again.
- Add InputGeneration and Generation to plan status
  • Loading branch information
barney-s committed May 25, 2024
1 parent eb1cc36 commit 936ea11
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ const (

// ExpanderVersionSpec defines the desired state of ExpanderVersion
type ExpanderVersionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// ImageRegistry is the designated registry for where to pull the named expander image
ImageRegistry string `json:"imageRegistry,omitempty"`

// Image if different from removePrefix(expanderversion.name , "composition-")
Image string `json:"image,omitempty"`

// ValidVersions is a list of valid versions of the named expander
ValidVersions []string `json:"validVersions"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type StageStatus struct {

// PlanStatus defines the observed state of Plan.
type PlanStatus struct {
// InputGeneration is the Facade's generation that was last successfully reconciled.
InputGeneration int64 `json:"inputGeneration,omitempty"`
// Generation is the Plan generation that was last successfully reconciled.
Generation int64 `json:"generation,omitempty"`
// Conditions holds the status conditions recorded for the Plan.
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Stages holds one StageStatus per string key
// (presumably the stage/expander name — TODO confirm against the writer).
Stages map[string]StageStatus `json:"stages,omitempty"`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ spec:
spec:
description: ExpanderVersionSpec defines the desired state of ExpanderVersion
properties:
image:
description: Image if different from removePrefix(expanderversion.name
, "composition-")
type: string
imageRegistry:
description: ImageRegistry is the designated registry for where to
pull the named expander image
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ spec:
- type
type: object
type: array
generation:
description: Plan generation we last successfully reconciled
format: int64
type: integer
inputGeneration:
description: Facade's generation last we successfully reconciled
format: int64
type: integer
stages:
additionalProperties:
description: StageStatus captures the status of a stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
apiVersion: composition.google.com/v1alpha1
kind: ExpanderVersion
metadata:
name: jinja2
name: pjinja2
namespace: system
spec:
type: job
imageRegistry: gcr.io/krmapihosting-release
image: expander-jinja2
validVersions:
# current golang semver package only support version comparison
# in the format of `v{num}.{num}.{num}-{alphabet}`, `v{num}.{num}.{num}` and `{num}.{num}.{num}`.
Expand All @@ -30,7 +31,7 @@ spec:
apiVersion: composition.google.com/v1alpha1
kind: ExpanderVersion
metadata:
name: gjinja2
name: jinja2
namespace: system
spec:
type: grpc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ metadata:
app.kubernetes.io/part-of: composition
app.kubernetes.io/instance: jinja2-v0.0.1
app.kubernetes.io/component: expanders
name: gjinja2-v0-0-1
name: jinja2-v0-0-1
namespace: system
spec:
# type: LoadBalancer
Expand Down
4 changes: 2 additions & 2 deletions experiments/compositions/composition/expanders.mk
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ docker-run-expander-jinja2: docker-build-expander-jinja2

.PHONY: unit-test-expander-jinja2
unit-test-expander-jinja2: deploy-kind
kubectl patch service -n composition-system composition-gjinja2-v0-0-1 -p '{"spec":{"type":"LoadBalancer"}}'
kubectl patch service -n composition-system composition-jinja2-v0-0-1 -p '{"spec":{"type":"LoadBalancer"}}'
nodeip=$$(kubectl get nodes -o json | jq '.items[0].status.addresses[0].address' | xargs echo );\
nodeport=$$(kubectl get service -n composition-system composition-gjinja2-v0-0-1 -o json | jq ".spec.ports[0].nodePort");\
nodeport=$$(kubectl get service -n composition-system composition-jinja2-v0-0-1 -o json | jq ".spec.ports[0].nodePort");\
echo $$nodeip:$$nodeport; \
cd expanders/jinja2 && go test -v --addr=$$nodeip:$$nodeport
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"
compositionv1alpha1 "google.com/composition/api/v1alpha1"
"google.com/composition/pkg/containerexecutor/jobcontainerexecutor"
pb "google.com/composition/proto"
Expand All @@ -38,8 +40,10 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -201,30 +205,37 @@ func (r *ExpanderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
logger.Info("Got valid expander value", "value", value)

success := false
planUpdated := false
if ev.Spec.Type == compositionv1alpha1.ExpanderTypeJob {
reason, err := r.runJob(ctx, logger, &inputcr, expander.Name, planNN.Name, value, ev.Spec.ImageRegistry)
if err != nil {
newStatus.AppendErrorCondition(expander.Name, err.Error(), reason)
return ctrl.Result{}, err
}
} else {
reason, err := r.evaluateAndSavePlan(ctx, logger, &inputcr, expander, planNN, value)
updated, reason, err := r.evaluateAndSavePlan(ctx, logger, &inputcr, expander, planNN, value)
if err != nil {
newStatus.AppendErrorCondition(expander.Name, err.Error(), reason)
return ctrl.Result{}, err
}
planUpdated = updated
}

// ------------------- APPLIER SECTION -----------------------

// Create Applier and wait for the Applier to complete
logger = loggerCR.WithName(expander.Name).WithName("Apply")

oldGeneration := plancr.GetGeneration()
// Re-read the Plan CR to load the expanded manifests
if err := r.Client.Get(ctx, planNN, &plancr); err != nil {
logger.Error(err, "unable to read Plan CR")
return ctrl.Result{}, err
}
if planUpdated && oldGeneration == plancr.GetGeneration() {
logger.Error(err, "Did not get the latest planCR. Will retry.")
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

applier := NewApplier(ctx, logger, r, &plancr, &inputcr, &compositionCR, expander.Name)

Expand Down Expand Up @@ -291,23 +302,11 @@ func (r *ExpanderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
newStatus.ClearCondition(compositionv1alpha1.Ready)
message := fmt.Sprintf("Processed stages: %s", strings.Join(expandersProcessed, ", "))
newStatus.AppendCondition(compositionv1alpha1.Ready, metav1.ConditionTrue, message, "ProcessedAllStages")
newStatus.InputGeneration = inputcr.GetGeneration()
newStatus.Generation = plancr.GetGeneration()
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
//
// It builds a dynamic client from r.Config and stores it in r.Dynamic, then
// registers r as the reconciler for the object type given by cr. An error is
// returned if the dynamic client cannot be built or if completing the
// controller registration fails.
func (r *ExpanderReconciler) SetupWithManager(mgr ctrl.Manager, cr *unstructured.Unstructured) error {
var err error
// TODO(barni@): Can we setup dynamic controller at main.go for CompositionReconciler instead of 1 per ExpanderReconciler
r.Dynamic, err = dynamic.NewForConfig(r.Config)
if err != nil {
return fmt.Errorf("error building dynamic client: %w", err)
}

// Register r with the manager, watching objects of cr's type.
return ctrl.NewControllerManagedBy(mgr).
For(cr).
Complete(r)
}

func (r *ExpanderReconciler) getExpanderValue(
ctx context.Context, inputExpanderVersion string, expanderType string,
) (string, *compositionv1alpha1.ExpanderVersion, string, error) {
Expand Down Expand Up @@ -381,12 +380,14 @@ func (r *ExpanderReconciler) runJob(ctx context.Context, logger logr.Logger,

func (r *ExpanderReconciler) evaluateAndSavePlan(ctx context.Context, logger logr.Logger,
cr *unstructured.Unstructured, expander compositionv1alpha1.Expander,
planNN types.NamespacedName, grpcService string) (string, error) {
planNN types.NamespacedName, grpcService string) (bool, string, error) {
// Set up a connection to the server.
updated := false

conn, err := grpc.Dial(grpcService, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error(err, "grpc dial failed: "+grpcService)
return "GRPCConnError", err
return updated, "GRPCConnError", err
}

// read context in cr.namespace
Expand All @@ -395,26 +396,26 @@ func (r *ExpanderReconciler) evaluateAndSavePlan(ctx context.Context, logger log
contextNN := types.NamespacedName{Namespace: cr.GetNamespace(), Name: "context"}
if err := r.Get(ctx, contextNN, &contextcr); err != nil {
logger.Error(err, "unable to fetch Context CR", "context", contextNN)
return "GetContextFailed", err
return updated, "GetContextFailed", err
}
contextBytes, err := json.Marshal(contextcr.Object)
if err != nil {
logger.Error(err, "failed to marshal Context Object")
return "MarshallContextFailed", err
return updated, "MarshallContextFailed", err
}

// marshall facade cr
facadeBytes, err := json.Marshal(cr.Object)
if err != nil {
logger.Error(err, "failed to marshall Facade Object")
return "MarshallFacadeFailed", err
return updated, "MarshallFacadeFailed", err
}

// marshall expande config
configBytes, err := json.Marshal(expander.Template)
if err != nil {
logger.Error(err, "failed to marshall Expander Config")
return "MarshallExpanderConfigFailed", err
return updated, "MarshallExpanderConfigFailed", err
}

expanderClient := pb.NewExpanderClient(conn)
Expand All @@ -429,30 +430,38 @@ func (r *ExpanderReconciler) evaluateAndSavePlan(ctx context.Context, logger log
})
if err != nil {
logger.Error(err, "expander.Evaluate() Failed", "expander", expander.Name)
return "EvaluateError", err
return updated, "EvaluateError", err
}
if result.Status != pb.Status_SUCCESS {
logger.Error(nil, "expander.Evaluate() Status is not Success", "expander", expander.Name, "status", result.Status)
err = fmt.Errorf("Evaluate Failed: %s", result.Error.Message)
return "EvaluateStatusFailed", err
return updated, "EvaluateStatusFailed", err
}

// Write to Plan object
// Re-read the Plan CR to load the expanded manifests
plancr := compositionv1alpha1.Plan{}
if err := r.Client.Get(ctx, planNN, &plancr); err != nil {
logger.Error(err, "unable to read Plan CR", "plan", planNN)
return "GetPlanFailed", err
return updated, "GetPlanFailed", err
}

if plancr.Spec.Stages == nil {
plancr.Spec.Stages = map[string]compositionv1alpha1.Stage{}
updated = true
}
oldValue := ""
if _, ok := plancr.Spec.Stages[expander.Name]; !ok {
updated = true
} else {
oldValue = plancr.Spec.Stages[expander.Name].Manifest
}

if result.Type == pb.ResultType_MANIFESTS {
s, err := strconv.Unquote(string(result.Manifests))
if err != nil {
logger.Error(err, "unable to unquote grpc response")
return "UnquoteResponseFailed", err
return updated, "UnquoteResponseFailed", err
}
plancr.Spec.Stages[expander.Name] = compositionv1alpha1.Stage{
Manifest: s,
Expand All @@ -461,18 +470,43 @@ func (r *ExpanderReconciler) evaluateAndSavePlan(ctx context.Context, logger log
s, err := strconv.Unquote(string(result.Values))
if err != nil {
logger.Error(err, "unable to unquote grpc response")
return "UnquoteResponseFailed", err
return updated, "UnquoteResponseFailed", err
}
plancr.Spec.Stages[expander.Name] = compositionv1alpha1.Stage{
Manifest: s,
}
}

if plancr.Spec.Stages[expander.Name].Manifest != oldValue {
updated = true
}

err = r.Client.Update(ctx, &plancr)
if err != nil {
logger.Error(err, "error updating plan", "plan", planNN)
return "UpdatePlanFailed", err
return updated, "UpdatePlanFailed", err
}

return "", nil
return updated, "", nil
}

// SetupWithManager registers r with mgr as the reconciler for the object type
// given by cr.
//
// A dynamic client is built from r.Config and stored in r.Dynamic first; a
// wrapped error is returned if that fails. The controller is then created with
// a custom rate limiter that takes the maximum of a per-item exponential
// backoff (5ms base, capped at 120s) and an overall token bucket.
func (r *ExpanderReconciler) SetupWithManager(mgr ctrl.Manager, cr *unstructured.Unstructured) error {
	var err error
	// TODO(barni@): Can we setup dynamic controller at main.go for CompositionReconciler instead of 1 per ExpanderReconciler
	if r.Dynamic, err = dynamic.NewForConfig(r.Config); err != nil {
		return fmt.Errorf("error building dynamic client: %w", err)
	}

	// Per-item exponential failure backoff: starts at 5ms and never exceeds 120s.
	perItemBackoff := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 120*time.Second)
	// 40 qps, 400 bucket size. This is only for retry speed and its only the overall factor (not per item)
	overallBucket := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(40), 400)}

	opts := controller.Options{
		RateLimiter: workqueue.NewMaxOfRateLimiter(perItemBackoff, overallBucket),
	}

	builder := ctrl.NewControllerManagedBy(mgr).For(cr).WithOptions(opts)
	return builder.Complete(r)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (r *ExpanderVersionReconciler) processExpanderVersion(
}

expander := strings.TrimPrefix(ev.Name, "composition-")
image := ev.Spec.Image
if image == "" {
image = fmt.Sprintf("expander-%s", expander)
}
semVerVersions := []*semver.Version{}
for _, r := range ev.Spec.ValidVersions {
v, err := semver.NewVersion(r)
Expand All @@ -129,7 +133,7 @@ func (r *ExpanderVersionReconciler) processExpanderVersion(
value := ""

if ev.Spec.Type == compositionv1alpha1.ExpanderTypeJob {
value = fmt.Sprintf("%s/expander-%s:%s", ev.Spec.ImageRegistry, expander, key)
value = fmt.Sprintf("%s/%s:%s", ev.Spec.ImageRegistry, image, key)
} else {
svcVersion := strings.Replace(key, ".", "-", -1)
value = fmt.Sprintf("composition-%s-%s:8443", expander, svcVersion)
Expand Down
2 changes: 1 addition & 1 deletion experiments/compositions/composition/release/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ spec:
apiVersion: composition.google.com/v1alpha1
kind: ExpanderVersion
metadata:
name: composition-gjinja2
name: composition-jinja2
namespace: composition-system
spec:
type: grpc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ metadata:
spec:
inputAPIGroup: pconfigs.facade.foocorp.com
expanders:
- type: gjinja2
- type: jinja2
name: project
template: |
{% set hostProject = 'compositions-foobar' %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ metadata:
spec:
inputAPIGroup: pconfigs.facade.foocorp.com
expanders:
- type: jinja2
- type: pjinja2
name: project
template: |
{% set hostProject = 'compositions-foobar' %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func NewBasic(t *testing.T) *Scenario {
// TODO (barney-s) parameterize or make it global
logRoot := "../../"

t.Logf("------------------------------------------------------------------")
t.Logf("%s", name)
t.Logf("------------------------------------------------------------------")
time.Sleep(2 * time.Second)

ctx := context.Background()
clusterUser := cluster.ReserveCluster(t)
config := clusterUser.Config()
Expand Down
Loading

0 comments on commit 936ea11

Please sign in to comment.