Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lint error #238

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/reviewdog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ jobs:
- uses: actions/checkout@v4
- name: golangci-lint
uses: reviewdog/action-golangci-lint@v2
with:
golangci_lint_flags: --timeout=10m
Comment on lines +13 to +14
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added because the following error occurred

level=error msg="Timeout exceeded: try increasing it by passing --timeout option"

4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func init() {
cobra.OnInitialize()
RootCmd.PersistentFlags().StringP("config", "", "", "Kubernetes config file path (If you don't set it, use environment variables `KUBECONFIG`)")
RootCmd.PersistentFlags().BoolP("verbose", "v", false, "Enable verbose mode")
viper.BindPFlag("config", RootCmd.PersistentFlags().Lookup("config"))
viper.BindPFlag("verbose", RootCmd.PersistentFlags().Lookup("verbose"))
_ = viper.BindPFlag("config", RootCmd.PersistentFlags().Lookup("config"))
_ = viper.BindPFlag("verbose", RootCmd.PersistentFlags().Lookup("verbose"))

RootCmd.AddCommand(
runJobCmd(),
Expand Down
12 changes: 6 additions & 6 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ var _ = Describe("E2E", func() {

func waitUntilReady(ctx context.Context, client *kubernetes.Clientset) error {
klog.Info("Waiting until kubernetes cluster is ready")
err := wait.Poll(10*time.Second, 10*time.Minute, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 10*time.Minute, true, wait.ConditionWithContextFunc(func(ctx context.Context) (bool, error) {
nodeList, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("failed to list nodes: %v", err)
Expand All @@ -185,7 +185,7 @@ func waitUntilReady(ctx context.Context, client *kubernetes.Clientset) error {
}
klog.Info("all nodes are ready")
return true, nil
})
}))
return err
}

Expand All @@ -208,7 +208,7 @@ func cleanup(ctx context.Context, client *kubernetes.Clientset) error {
}

func cleanupJobs(ctx context.Context, client *kubernetes.Clientset) error {
return wait.PollImmediate(3*time.Second, 1*time.Minute, func() (bool, error) {
return wait.PollUntilContextTimeout(ctx, 3*time.Second, 1*time.Minute, true, wait.ConditionWithContextFunc(func(ctx context.Context) (bool, error) {
jobList, err := client.BatchV1().Jobs(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: "app=example-job",
})
Expand All @@ -234,11 +234,11 @@ func cleanupJobs(ctx context.Context, client *kubernetes.Clientset) error {
}
}
return false, nil
})
}))
}

