Implement resource pruning in k8s plugin #5455

Open · wants to merge 22 commits into base: master (changes from all commits)

Commits (22)
3f1509a
Add labels for application tracking in Kubernetes manifests
Warashi Dec 24, 2024
e6f5bc9
Add IsManagedByPiped method
Warashi Dec 25, 2024
b150959
Remove Namespace method from ResourceKey
Warashi Dec 25, 2024
9c5d959
Remove APIVersion method from ResourceKey
Warashi Dec 25, 2024
9aea549
Refactor manifest handling to use Name and Kind methods instead of Key
Warashi Dec 25, 2024
5134edb
Change apiVersion and kind to groupKind
Warashi Dec 25, 2024
f8f0c7e
Implement pruning resources
Warashi Dec 26, 2024
b9879ce
Implement resource pruning in k8s_sync
Warashi Dec 26, 2024
bb444a7
Fix the pruning in the case namespace changes
Warashi Dec 26, 2024
54309c9
Fix pruning the cluster-scoped resources
Warashi Dec 26, 2024
b849517
Enable parallel execution for deployment service tests
Warashi Dec 26, 2024
3ed4320
Add documentation for Kubernetes resource retrieval functions
Warashi Dec 26, 2024
d1e0236
Add license header to resource_test.go
Warashi Dec 26, 2024
85ba187
Fix the LoadManifests
Warashi Dec 26, 2024
c2a747f
Rename getAPIResources to getClusterScopedAPIResources for clarity
Warashi Dec 27, 2024
48c0fb3
Refactor manifest key normalization
Warashi Dec 27, 2024
6fb0aab
Add comments to ResourceKey
Warashi Dec 27, 2024
69ac113
Refactor to use normalized key only to compare
Warashi Dec 27, 2024
6392bca
Rename variable clusterLiveResources to clusterScopedLiveResources
Warashi Dec 27, 2024
e1dec97
Rename keys to normalizedKeys and add comments
Warashi Dec 27, 2024
bcfd121
Refactor to remove normalizeGroupKinds
Warashi Dec 27, 2024
2e7b2f5
Fix to pass tests
Warashi Dec 27, 2024
11 changes: 11 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes/config/application.go
@@ -36,6 +36,9 @@ type KubernetesApplicationSpec struct {
// Input for Kubernetes deployment such as kubectl version, helm version, manifests filter...
Input KubernetesDeploymentInput `json:"input"`

// Configuration for quick sync.
QuickSync K8sSyncStageOptions `json:"quickSync"`

// Which resources should be considered as the Workload of application.
// Empty means all Deployments.
// e.g.
@@ -100,6 +103,14 @@ type KubernetesDeployTargetConfig struct {
KubectlVersion string `json:"kubectlVersion"`
}

// K8sSyncStageOptions contains all configurable values for a K8S_SYNC stage.
type K8sSyncStageOptions struct {
// Whether the PRIMARY variant label should be added to manifests if they were missing.
AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
// Whether the resources that are no longer defined in Git should be removed or not.
Prune bool `json:"prune"`
}

// FindDeployTarget finds the deploy target configuration by the given name.
func FindDeployTarget(cfg *config.PipedPlugin, name string) (KubernetesDeployTargetConfig, error) {
if cfg == nil {
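The new K8sSyncStageOptions decodes straight from the application spec, so pruning is opt-in per application via quickSync.prune. A minimal standalone sketch of that decoding (the two types are copied from the hunk above; the real plugin loads the full spec through its config package):

package main

import (
	"encoding/json"
	"fmt"
)

// Shapes copied from the diff above, trimmed to the fields used here.
type K8sSyncStageOptions struct {
	AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
	Prune                     bool `json:"prune"`
}

type KubernetesApplicationSpec struct {
	QuickSync K8sSyncStageOptions `json:"quickSync"`
}

func main() {
	// An application spec fragment with pruning enabled.
	raw := []byte(`{"quickSync": {"prune": true}}`)

	var spec KubernetesApplicationSpec
	if err := json.Unmarshal(raw, &spec); err != nil {
		panic(err)
	}
	fmt.Println(spec.QuickSync.Prune) // true
}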
4 changes: 2 additions & 2 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/annotate.go
@@ -32,11 +32,11 @@ func annotateConfigHash(manifests []provider.Manifest) error {
secrets := make(map[string]provider.Manifest)
for _, m := range manifests {
if m.IsConfigMap() {
configMaps[m.Key().Name()] = m
configMaps[m.Name()] = m
continue
}
if m.IsSecret() {
secrets[m.Key().Name()] = m
secrets[m.Name()] = m
}
}

10 changes: 5 additions & 5 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/determine.go
@@ -73,10 +73,10 @@ func determineVersions(manifests []provider.Manifest) ([]*model.ArtifactVersion,
func findManifests(kind, name string, manifests []provider.Manifest) []provider.Manifest {
out := make([]provider.Manifest, 0, len(manifests))
for _, m := range manifests {
if m.Key().Kind() != kind {
if m.Kind() != kind {
continue
}
if name != "" && m.Key().Name() != name {
if name != "" && m.Name() != name {
continue
}
out = append(out, m)
@@ -186,7 +186,7 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s
if msg, changed := checkImageChange(templateDiffs); changed {
return model.SyncStrategy_PIPELINE, msg
}
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because pod template of workload %s was changed", w.New.Name())
}
}

@@ -203,14 +203,14 @@ func determineStrategy(olds, news []provider.Manifest, workloadRefs []config.K8s
for k, oc := range oldConfigs {
nc, ok := newConfigs[k]
if !ok {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Key().Kind(), oc.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was deleted", oc.Kind(), oc.Name())
}
result, err := provider.Diff(oc, nc, logger)
if err != nil {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively due to an error while calculating the diff (%v)", err)
}
if result.HasDiff() {
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Key().Kind(), oc.Key().Name())
return model.SyncStrategy_PIPELINE, fmt.Sprintf("Sync progressively because %s %s was updated", oc.Kind(), oc.Name())
}
}

77 changes: 75 additions & 2 deletions pkg/app/pipedv1/plugin/kubernetes/deployment/server.go
@@ -17,6 +17,8 @@
import (
"cmp"
"context"
"errors"
"fmt"
"time"

kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/config"
@@ -266,6 +268,9 @@

// Add variant labels and annotations to all manifests.
for i := range manifests {
manifests[i].AddLabels(map[string]string{
variantLabel: primaryVariant,
})
manifests[i].AddAnnotations(map[string]string{
variantLabel: primaryVariant,
})
@@ -290,17 +295,85 @@
return model.StageStatus_STAGE_FAILURE
}

// Create the kubectl wrapper for the target cluster.
kubectl := provider.NewKubectl(kubectlPath)

// Create the applier for the target cluster.
applier := provider.NewApplier(provider.NewKubectl(kubectlPath), cfg.Spec.Input, deployTargetConfig, a.logger)
applier := provider.NewApplier(kubectl, cfg.Spec.Input, deployTargetConfig, a.logger)

// Start applying all manifests to add or update running resources.
if err := applyManifests(ctx, applier, manifests, cfg.Spec.Input.Namespace, lp); err != nil {
lp.Errorf("Failed while applying manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

// TODO: treat the stage options specified under "with"
if !cfg.Spec.QuickSync.Prune {
lp.Info("Resource GC was skipped because sync.prune was not configured")
return model.StageStatus_STAGE_SUCCESS
}

// Wait for all applied manifests to be stable.
// In theory, we don't need to wait for them to be stable before going to the next step,
// but waiting for a while reduces the number of Kubernetes changes in a short time.
lp.Info("Waiting for the applied manifests to be stable")
select {
case <-time.After(15 * time.Second):
break
case <-ctx.Done():
break
}

lp.Info("Start finding all running resources but no longer defined in Git")

namespacedLiveResources, err := kubectl.GetAll(ctx, deployTargetConfig.KubeConfigPath,
"",
fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped),
fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()),
)
if err != nil {
lp.Errorf("Failed while listing all resources (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

clusterScopedLiveResources, err := kubectl.GetAllClusterScoped(ctx, deployTargetConfig.KubeConfigPath,
fmt.Sprintf("%s=%s", provider.LabelManagedBy, provider.ManagedByPiped),
fmt.Sprintf("%s=%s", provider.LabelApplication, input.GetDeployment().GetApplicationId()),
)
if err != nil {
lp.Errorf("Failed while listing all cluster-scoped resources (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 {
lp.Info("There is no data about live resource so no resource will be removed")
return model.StageStatus_STAGE_SUCCESS
}

lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources))

removeKeys := provider.FindRemoveResources(manifests, namespacedLiveResources, clusterScopedLiveResources)
if len(removeKeys) == 0 {
lp.Info("There are no live resources should be removed")
return model.StageStatus_STAGE_SUCCESS
}

lp.Infof("Start pruning %d resources", len(removeKeys))
var deletedCount int
for _, key := range removeKeys {
if err := kubectl.Delete(ctx, deployTargetConfig.KubeConfigPath, key.Namespace(), key); err != nil {
if errors.Is(err, provider.ErrNotFound) {
lp.Infof("Specified resource does not exist, so skip deleting the resource: %s (%v)", key.ReadableString(), err)
continue
}
lp.Errorf("Failed while deleting resource %s (%v)", key.ReadableString(), err)
continue // continue to delete other resources
}
deletedCount++
lp.Successf("- deleted resource: %s", key.ReadableString())
}

lp.Successf("Successfully deleted %d resources", deletedCount)
return model.StageStatus_STAGE_SUCCESS
}
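provider.FindRemoveResources itself is not part of this diff. A minimal sketch of the set difference it performs, assuming (per the commit messages) that resources are compared only by a normalized group/kind, namespace, and name key; the resourceKey type and findRemoveKeys below are hypothetical stand-ins, not the provider package's actual API:

package main

import "fmt"

// resourceKey is a hypothetical stand-in for provider.ResourceKey:
// a normalized (groupKind, namespace, name) triple used only for comparison.
type resourceKey struct {
	groupKind string
	namespace string // empty for cluster-scoped resources
	name      string
}

// findRemoveKeys returns keys of live resources that no longer appear
// in the desired manifests loaded from Git.
func findRemoveKeys(desired, live []resourceKey) []resourceKey {
	keep := make(map[resourceKey]struct{}, len(desired))
	for _, k := range desired {
		keep[k] = struct{}{}
	}
	removed := make([]resourceKey, 0)
	for _, k := range live {
		if _, ok := keep[k]; !ok {
			removed = append(removed, k)
		}
	}
	return removed
}

func main() {
	desired := []resourceKey{{"apps/Deployment", "default", "web"}}
	live := []resourceKey{
		{"apps/Deployment", "default", "web"},
		{"/ConfigMap", "default", "old-config"}, // deleted from Git, still live
	}
	for _, k := range findRemoveKeys(desired, live) {
		fmt.Printf("would prune: %+v\n", k)
	}
}

Because the live resources are listed with the managed-by and application label selectors (provider.LabelManagedBy, provider.LabelApplication) added earlier in this PR, only resources Piped itself applied for this application can ever become deletion candidates.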
