Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: job config #402

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ lfs:
# Enable Git SSH transfer.
ssh_enabled: true

# Cron job configuration
jobs:
mirror_pull: "@every 10m"

jolheiser marked this conversation as resolved.
Show resolved Hide resolved
# The stats server configuration.
stats:
# The address on which the stats server will listen.
Expand Down
8 changes: 8 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type LFSConfig struct {
SSHEnabled bool `env:"SSH_ENABLED" yaml:"ssh_enabled"`
}

// JobsConfig is the configuration for cron jobs.
type JobsConfig struct {
MirrorPull string `env:"MIRROR_PULL" yaml:"mirror_pull"`
}

// Config is the configuration for Soft Serve.
type Config struct {
// Name is the name of the server.
Expand All @@ -131,6 +136,9 @@ type Config struct {
// LFS is the configuration for Git LFS.
LFS LFSConfig `envPrefix:"LFS_" yaml:"lfs"`

// Jobs is the configuration for cron jobs
Jobs JobsConfig `envPrefix:"JOBS_" yaml:"jobs"`

// InitialAdminKeys is a list of public keys that will be added to the list of admins.
InitialAdminKeys []string `env:"INITIAL_ADMIN_KEYS" envSeparator:"\n" yaml:"initial_admin_keys"`

Expand Down
4 changes: 4 additions & 0 deletions server/config/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ lfs:
# Enable Git SSH transfer.
ssh_enabled: {{ .LFS.SSHEnabled }}

# Cron job configuration
jobs:
mirror_pull: "{{ .Jobs.MirrorPull }}"

# Additional admin keys.
#initial_admin_keys:
# - "ssh-rsa AAAAB3NzaC1yc2..."
Expand Down
15 changes: 10 additions & 5 deletions server/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (

// Job is a job that can be registered with the scheduler.
type Job struct {
ID int
Spec string
Func func(context.Context) func()
ID int
Runner Runner
}

// Runner is a job runner.
type Runner interface {
Spec(context.Context) string
Func(context.Context) func()
}

var (
Expand All @@ -18,10 +23,10 @@ var (
)

// Register registers a job.
func Register(name, spec string, fn func(context.Context) func()) {
func Register(name string, runner Runner) {
mtx.Lock()
defer mtx.Unlock()
jobs[name] = &Job{Spec: spec, Func: fn}
jobs[name] = &Job{Runner: runner}
}

// List returns a map of registered jobs.
Expand Down
17 changes: 14 additions & 3 deletions server/jobs/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,87 +17,98 @@
)

func init() {
Register("mirror-pull", "@every 10m", mirrorPull)
Register("mirror-pull", mirrorPull{})
}

// mirrorPull runs the (pull) mirror job task.
func mirrorPull(ctx context.Context) func() {
type mirrorPull struct{}

// Spec derives the spec used for pull mirrors and implements Runner.
func (m mirrorPull) Spec(ctx context.Context) string {
cfg := config.FromContext(ctx)
if cfg.Jobs.MirrorPull != "" {
return cfg.Jobs.MirrorPull
}

Check warning on line 30 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L29-L30

Added lines #L29 - L30 were not covered by tests
return "@every 10m"
}

// Func runs the (pull) mirror job task and implements Runner.
func (m mirrorPull) Func(ctx context.Context) func() {
cfg := config.FromContext(ctx)
logger := log.FromContext(ctx).WithPrefix("jobs.mirror")
b := backend.FromContext(ctx)
dbx := db.FromContext(ctx)
datastore := store.FromContext(ctx)
return func() {
repos, err := b.Repositories(ctx)
if err != nil {
logger.Error("error getting repositories", "err", err)
return
}

Check warning on line 46 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L42-L46

Added lines #L42 - L46 were not covered by tests

// Divide the work up among the number of CPUs.
wq := sync.NewWorkPool(ctx, runtime.GOMAXPROCS(0),
sync.WithWorkPoolLogger(logger.Errorf),
)

logger.Debug("updating mirror repos")
for _, repo := range repos {
if repo.IsMirror() {
r, err := repo.Open()
if err != nil {
logger.Error("error opening repository", "repo", repo.Name(), "err", err)
continue

Check warning on line 59 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L49-L59

Added lines #L49 - L59 were not covered by tests
}

name := repo.Name()
wq.Add(name, func() {
repo := repo
cmd := git.NewCommand("remote", "update", "--prune").WithContext(ctx)
cmd.AddEnvs(
fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
cfg.SSH.ClientKeyPath,
),
)

if _, err := cmd.RunInDir(r.Path); err != nil {
logger.Error("error running git remote update", "repo", name, "err", err)
}

Check warning on line 75 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L62-L75

Added lines #L62 - L75 were not covered by tests

if cfg.LFS.Enabled {
rcfg, err := r.Config()
if err != nil {
logger.Error("error getting git config", "repo", name, "err", err)
return
}

Check warning on line 82 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L77-L82

Added lines #L77 - L82 were not covered by tests

lfsEndpoint := rcfg.Section("lfs").Option("url")
if lfsEndpoint == "" {
// If there is no LFS url defined, means the repo
// doesn't use LFS and we can skip it.
return
}

Check warning on line 89 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L84-L89

Added lines #L84 - L89 were not covered by tests

ep, err := lfs.NewEndpoint(lfsEndpoint)
if err != nil {
logger.Error("error creating LFS endpoint", "repo", name, "err", err)
return
}

Check warning on line 95 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L91-L95

Added lines #L91 - L95 were not covered by tests

client := lfs.NewClient(ep)
if client == nil {
logger.Errorf("failed to create lfs client: unsupported endpoint %s", lfsEndpoint)
return
}

Check warning on line 101 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L97-L101

Added lines #L97 - L101 were not covered by tests

if err := backend.StoreRepoMissingLFSObjects(ctx, repo, dbx, datastore, client); err != nil {
logger.Error("failed to store missing lfs objects", "err", err, "path", r.Path)
return
}

Check warning on line 106 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L103-L106

Added lines #L103 - L106 were not covered by tests
}
})
}
}

wq.Run()

Check warning on line 112 in server/jobs/mirror.go

View check run for this annotation

Codecov / codecov/patch

server/jobs/mirror.go#L112

Added line #L112 was not covered by tests
}
}
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@
// Add cron jobs.
sched := cron.NewScheduler(ctx)
for n, j := range jobs.List() {
id, err := sched.AddFunc(j.Spec, j.Func(ctx))
id, err := sched.AddFunc(j.Runner.Spec(ctx), j.Runner.Func(ctx))
if err != nil {
logger.Warn("error adding cron job", "job", n, "err", err)
}

Check warning on line 63 in server/server.go

View check run for this annotation

Codecov / codecov/patch

server/server.go#L62-L63

Added lines #L62 - L63 were not covered by tests

j.ID = id
}
Expand Down
Loading