Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement resource pruning in k8s plugin #5455

Merged
merged 27 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3f1509a
Add labels for application tracking in Kubernetes manifests
Warashi Dec 24, 2024
e6f5bc9
Add IsManagedByPiped method
Warashi Dec 25, 2024
b150959
Remove Namespace method from ResourceKey
Warashi Dec 25, 2024
9c5d959
Remove APIVersion method from ResourceKey
Warashi Dec 25, 2024
9aea549
Refactor manifest handling to use Name and Kind methods instead of Key
Warashi Dec 25, 2024
5134edb
Change apiVersion and kind to groupKind
Warashi Dec 25, 2024
f8f0c7e
Implement pruning resources
Warashi Dec 26, 2024
b9879ce
Implement resource pruning in k8s_sync
Warashi Dec 26, 2024
bb444a7
Fix the pruning in the case namespace changes
Warashi Dec 26, 2024
54309c9
Fix pruning the cluster-scoped resources
Warashi Dec 26, 2024
b849517
Enable parallel execution for deployment service tests
Warashi Dec 26, 2024
3ed4320
Add documentation for Kubernetes resource retrieval functions
Warashi Dec 26, 2024
d1e0236
Add license header to resource_test.go
Warashi Dec 26, 2024
85ba187
Fix the LoadManifests
Warashi Dec 26, 2024
c2a747f
Rename getAPIResources to getClusterScopedAPIResources for clarity
Warashi Dec 27, 2024
48c0fb3
Refactor manifest key normalization
Warashi Dec 27, 2024
6fb0aab
Add comments to ResourceKey
Warashi Dec 27, 2024
69ac113
Refactor to use normalized key only to compare
Warashi Dec 27, 2024
6392bca
Rename variable clusterLiveResources to clusterScopedLiveResources
Warashi Dec 27, 2024
e1dec97
Rename keys to normalizedKeys and add comments
Warashi Dec 27, 2024
bcfd121
Refactor to remove normalizeGroupKinds
Warashi Dec 27, 2024
2e7b2f5
Fix to pass tests
Warashi Dec 27, 2024
f47da4b
Update comments to clarify source of application config in tests
Warashi Jan 6, 2025
b12e317
Add assertions to check for NotFound errors in deployment service tests
Warashi Jan 6, 2025
d75fd94
Add resource-key assertions to deployment service tests
Warashi Jan 6, 2025
34c7807
Use subtest to prepare e2e tests
Warashi Jan 6, 2025
40af37c
Add v1beta1 version for CronTab and update tests for pruning behavior
Warashi Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes/config/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type KubernetesApplicationSpec struct {
// Input for Kubernetes deployment such as kubectl version, helm version, manifests filter...
Input KubernetesDeploymentInput `json:"input"`

// Configuration for quick sync.
QuickSync K8sSyncStageOptions `json:"quickSync"`

// Which resources should be considered as the Workload of application.
// Empty means all Deployments.
// e.g.
Expand Down Expand Up @@ -100,6 +103,14 @@ type KubernetesDeployTargetConfig struct {
KubectlVersion string `json:"kubectlVersion"`
}

// K8sSyncStageOptions contains all configurable values for a K8S_SYNC stage.
type K8sSyncStageOptions struct {
// Whether the PRIMARY variant label should be added to manifests if they were missing.
AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
// Whether the resources that are no longer defined in Git should be removed or not.
Prune bool `json:"prune"`
}

// FindDeployTarget finds the deploy target configuration by the given name.
func FindDeployTarget(cfg *config.PipedPlugin, name string) (KubernetesDeployTargetConfig, error) {
if cfg == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func annotateConfigHash(manifests []provider.Manifest) error {
secrets := make(map[string]provider.Manifest)
for _, m := range manifests {
if m.IsConfigMap() {
configMaps[m.Key().Name()] = m
configMaps[m.Name()] = m
continue
}
if m.IsSecret() {
secrets[m.Key().Name()] = m
secrets[m.Name()] = m
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func determineVersions(manifests []provider.Manifest) ([]*model.ArtifactVersion,
func findManifests(kind, name string, manifests []provider.Manifest) []provider.Manifest {
out := make([]provider.Manifest, 0, len(manifests))
for _, m := range manifests {
if m.Key().Kind() != kind {
if m.Kind() != kind {
continue
}
if name != "" && m.Key().Name() != name {
if name != "" && m.Name() != name {
continue
}
out = append(out, m)
Expand Down Expand Up @@ -186,7 +186,7 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s
if msg, changed := checkImageChange(templateDiffs); changed {
return model.SyncStrategy_PIPELINE, msg
}
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Name())
}
}

Expand All @@ -203,14 +203,14 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s
for k, oc := range oldConfigs {
nc, ok := newConfigs[k]
if !ok {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Key().Kind(), oc.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Kind(), oc.Name())
}
result, err := provider.Diff(oc, nc, logger)
if err != nil {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err)
}
if result.HasDiff() {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Key().Kind(), oc.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Kind(), oc.Name())
}
}

Expand Down
77 changes: 75 additions & 2 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package deployment
import (
"cmp"
"context"
"errors"
"fmt"
"time"

kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config"
Expand Down Expand Up @@ -266,6 +268,9 @@ func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersi

// Add variant annotations to all manifests.
for i := range manifests {
manifests[i].AddLabels(map[string]string{
variantLabel: primaryVariant,
})
manifests[i].AddAnnotations(map[string]string{
variantLabel: primaryVariant,
})
Expand All @@ -290,17 +295,85 @@ func (a *DeploymentService) executeK8sSyncStage(ctx context.Context, lp logpersi
return model.StageStatus_STAGE_FAILURE
}

// Create the kubectl wrapper for the target cluster.
kubectl := provider.NewKubectl(kubectlPath)

// Create the applier for the target cluster.
applier := provider.NewApplier(provider.NewKubectl(kubectlPath), cfg.Spec.Input, deployTargetConfig, a.logger)
applier := provider.NewApplier(kubectl, cfg.Spec.Input, deployTargetConfig, a.logger)

// Start applying all manifests to add or update running resources.
if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil {
lp.Errorf("Failed while applying manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

// TODO: implement prune resources
// TODO: treat the stage options specified under "with"
if !cfg.Spec.QuickSync.Prune {
lp.Info("Resource GC was skipped because sync.prune was not configured")
return model.StageStatus_STAGE_SUCCESS
}

// Wait for all applied manifests to be stable.
// In theory, we don't need to wait for them to be stable before going to the next step
// but waiting for a while reduces the number of Kubernetes changes in a short time.
lp.Info("Waiting for the applied manifests to be stable")
select {
case <-time.After(15 * time.Second):
break
case <-ctx.Done():
break
}

lp.Info("Start finding all running resources but no longer defined in Git")

namespacedLiveResources, err := kubectl.GetAll(ctx, deployTargetConfig.KubeConfigPath,
"",
fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped),
fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()),
)
if err != nil {
lp.Errorf("Failed while listing all resources (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

clusterLiveResources, err := kubectl.GetAllClusterScoped(ctx, deployTargetConfig.KubeConfigPath,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] clusterLiveResources -> clusterScopedLiveResources

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied on this commit
6392bca

fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped),
fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()),
)
if err != nil {
lp.Errorf("Failed while listing all cluster-scoped resources (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

if len(namespacedLiveResources)+len(clusterLiveResources) == 0 {
lp.Info("There is no data about live resource so no resource will be removed")
return model.StageStatus_STAGE_SUCCESS
}

lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterLiveResources))

removeKeys := provider.FindRemoveResources(manifests, namespacedLiveResources, clusterLiveResources)
if len(removeKeys) == 0 {
lp.Info("There are no live resources should be removed")
return model.StageStatus_STAGE_SUCCESS
}

lp.Infof("Start pruning %d resources", len(removeKeys))
var deletedCount int
for _, key := range removeKeys {
if err := kubectl.Delete(ctx, deployTargetConfig.KubeConfigPath, key.Namespace(), key); err != nil {
if errors.Is(err, provider.ErrNotFound) {
lp.Infof("Specified resource does not exist, so skip deleting the resource: %s (%v)", key.ReadableString(), err)
continue
}
lp.Errorf("Failed while deleting resource %s (%v)", key.ReadableString(), err)
continue // continue to delete other resources
}
deletedCount++
lp.Successf("- deleted resource: %s", key.ReadableString())
}

lp.Successf("Successfully deleted %d resources", deletedCount)
return model.StageStatus_STAGE_SUCCESS
}

Expand Down
Loading
Loading