101 changes: 101 additions & 0 deletions dagger/e2e.go
@@ -702,6 +702,107 @@ spec:

fmt.Printf("reportAllImages test passed - found %d total images (vs %d with filtering)\n", len(allImagesSet), len(imagesSet))

// Create a new namespace and a pod in that namespace, then restart the app and ensure the image is reported
newNs := "dynamic-image-ns"
newPodName := "dynamic-image-pod"
newPodImage := "docker.io/library/busybox:1.35"
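// busybox is not part of the app's own manifests, so it should only show up in the report once the SDK starts watching the new namespace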

// Create namespace
ctr = dag.Container().From("bitnami/kubectl:latest").
WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
WithEnvVariable("KUBECONFIG", kubeconfigPath).
With(CacheBustingExec(
[]string{
"kubectl", "create", "namespace", newNs,
}))
out, err = ctr.Stdout(ctx)
if err != nil {
return fmt.Errorf("failed to create namespace %s: %w", newNs, err)
}
fmt.Println(out)

// Create a pod in the new namespace
newNsPodYaml := "apiVersion: v1\n" +
"kind: Pod\n" +
"metadata:\n" +
" name: " + newPodName + "\n" +
" namespace: " + newNs + "\n" +
"spec:\n" +
" containers:\n" +
" - name: busybox\n" +
" image: " + newPodImage + "\n" +
" command: [\"sleep\", \"500d\"]\n"
newNsPodSource := source.WithNewFile("/dynamic-ns-pod.yaml", newNsPodYaml)

ctr = dag.Container().From("bitnami/kubectl:latest").
WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
WithEnvVariable("KUBECONFIG", kubeconfigPath).
WithFile("/tmp/dynamic-ns-pod.yaml", newNsPodSource.File("/dynamic-ns-pod.yaml")).
WithExec([]string{"kubectl", "apply", "-f", "/tmp/dynamic-ns-pod.yaml"})
out, err = ctr.Stdout(ctx)
if err != nil {
stderr, _ := ctr.Stderr(ctx)
return fmt.Errorf("failed to apply pod in new namespace: %w\n\nStderr: %s\n\nStdout: %s", err, stderr, out)
}
fmt.Println(out)

// Wait for the pod to be ready
ctr = dag.Container().From("bitnami/kubectl:latest").
WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
WithEnvVariable("KUBECONFIG", kubeconfigPath).
WithExec([]string{"kubectl", "wait", "--for=condition=ready", "pod/" + newPodName, "-n", newNs, "--timeout=1m"})
out, err = ctr.Stdout(ctx)
if err != nil {
return fmt.Errorf("failed to wait for pod %s in namespace %s to be ready: %w", newPodName, newNs, err)
}
fmt.Println(out)

// Restart test-chart deployment to force a reporting message
ctr = dag.Container().From("bitnami/kubectl:latest").
WithFile(kubeconfigPath, kubeconfigSource.File("/kubeconfig")).
WithEnvVariable("KUBECONFIG", kubeconfigPath).
With(CacheBustingExec(
[]string{
"kubectl", "rollout", "restart", "deploy/test-chart",
}))
out, err = ctr.Stdout(ctx)
if err != nil {
return fmt.Errorf("failed to restart replicated deployment: %w", err)
}
fmt.Println(out)

// Poll for the new image to appear in the running images list
maxAttempts = 6
retryDelay = 5 * time.Second
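// six attempts, 5s apart, gives roughly 30s for the restarted deployment to come back up and report the new image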
for attempt := 1; attempt <= maxAttempts; attempt++ {
var getErr error
allImagesSet, getErr = getRunningImages(ctx, appID, customerID, instanceAppID, tokenPlaintext)
if getErr != nil {
if attempt == maxAttempts {
return fmt.Errorf("failed to get running images after new namespace creation (after %d attempts): %w", maxAttempts, getErr)
}
fmt.Printf("attempt %d/%d: failed to get running images: %v\n", attempt, maxAttempts, getErr)
time.Sleep(retryDelay)
continue
}

if _, ok := allImagesSet[newPodImage]; ok {
fmt.Printf("New namespace image %s detected in running images on attempt %d\n", newPodImage, attempt)
break
}

if attempt == maxAttempts {
seen := make([]string, 0, len(allImagesSet))
for k := range allImagesSet {
seen = append(seen, k)
}
return fmt.Errorf("after creating namespace %s and pod %s, expected image %s not found. Seen: %v", newNs, newPodName, newPodImage, seen)
}

fmt.Printf("attempt %d/%d: image %s not yet reported (retrying)\n", attempt, maxAttempts, newPodImage)
time.Sleep(retryDelay)
}

return nil
}

99 changes: 88 additions & 11 deletions pkg/appstate/appstate.go
@@ -11,8 +11,11 @@ import (
reporttypes "github.com/replicatedhq/replicated-sdk/pkg/report/types"
"github.com/replicatedhq/replicated-sdk/pkg/store"
authv1 "k8s.io/api/authorization/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
@@ -223,6 +226,16 @@ func (m *AppMonitor) runInformers(ctx context.Context, informers []types.StatusI
ServiceResourceKind: runServiceController,
StatefulSetResourceKind: runStatefulSetController,
}
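// Start one status controller goroutine per namespace/kind pair that has informers configured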
for namespace, kinds := range namespaceKinds {
for kind, informers := range kinds {
if impl, ok := kindImpls[kind]; ok {
goRun(impl, namespace, informers)
} else {
log.Printf("Informer requested for unsupported resource kind %v", kind)
}
}
}

// Start a Pod image controller per namespace
// When reportAllImages is true or in embedded cluster, watch all accessible namespaces
sdkStore := store.GetStore()
@@ -257,20 +270,59 @@ func (m *AppMonitor) runInformers(ctx context.Context, informers []types.StatusI
namespacesToWatch = informerNamespaces
}

// Filter out namespaces we don't have permission to access
for ns := range namespacesToWatch {
if canAccessPodsInNamespace(ctx, m.clientset, ns) {
goRun(runPodImageController, ns, nil)
// Track started pod watchers to avoid duplicates from dynamic namespace events
var startedMu sync.Mutex
startedNamespaces := make(map[string]struct{})

maybeStartPodWatcher := func(ns string) {
if !canAccessPodsInNamespace(ctx, m.clientset, ns) {
return
}
startedMu.Lock()
if _, ok := startedNamespaces[ns]; ok {
startedMu.Unlock()
return
}
startedNamespaces[ns] = struct{}{}
startedMu.Unlock()
goRun(runPodImageController, ns, nil)
}
for namespace, kinds := range namespaceKinds {
for kind, informers := range kinds {
if impl, ok := kindImpls[kind]; ok {
goRun(impl, namespace, informers)
} else {
log.Printf("Informer requested for unsupported resource kind %v", kind)
}

// Start initial pod watchers for namespaces we can access
for ns := range namespacesToWatch {
maybeStartPodWatcher(ns)
}

// If configured to watch all namespaces and we have list+watch perms on namespaces,
// start a namespace informer to dynamically watch new namespaces and spawn pod watchers
if shouldWatchAllNamespaces && canListNamespaces(ctx, m.clientset) && canWatchNamespaces(ctx, m.clientset) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return m.clientset.CoreV1().Namespaces().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return m.clientset.CoreV1().Namespaces().Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
listwatch,
&corev1.Namespace{},
time.Minute,
)
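// The shared informer replays Add events for namespaces that already exist when it syncs,
// so the dedup map in maybeStartPodWatcher prevents duplicate pod watchers for them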
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ns, _ := obj.(*corev1.Namespace)
if ns == nil {
return
}
maybeStartPodWatcher(ns.Name)
},
})
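// Run blocks until ctx is cancelled; register with the shutdown wait group so runInformers can wait for this goroutine to exit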
shutdown.Add(1)
go func() {
defer shutdown.Done()
informer.Run(ctx.Done())
}()
}

for {
@@ -370,3 +422,28 @@ func canListNamespaces(ctx context.Context, clientset kubernetes.Interface) bool

return true
}

// canWatchNamespaces checks if the current service account has permission to watch namespaces
func canWatchNamespaces(ctx context.Context, clientset kubernetes.Interface) bool {
sar := &authv1.SelfSubjectAccessReview{
Spec: authv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authv1.ResourceAttributes{
Verb: "watch",
Group: "",
Resource: "namespaces",
},
},
}

result, err := clientset.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
log.Printf("Failed to check namespace watch permission: %v", err)
return false
}

if !result.Status.Allowed {
return false
}

return true
}
4 changes: 2 additions & 2 deletions pkg/appstate/daemonsets.go
@@ -37,10 +37,10 @@ func runDaemonSetController(ctx context.Context, clientset kubernetes.Interface,
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.AppsV1().DaemonSets(targetNamespace).List(context.TODO(), options)
return clientset.AppsV1().DaemonSets(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.AppsV1().DaemonSets(targetNamespace).Watch(context.TODO(), options)
return clientset.AppsV1().DaemonSets(targetNamespace).Watch(ctx, options)
},
}

4 changes: 2 additions & 2 deletions pkg/appstate/deployments.go
@@ -27,10 +27,10 @@ func runDeploymentController(
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.AppsV1().Deployments(targetNamespace).List(context.TODO(), options)
return clientset.AppsV1().Deployments(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.AppsV1().Deployments(targetNamespace).Watch(context.TODO(), options)
return clientset.AppsV1().Deployments(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
4 changes: 2 additions & 2 deletions pkg/appstate/ingress.go
@@ -31,10 +31,10 @@ func runIngressController(
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.NetworkingV1().Ingresses(targetNamespace).List(context.TODO(), options)
return clientset.NetworkingV1().Ingresses(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.NetworkingV1().Ingresses(targetNamespace).Watch(context.TODO(), options)
return clientset.NetworkingV1().Ingresses(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
4 changes: 2 additions & 2 deletions pkg/appstate/persistentvolumeclaim.go
@@ -27,10 +27,10 @@ func runPersistentVolumeClaimController(
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).List(context.TODO(), options)
return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).Watch(context.TODO(), options)
return clientset.CoreV1().PersistentVolumeClaims(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
4 changes: 2 additions & 2 deletions pkg/appstate/pods.go
@@ -21,10 +21,10 @@ import (
func runPodImageController(ctx context.Context, clientset kubernetes.Interface, targetNamespace string, _ []appstatetypes.StatusInformer, _ chan<- appstatetypes.ResourceState) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods(targetNamespace).List(context.TODO(), options)
return clientset.CoreV1().Pods(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods(targetNamespace).Watch(context.TODO(), options)
return clientset.CoreV1().Pods(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
4 changes: 2 additions & 2 deletions pkg/appstate/service.go
@@ -28,10 +28,10 @@ func runServiceController(
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Services(targetNamespace).List(context.TODO(), options)
return clientset.CoreV1().Services(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Services(targetNamespace).Watch(context.TODO(), options)
return clientset.CoreV1().Services(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(
4 changes: 2 additions & 2 deletions pkg/appstate/statefulsets.go
@@ -38,10 +38,10 @@ func runStatefulSetController(
) {
listwatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.AppsV1().StatefulSets(targetNamespace).List(context.TODO(), options)
return clientset.AppsV1().StatefulSets(targetNamespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.AppsV1().StatefulSets(targetNamespace).Watch(context.TODO(), options)
return clientset.AppsV1().StatefulSets(targetNamespace).Watch(ctx, options)
},
}
informer := cache.NewSharedInformer(