diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go index 8ebabbac8..8029b85cd 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic.go +++ b/pkg/watcher/reconciler/dynamic/dynamic.go @@ -26,7 +26,6 @@ import ( "github.com/fatih/color" "github.com/jonboulle/clockwork" - "github.com/tektoncd/cli/pkg/cli" "github.com/tektoncd/pipeline/pkg/apis/pipeline" pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/results/pkg/api/server/v1alpha2/log" @@ -55,6 +54,7 @@ var ( // Reconciler implements common reconciler behavior across different Tekton Run // Object types. type Reconciler struct { + kubernetesClientset kubernetes.Interface resultsClient *results.Client objectClient ObjectClient cfg *reconciler.Config @@ -80,11 +80,12 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool, type AfterDeletion func(ctx context.Context, object results.Object) error // NewDynamicReconciler creates a new dynamic Reconciler. -func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler { +func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, kc kubernetes.Interface, cfg *reconciler.Config) *Reconciler { return &Reconciler{ - resultsClient: results.NewClient(rc, lc), - objectClient: oc, - cfg: cfg, + kubernetesClientset: kc, + resultsClient: results.NewClient(rc, lc), + objectClient: oc, + cfg: cfg, // Always true predicate. IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) { return true, nil @@ -357,11 +358,11 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error { return nil } -func getPodLogs(ctx context.Context, client kubernetes.Interface, ns, pod, container string) ([]byte, error) { +func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container string) ([]byte, error) { podLogOpts := corev1.PodLogOptions{ Container: container, } - req := client.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts) + req := r.kubernetesClientset.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts) podLogs, err := req.Stream(ctx) if err != nil { return nil, err @@ -392,18 +393,11 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, inMemWriteBufferStdout := bytes.NewBuffer(make([]byte, 0)) - tknParams := &cli.TektonParams{} - tknParams.SetNamespace(o.GetNamespace()) - k8sClient, err := tknParams.KubeClient() - if err != nil { - return err - } - lo := metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", labelKey, o.GetName()), } var pods *corev1.PodList - pods, err = k8sClient.CoreV1().Pods(o.GetNamespace()).List(ctx, lo) + pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(ctx, lo) if err != nil { return err } @@ -414,7 +408,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, copy(containers, pod.Spec.InitContainers) containers = append(containers, pod.Spec.Containers...) for _, container := range containers { - ba, podLogsErr := getPodLogs(ctx, k8sClient, o.GetNamespace(), pod.Name, container.Name) + ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name) if podLogsErr != nil { return podLogsErr } diff --git a/pkg/watcher/reconciler/dynamic/dynamic_test.go b/pkg/watcher/reconciler/dynamic/dynamic_test.go index 6c3f4f185..14b739881 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic_test.go +++ b/pkg/watcher/reconciler/dynamic/dynamic_test.go @@ -41,6 +41,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" "knative.dev/pkg/apis" duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" "knative.dev/pkg/controller" @@ -140,7 +141,8 @@ func TestReconcile_TaskRun(t *testing.T) { RequeueInterval: 1 * time.Second, } - r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg) + clientset := fakekubeclientset.NewSimpleClientset() + r := NewDynamicReconciler(resultsClient, logsClient, trclient, clientset, cfg) if err := r.Reconcile(ctx, taskrun); err != nil { t.Fatal(err) } @@ -429,7 +431,8 @@ func TestReconcile_PipelineRun(t *testing.T) { t.Fatal(err) } - r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil) + clientset := fakekubeclientset.NewSimpleClientset() + r := NewDynamicReconciler(resultsClient, logsClient, prclient, clientset, nil) if err := r.Reconcile(ctx, pipelinerun); err != nil { t.Fatal(err) } diff --git a/pkg/watcher/reconciler/pipelinerun/controller.go b/pkg/watcher/reconciler/pipelinerun/controller.go index 0b4774250..ab092c8d2 100644 --- a/pkg/watcher/reconciler/pipelinerun/controller.go +++ b/pkg/watcher/reconciler/pipelinerun/controller.go @@ -29,6 +29,7 @@ import ( "github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection" pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto" "k8s.io/client-go/tools/cache" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" "knative.dev/pkg/logging" ) @@ -42,12 +43,14 @@ func NewController(ctx context.Context, resultsClient pb.ResultsClient, cmw conf func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient, cfg *reconciler.Config, cmw configmap.Watcher) *controller.Impl { pipelineRunInformer := pipelineruninformer.Get(ctx) pipelineRunLister := pipelineRunInformer.Lister() + kubeclientset := kubeclient.Get(ctx) logger := logging.FromContext(ctx) configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger)) configStore.WatchConfigs(cmw) c := &Reconciler{ LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List), + kubeClientSet: kubeclientset, resultsClient: resultsClient, logsClient: logs.Get(ctx), pipelineRunLister: pipelineRunLister, diff --git a/pkg/watcher/reconciler/pipelinerun/reconciler.go b/pkg/watcher/reconciler/pipelinerun/reconciler.go index 1d9b905e4..f13a9b488 100644 --- a/pkg/watcher/reconciler/pipelinerun/reconciler.go +++ b/pkg/watcher/reconciler/pipelinerun/reconciler.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -43,6 +44,7 @@ type Reconciler struct { // Inline LeaderAwareFuncs to support leader election. knativereconciler.LeaderAwareFuncs + kubeClientSet kubernetes.Interface resultsClient pb.ResultsClient logsClient pb.LogsClient pipelineRunLister pipelinev1beta1listers.PipelineRunLister @@ -86,7 +88,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace), } - dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg) + dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.kubeClientSet, r.cfg) // Tell the dynamic reconciler to wait until all underlying TaskRuns are // ready for deletion before deleting the PipelineRun. This guarantees // that the TaskRuns will not be deleted before their final state being diff --git a/pkg/watcher/reconciler/taskrun/controller.go b/pkg/watcher/reconciler/taskrun/controller.go index 398987354..34fd179bc 100644 --- a/pkg/watcher/reconciler/taskrun/controller.go +++ b/pkg/watcher/reconciler/taskrun/controller.go @@ -28,6 +28,7 @@ import ( "github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection" pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto" "k8s.io/client-go/tools/cache" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" "knative.dev/pkg/logging" ) @@ -41,12 +42,14 @@ func NewController(ctx context.Context, resultsClient pb.ResultsClient, cmw conf func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient, cfg *reconciler.Config, cmw configmap.Watcher) *controller.Impl { informer := taskruninformer.Get(ctx) lister := informer.Lister() + kubeclientset := kubeclient.Get(ctx) logger := logging.FromContext(ctx) configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger)) configStore.WatchConfigs(cmw) c := &Reconciler{ LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List), + kubeClientset: kubeclientset, resultsClient: resultsClient, logsClient: logs.Get(ctx), lister: lister, diff --git a/pkg/watcher/reconciler/taskrun/reconciler.go b/pkg/watcher/reconciler/taskrun/reconciler.go index c132c8f2f..079bd5b4d 100644 --- a/pkg/watcher/reconciler/taskrun/reconciler.go +++ b/pkg/watcher/reconciler/taskrun/reconciler.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "knative.dev/pkg/logging" ) @@ -29,6 +30,7 @@ type Reconciler struct { // Inline LeaderAwareFuncs to support leader election. knativereconciler.LeaderAwareFuncs + kubeClientset kubernetes.Interface resultsClient pb.ResultsClient logsClient pb.LogsClient lister v1beta1.TaskRunLister @@ -71,7 +73,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { TaskRunInterface: r.pipelineClient.TektonV1beta1().TaskRuns(namespace), } - dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.cfg) + dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.kubeClientset, r.cfg) dyn.AfterDeletion = func(ctx context.Context, o results.Object) error { tr := o.(*pipelinev1beta1.TaskRun) return r.metrics.DurationAndCountDeleted(ctx, r.configStore.Load().Metrics, tr)