func cleanupPods(ctx context.Context, client *kubernetes.Clientset) error {
return wait.PollImmediate(3*time.Second, 1*time.Minute, func() (bool, error) {
return wait.PollUntilContextTimeout(ctx, 3*time.Second, 1*time.Minute, true, wait.ConditionWithContextFunc(func(ctx context.Context) (bool, error) {
podList, err := client.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: "app=example",
})
Expand All @@ -263,5 +263,5 @@ func cleanupPods(ctx context.Context, client *kubernetes.Clientset) error {
}
}
return false, nil
})
}))
}
5 changes: 2 additions & 3 deletions pkg/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"os"
Expand Down Expand Up @@ -36,7 +35,7 @@ type Job struct {
Args []string
// Target docker image.
Image string
// Target resources.
// Target resources.
Resources corev1.ResourceRequirements
// Target namespace
Namespace string
Expand Down Expand Up @@ -72,7 +71,7 @@ func NewJob(configFile, currentFile, command, image, resources, namespace, conta
if err != nil {
return nil, err
}
bytes, err := ioutil.ReadFile(downloaded)
bytes, err := os.ReadFile(downloaded)
if err != nil {
return nil, err
}
Expand Down
44 changes: 22 additions & 22 deletions pkg/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package job

import (
"context"
"io/ioutil"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -117,16 +117,16 @@ func TestRunJob(t *testing.T) {
Resources: v1core.ResourceRequirements{
Requests: v1core.ResourceList{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("250m"),
"cpu": resource.MustParse("250m"),
},
Limits: v1core.ResourceList{
"memory": resource.MustParse("128Mi"),
"cpu": resource.MustParse("500m"),
"cpu": resource.MustParse("500m"),
},
},
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
client: mockedKubernetes{
mockedBatch: batchV1Mock,
},
Expand Down Expand Up @@ -195,16 +195,16 @@ func TestWaitJobCompleteWithWaitAll(t *testing.T) {
Resources: v1core.ResourceRequirements{
Requests: v1core.ResourceList{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("250m"),
"cpu": resource.MustParse("250m"),
},
Limits: v1core.ResourceList{
"memory": resource.MustParse("128Mi"),
"cpu": resource.MustParse("500m"),
"cpu": resource.MustParse("500m"),
},
},
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
client: mockedKubernetes{
mockedBatch: batchV1Mock,
},
Expand Down Expand Up @@ -298,16 +298,16 @@ func TestWaitJobCompleteForContainer(t *testing.T) {
Resources: v1core.ResourceRequirements{
Requests: v1core.ResourceList{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("250m"),
"cpu": resource.MustParse("250m"),
},
Limits: v1core.ResourceList{
"memory": resource.MustParse("128Mi"),
"cpu": resource.MustParse("500m"),
"cpu": resource.MustParse("500m"),
},
},
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
client: mockedKubernetes{
mockedBatch: batchV1Mock,
mockedCore: coreV1Mock,
Expand Down Expand Up @@ -526,7 +526,7 @@ func TestCompleteTargetContainer(t *testing.T) {
}

func readJobFromFile(file string) (*v1.Job, error) {
bytes, err := ioutil.ReadFile(file)
bytes, err := os.ReadFile(file)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -557,16 +557,16 @@ func TestRemovePods(t *testing.T) {
Resources: v1core.ResourceRequirements{
Requests: v1core.ResourceList{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("250m"),
"cpu": resource.MustParse("250m"),
},
Limits: v1core.ResourceList{
"memory": resource.MustParse("128Mi"),
"cpu": resource.MustParse("500m"),
"cpu": resource.MustParse("500m"),
},
},
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
Namespace: "default",
Container: "alpine",
Timeout: 10 * time.Minute,
client: mockedKubernetes{
mockedCore: coreV1Mock,
},
Expand Down
46 changes: 23 additions & 23 deletions pkg/job/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,47 @@
Package job provides simple functions to run a job on kubernetes.
Usage:
import "github.com/h3poteto/kube-job/pkg/job"
Run a job overriding the commands
import "github.com/h3poteto/kube-job/pkg/job"
# Run a job overriding the commands
When you want to run a job on kubernetes, please use this package as follows.
First, prepare a YAML manifest for the job, then provide a command to override the one in the manifest.
For example:
j, err := job.NewJob("$HOME/.kube/config", "job-template.yaml", "echo hoge", "target-container-name", 0 * time.Second)
if err != nil {
return err
}
j, err := job.NewJob("$HOME/.kube/config", "job-template.yaml", "echo hoge", "target-container-name", 0 * time.Second)
if err != nil {
return err
}
// Run the job
running, err := j.RunJob()
if err != nil {
return err
}
// Run the job
running, err := j.RunJob()
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = j.WaitJob(ctx, running)
err = j.WaitJob(ctx, running)
Polling the logs
# Polling the logs
You can poll the logs as a stream.
For example:
// j is a Job struct
watcher := NewWatcher(j.client, j.Container)
// running is a batchv1.Job struct
err := watcher.Watch(running, ctx)
if err != nil {
return err
}
// j is a Job struct
watcher := NewWatcher(j.client, j.Container)
// running is a batchv1.Job struct
err := watcher.Watch(running, ctx)
if err != nil {
return err
}
*/
package job

Expand Down
32 changes: 18 additions & 14 deletions pkg/job/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewWatcher(client kubernetes.Interface, container string) *Watcher {
// And it isn't necessary to stop the loop because the Job is watched in WaitJobComplete.
func (w *Watcher) Watch(job *v1.Job, ctx context.Context) error {
currentPodList := []corev1.Pod{}
errCh := make(chan error, 1)
retry:
for {
newPodList, err := w.FindPods(ctx, job)
Expand All @@ -48,11 +49,20 @@ retry:
}

incrementalPodList := diffPods(currentPodList, newPodList)
go w.WatchPods(ctx, incrementalPodList)

time.Sleep(1 * time.Second)
currentPodList = newPodList
continue retry
go func() {
if err := w.WatchPods(ctx, incrementalPodList); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return err
case <-time.After(1 * time.Second):
currentPodList = newPodList
continue retry
}
}
}

Expand Down Expand Up @@ -85,12 +95,9 @@ func (w *Watcher) WatchPods(ctx context.Context, pods []corev1.Pod) error {
}(pod)
}

select {
case err := <-errCh:
if err != nil {
log.Error(err)
return err
}
if err := <-errCh; err != nil {
log.Error(err)
return err
}
wg.Wait()
return nil
Expand Down Expand Up @@ -130,10 +137,7 @@ retry:

// isPendingPod reports whether the pod is still in the Pending phase
// (i.e. it has been accepted but one or more containers are not yet running).
func isPendingPod(pod corev1.Pod) bool {
	// The diff residue left both the old if/return form and the new
	// single-expression form concatenated; only the expression is needed.
	return pod.Status.Phase == corev1.PodPending
}

// parseLabels parses label sets, and build query string.
Expand Down
Loading