diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 03f2fa8..fef1991 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -20,6 +20,6 @@ jobs: - name: Upload release binaries uses: alexellis/upload-assets@0.4.0 env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.ORG_GITHUB_SCOUTEA_PUBLIC_PAT }} with: asset_paths: '["./bin/*"]' diff --git a/.gitignore b/.gitignore index 8aa2faf..ea5ce42 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,32 @@ vendor bin .config.yml kube-job +coverage/ + +# Created by https://www.toptal.com/developers/gitignore/api/go +# Edit at https://www.toptal.com/developers/gitignore?templates=go + +### Go ### +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# End of https://www.toptal.com/developers/gitignore/api/go diff --git a/Makefile b/Makefile index 9b074b8..ab9444c 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,9 @@ windows: mod zip $(OUTDIR)/$(OUTPUT)_${VERSION}_windows_amd64.zip $(OUTPUT).exe test: - go test -cover -v ./pkg/... + mkdir -p coverage + go test -coverprofile=coverage/coverage.out -cover -v ./pkg/... + go tool cover -html=coverage.out -o coverage/reports.html e2e: build which kind ginkgo > /dev/null diff --git a/cmd/run.go b/cmd/run.go index ceb0c8e..2fab4af 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -45,8 +45,11 @@ func runJobCmd() *cobra.Command { func (r *runJob) run(cmd *cobra.Command, args []string) { config, verbose := generalConfig() - log.SetLevel(log.DebugLevel) - if !verbose { + if verbose { + log.SetLevel(log.TraceLevel) + log.SetFormatter(&log.JSONFormatter{}) + log.SetReportCaller(true) + } else { log.SetLevel(log.WarnLevel) } if r.cleanup != job.All.String() && r.cleanup != job.Succeeded.String() && r.cleanup != job.Failed.String() { diff --git a/pkg/job/job.go b/pkg/job/job.go index 939d1f4..1687617 100644 --- a/pkg/job/job.go +++ b/pkg/job/job.go @@ -36,7 +36,7 @@ type Job struct { Args []string // Target docker image. Image string - // Target resources. + // Target resources. Resources corev1.ResourceRequirements // Target namespace Namespace string @@ -237,18 +237,49 @@ func (j *Job) WaitJob(ctx context.Context, job *v1.Job, ignoreSidecar bool) erro // If the job is failed, this function returns error. // If the job is succeeded, this function returns nil. func (j *Job) WaitJobComplete(ctx context.Context, job *v1.Job, ignoreSidecar bool) error { + log.WithContext(ctx).WithFields(log.Fields{ + "job": job.Name, + "ignoreSidecar": ignoreSidecar, + }).Debug("WaitJobComplete start") retry: for { time.Sleep(3 * time.Second) running, err := j.client.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{}) + runningName := "" + if err == nil { + runningName = running.Name + } + log.WithContext(ctx).WithFields(log.Fields{ + "runningName": runningName, + "err": err, + }).Debug("WaitJobComplete Get") if err != nil { return err } if running.Status.Active == 0 && (running.Status.Succeeded == 1 || running.Status.Failed == 1) { + log.WithContext(ctx).WithFields(log.Fields{ + "running.Name": running.Name, + "jobConditions": checkJobConditions(running.Status.Conditions), + }).Debug("WaitJobComplete finished") return checkJobConditions(running.Status.Conditions) } if ignoreSidecar { + log.WithContext(ctx).WithFields(log.Fields{ + "running.Name": running.Name, + "ignoreSidecar": ignoreSidecar, + }).Debug("WaitJobComplete ignoreSidecar") pods, err := j.FindPods(ctx, running) + podNames := []string{} + if err == nil { + for _, pod := range pods { + podNames = append(podNames, pod.Name) + } + } + log.WithContext(ctx).WithFields(log.Fields{ + "running.Name": running.Name, + "podNames": podNames, + "err": err, + }).Debug("WaitJobComplete FindPods") if err != nil { return err } @@ -291,29 +322,49 @@ func checkJobConditions(conditions []v1.JobCondition) error { // checkPodConditions check all pods related a job. // Returns true, if all containers in the pods which are matched container name is completed. func checkPodConditions(pods []corev1.Pod, containerName string) (bool, error) { + podNames := []string{} results := []bool{} errs := []error{} for _, pod := range pods { if podIncludeContainer(pod, containerName) { finished, err := containerIsCompleted(pod, containerName) + podNames = append(podNames, pod.Name) results = append(results, finished) errs = append(errs, err) } } + log.WithFields(log.Fields{ + "podNames": podNames, + "results": results, + "errs": errs, + }).Debug("checkPodConditions check result") if len(results) == 0 { return false, nil } - for _, r := range results { + for i, r := range results { if !r { + log.WithFields(log.Fields{ + "podName": podNames[i], + "result": r, + "returnValue": false, + }).Debug("checkPodConditions result false") return false, nil } } var err error - for _, e := range errs { + for i, e := range errs { if e != nil { + log.WithFields(log.Fields{ + "podName": podNames[i], + "error": e, + }).Debug("checkPodConditions error") err = e } } + log.WithFields(log.Fields{ + "err": err, + "returnValue": true, + }).Debug("checkPodConditions finished") return true, err } @@ -353,6 +404,10 @@ func (j *Job) Cleanup() error { log.Infof("Removing the job: %s", j.CurrentJob.Name) options := metav1.DeleteOptions{} err := j.client.BatchV1().Jobs(j.CurrentJob.Namespace).Delete(ctx, j.CurrentJob.Name, options) + log.WithContext(ctx).WithFields(log.Fields{ + "jobName": j.CurrentJob.Name, + "err": err, + }).Debug("Delete job") if err != nil { return err } diff --git a/pkg/job/job_test.go b/pkg/job/job_test.go index 32084f4..038ba19 100644 --- a/pkg/job/job_test.go +++ b/pkg/job/job_test.go @@ -117,16 +117,16 @@ func TestRunJob(t *testing.T) { Resources: v1core.ResourceRequirements{ Requests: v1core.ResourceList{ "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("250m"), + "cpu": resource.MustParse("250m"), }, Limits: v1core.ResourceList{ "memory": resource.MustParse("128Mi"), - "cpu": resource.MustParse("500m"), + "cpu": resource.MustParse("500m"), }, }, - Namespace: "default", - Container: "alpine", - Timeout: 10 * time.Minute, + Namespace: "default", + Container: "alpine", + Timeout: 10 * time.Minute, client: mockedKubernetes{ mockedBatch: batchV1Mock, }, @@ -195,16 +195,16 @@ func TestWaitJobCompleteWithWaitAll(t *testing.T) { Resources: v1core.ResourceRequirements{ Requests: v1core.ResourceList{ "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("250m"), + "cpu": resource.MustParse("250m"), }, Limits: v1core.ResourceList{ "memory": resource.MustParse("128Mi"), - "cpu": resource.MustParse("500m"), + "cpu": resource.MustParse("500m"), }, }, - Namespace: "default", - Container: "alpine", - Timeout: 10 * time.Minute, + Namespace: "default", + Container: "alpine", + Timeout: 10 * time.Minute, client: mockedKubernetes{ mockedBatch: batchV1Mock, }, @@ -298,16 +298,16 @@ func TestWaitJobCompleteForContainer(t *testing.T) { Resources: v1core.ResourceRequirements{ Requests: v1core.ResourceList{ "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("250m"), + "cpu": resource.MustParse("250m"), }, Limits: v1core.ResourceList{ "memory": resource.MustParse("128Mi"), - "cpu": resource.MustParse("500m"), + "cpu": resource.MustParse("500m"), }, }, - Namespace: "default", - Container: "alpine", - Timeout: 10 * time.Minute, + Namespace: "default", + Container: "alpine", + Timeout: 10 * time.Minute, client: mockedKubernetes{ mockedBatch: batchV1Mock, mockedCore: coreV1Mock, @@ -557,16 +557,16 @@ func TestRemovePods(t *testing.T) { Resources: v1core.ResourceRequirements{ Requests: v1core.ResourceList{ "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("250m"), + "cpu": resource.MustParse("250m"), }, Limits: v1core.ResourceList{ "memory": resource.MustParse("128Mi"), - "cpu": resource.MustParse("500m"), + "cpu": resource.MustParse("500m"), }, }, - Namespace: "default", - Container: "alpine", - Timeout: 10 * time.Minute, + Namespace: "default", + Container: "alpine", + Timeout: 10 * time.Minute, client: mockedKubernetes{ mockedCore: coreV1Mock, }, diff --git a/pkg/job/runner.go b/pkg/job/runner.go index 1a115fd..96c99b8 100644 --- a/pkg/job/runner.go +++ b/pkg/job/runner.go @@ -2,9 +2,10 @@ Package job provides simple functions to run a job on kubernetes. Usage: - import "github.com/h3poteto/kube-job/pkg/job" -Run a job overriding the commands + import "github.com/h3poteto/kube-job/pkg/job" + +# Run a job overriding the commands When you want to run a job on kubernetes, please use this package as follows. @@ -12,37 +13,36 @@ At first, you have to prepare yaml for job, and provide a command to override th For example: - j, err := job.NewJob("$HOME/.kube/config", "job-template.yaml", "echo hoge", "target-container-name", 0 * time.Second) - if err != nil { - return err - } + j, err := job.NewJob("$HOME/.kube/config", "job-template.yaml", "echo hoge", "target-container-name", 0 * time.Second) + if err != nil { + return err + } - // Run the job - running, err := j.RunJob() - if err != nil { - return err - } + // Run the job + running, err := j.RunJob() + if err != nil { + return err + } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - err = j.WaitJob(ctx, running) + err = j.WaitJob(ctx, running) -Polling the logs +# Polling the logs You can polling the logs with stream. For example: - // j is a Job struct - watcher := NewWatcher(j.client, j.Container) - - // running is a batchv1.Job struct - err := watcher.Watch(running, ctx) - if err != nil { - return err - } + // j is a Job struct + watcher := NewWatcher(j.client, j.Container) + // running is a batchv1.Job struct + err := watcher.Watch(running, ctx) + if err != nil { + return err + } */ package job @@ -71,10 +71,17 @@ func (c CleanupType) String() string { // Run a command on kubernetes cluster, and watch the log. func (j *Job) Run(ignoreSidecar bool) error { + log.WithFields(log.Fields{ + "ignoreSidecar": ignoreSidecar, + }).Debug("Run start") if ignoreSidecar { log.Info("Ignore sidecar containers") } running, err := j.RunJob() + log.WithFields(log.Fields{ + "running": running, + "err": err, + }).Debug("RunJob") if err != nil { log.Error(err) return err @@ -82,20 +89,39 @@ func (j *Job) Run(ignoreSidecar bool) error { log.Infof("Starting job: %s", running.Name) ctx, cancel := context.WithCancel(context.Background()) if j.Timeout != 0 { + log.WithContext(ctx).WithFields(log.Fields{ + "timeout": j.Timeout, + }).Debug("Set timeout") ctx, cancel = context.WithTimeout(context.Background(), j.Timeout) } defer cancel() watcher := NewWatcher(j.client, j.Container) + log.WithFields(log.Fields{ + "watcher": watcher, + }).Debug("NewWatcher") + go func() { + log.WithContext(ctx).WithFields(log.Fields{ + "running": running, + }).Debug("Watch start") err := watcher.Watch(running, ctx) + log.WithContext(ctx).WithFields(log.Fields{ + "running": running, + "err": err, + }).Debug("Watch finished") if err != nil { log.Error(err) } }() + log.WithContext(ctx).Debug("WaitJob start") err = j.WaitJob(ctx, running, ignoreSidecar) + log.WithContext(ctx).Debug("WaitJob finished") time.Sleep(10 * time.Second) + log.WithContext(ctx).WithFields(log.Fields{ + "err": err, + }).Debug("Run finished") return err } diff --git a/pkg/job/watcher.go b/pkg/job/watcher.go index 13a66b0..2f37d78 100644 --- a/pkg/job/watcher.go +++ b/pkg/job/watcher.go @@ -2,6 +2,7 @@ package job import ( "context" + "github.com/spf13/viper" "io" "os" "strconv" @@ -42,12 +43,42 @@ func (w *Watcher) Watch(job *v1.Job, ctx context.Context) error { currentPodList := []corev1.Pod{} retry: for { + var currentPodNames []string + for _, pod := range currentPodList { + currentPodNames = append(currentPodNames, pod.Name) + } + log.WithContext(ctx).WithFields(log.Fields{ + "job": job.Name, + "currentPodNames": currentPodNames, + }).Info("Watch continue retry:") + newPodList, err := w.FindPods(ctx, job) + var newPodNames []string + if err == nil { + for _, pod := range newPodList { + newPodNames = append(newPodNames, pod.Name) + } + } + log.WithContext(ctx).WithFields(log.Fields{ + "job": job.Name, + "newPodNames": newPodNames, + "err": err, + }).Debug("FindPods") if err != nil { return err } incrementalPodList := diffPods(currentPodList, newPodList) + var incrementalPodNames []string + for _, pod := range incrementalPodList { + incrementalPodNames = append(incrementalPodNames, pod.Name) + } + log.WithContext(ctx).WithFields(log.Fields{ + "job": job.Name, + "currentPodNames": currentPodNames, + "newPodNames": newPodNames, + "incrementalPodNames": incrementalPodNames, + }).Debug("diffPods") go w.WatchPods(ctx, incrementalPodList) time.Sleep(1 * time.Second) @@ -58,14 +89,28 @@ retry: // WatchPods gets wait to start pod and tail the logs. func (w *Watcher) WatchPods(ctx context.Context, pods []corev1.Pod) error { + log.WithContext(ctx).WithFields(log.Fields{ + "pods": pods, + }).Debug("WatchPods start") + var wg sync.WaitGroup errCh := make(chan error, len(pods)) for _, pod := range pods { + log.WithContext(ctx).WithFields(log.Fields{ + "pod.Name": pod.Name, + }).Debug("range pod") wg.Add(1) go func(p corev1.Pod) { + log.WithContext(ctx).WithFields(log.Fields{ + "pod.Name": p.Name, + }).Debug("go func") defer wg.Done() startedPod, err := w.WaitToStartPod(ctx, p) + log.WithContext(ctx).WithFields(log.Fields{ + "startedPod.Name": startedPod.Name, + "err": err, + }).Debug("WaitToStartPod finished") if err != nil { errCh <- err return @@ -80,7 +125,15 @@ func (w *Watcher) WatchPods(ctx context.Context, pods []corev1.Pod) error { Param("follow", strconv.FormatBool(true)). Param("container", w.Container). Param("timestamps", strconv.FormatBool(false)) + log.WithContext(ctx).WithFields(log.Fields{ + "startedPod.Name": startedPod.Name, + "request": request, + }).Debug("readStreamLog start") err = readStreamLog(ctx, request, startedPod) + log.WithContext(ctx).WithFields(log.Fields{ + "startedPod.Name": startedPod.Name, + "err": err, + }).Debug("readStreamLog finished") errCh <- err }(pod) } @@ -89,10 +142,15 @@ func (w *Watcher) WatchPods(ctx context.Context, pods []corev1.Pod) error { case err := <-errCh: if err != nil { log.Error(err) + log.WithContext(ctx).WithFields(log.Fields{ + "err": err, + }).Debug("WatchPods error") return err } } + log.WithContext(ctx).Debug("WatchPods wait") wg.Wait() + log.WithContext(ctx).Debug("WatchPods finished") return nil } @@ -113,13 +171,26 @@ func (w *Watcher) FindPods(ctx context.Context, job *v1.Job) ([]corev1.Pod, erro // Because the job does not start immediately after call kubernetes API. // So we have to wait to start the pod, before watch logs. func (w *Watcher) WaitToStartPod(ctx context.Context, pod corev1.Pod) (corev1.Pod, error) { + log.WithContext(ctx).WithFields(log.Fields{ + "pod": pod, + }).Debug("WaitToStartPod start") + retry: for { targetPod, err := w.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + log.WithContext(ctx).WithFields(log.Fields{ + "pod.Name": pod.Name, + "err": err, + }).Debug("get pod") if err != nil { return pod, err } + log.WithContext(ctx).WithFields(log.Fields{ + "targetPod.Name": targetPod.Name, + "targetPod.Status.Phase": targetPod.Status.Phase, + }).Debug("pod status") + if !isPendingPod(*targetPod) { return *targetPod, nil } @@ -152,6 +223,18 @@ func readStreamLog(ctx context.Context, request *restclient.Request, pod corev1. return err } defer readCloser.Close() + if viper.GetBool("verbose") { + buf := new(strings.Builder) + _, err := io.Copy(buf, readCloser) + if err != nil { + return err + } + log.WithFields(log.Fields{ + "pod.Name": pod.Name, + "log": buf.String(), + }).Debug("readStreamLog") + return nil + } _, err = io.Copy(os.Stdout, readCloser) return err }