Skip to content

Commit

Permalink
Added tags to jobs (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
gapidobri authored Feb 4, 2024
1 parent 23421b1 commit e7fb438
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 90 deletions.
4 changes: 4 additions & 0 deletions foundation/database/dbmigrate/sql/migrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ CREATE INDEX job_id_index ON job_executions (job_id);

CREATE INDEX job_executions_start_time_index ON job_executions (start_time);

-- Version: 1.02
-- Description: Add tags column to jobs table

ALTER TABLE jobs ADD tags TEXT[];
65 changes: 30 additions & 35 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,73 @@ module github.com/GLCharge/distributed-scheduler
go 1.20

require (
github.com/GLCharge/otelzap v0.0.0-20230904131944-57dc7c9994a9
github.com/ardanlabs/darwin/v3 v3.3.1
github.com/ardanlabs/darwin/v3 v3.3.0
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gin-contrib/zap v0.2.0
github.com/google/go-cmp v0.5.9
github.com/samber/lo v1.39.0
github.com/spf13/cobra v1.8.0
github.com/lib/pq v1.2.0
github.com/samber/lo v1.38.1
github.com/spf13/cobra v1.7.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.2
github.com/vearne/gin-timeout v0.1.7
github.com/swaggo/swag v1.8.12
)

require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/spec v0.20.9 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/spec v0.20.8 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.0 // indirect
go.opentelemetry.io/otel v1.15.1 // indirect
go.opentelemetry.io/otel/trace v1.15.1 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/tools v0.17.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/tools v0.7.0 // indirect
)

require (
github.com/ardanlabs/conf/v3 v3.1.6
github.com/bytedance/sonic v1.10.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/ardanlabs/conf/v3 v3.1.5
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.15.3 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.3.0
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.4.1
github.com/jackc/pgx/v5 v5.3.1
github.com/jmoiron/sqlx v1.3.5
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rabbitmq/amqp091-go v1.8.1
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.8.3
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0
golang.org/x/arch v0.5.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/guregu/null.v4 v4.0.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 7 additions & 4 deletions handlers/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"

"github.com/GLCharge/distributed-scheduler/model"
"github.com/GLCharge/distributed-scheduler/service/job"
jobService "github.com/GLCharge/distributed-scheduler/service/job"
"github.com/gin-gonic/gin"
)

Expand All @@ -22,14 +22,14 @@ func JobsRoutesV1(router *gin.Engine, jobsHandler *Jobs) {
}
}

func NewJobsHandler(service *job.Service) *Jobs {
func NewJobsHandler(service *jobService.Service) *Jobs {
return &Jobs{
service: service,
}
}

type Jobs struct {
service *job.Service
service *jobService.Service
}

type ErrorResponse struct {
Expand Down Expand Up @@ -181,6 +181,7 @@ func (j *Jobs) DeleteJob() gin.HandlerFunc {
// @Produce json
// @Param limit query int false "Limit"
// @Param offset query int false "Offset"
// @Param tags query array false "Tags"
// @Success 200 {object} []model.Job
// @Failure 400 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
Expand All @@ -190,7 +191,9 @@ func (j *Jobs) ListJobs() gin.HandlerFunc {

limit, offset := LimitAndOffset(ctx)

jobs, err := j.service.ListJobs(ctx.Request.Context(), limit, offset)
tags := ctx.QueryArray("tags")

jobs, err := j.service.ListJobs(ctx.Request.Context(), limit, offset, tags)
if err != nil {
ctx.JSON(http.StatusInternalServerError, ErrorResponse{Error: err.Error()})
return
Expand Down
11 changes: 11 additions & 0 deletions model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Job struct {

// when the job is scheduled to run next (can be null if the job is not scheduled to run again)
NextRun null.Time `json:"next_run"`

Tags []string `json:"tags"`
}

// swagger:model JobUpdate
Expand All @@ -106,6 +108,8 @@ type JobUpdate struct {

CronSchedule *string `json:"cron_schedule,omitempty"`
ExecuteAt *time.Time `json:"execute_at,omitempty"`

Tags *[]string `json:"tags,omitempty"`
}

func (j *Job) ApplyUpdate(update JobUpdate) {
Expand All @@ -132,6 +136,10 @@ func (j *Job) ApplyUpdate(update JobUpdate) {
j.ExecuteAt = null.TimeFromPtr(update.ExecuteAt)
}

if update.Tags != nil {
j.Tags = *update.Tags
}

j.UpdatedAt = time.Now()

j.SetInitialRunTime()
Expand Down Expand Up @@ -332,6 +340,8 @@ type JobCreate struct {
// HTTPJob and AMQPJob are mutually exclusive.
HTTPJob *HTTPJob `json:"http_job,omitempty"`
AMQPJob *AMQPJob `json:"amqp_job,omitempty"`

Tags []string `json:"tags"`
}

func (j *JobCreate) ToJob() *Job {
Expand All @@ -345,6 +355,7 @@ func (j *JobCreate) ToJob() *Job {
AMQPJob: j.AMQPJob,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Tags: j.Tags,
}

job.SetInitialRunTime()
Expand Down
7 changes: 3 additions & 4 deletions service/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ func (s *Service) CreateJob(ctx context.Context, jobCreate *model.JobCreate) (*m
return nil, err
}

return job, err
return job, nil
}

// GetJob returns the job with the given ID.
func (s *Service) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {

return s.store.GetJob(ctx, id)
}

Expand Down Expand Up @@ -85,10 +84,10 @@ func (s *Service) DeleteJob(ctx context.Context, id uuid.UUID) error {
}

// ListJobs returns a list of jobs with the given limit and offset.
func (s *Service) ListJobs(ctx context.Context, limit, offset uint64) ([]*model.Job, error) {
func (s *Service) ListJobs(ctx context.Context, limit, offset uint64, tags []string) ([]model.Job, error) {
// Implement listing jobs using the store

return s.store.ListJobs(ctx, limit, offset)
return s.store.ListJobs(ctx, limit, offset, tags)
}

// GetJobsToRun returns a list of jobs that should be run at the given time.
Expand Down
30 changes: 16 additions & 14 deletions store/postgres/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,28 @@ import (

"github.com/GLCharge/distributed-scheduler/model"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/pkg/errors"
"gopkg.in/guregu/null.v4"
)

type jobDB struct {
ID uuid.UUID `db:"id"`
Type string `db:"type"`
Status string `db:"status"`
ExecuteAt null.Time `db:"execute_at"`
CronSchedule null.String `db:"cron_schedule"`
HTTPJob []byte `db:"http_job"`
AMQPJob []byte `db:"amqp_job"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
NextRun null.Time `db:"next_run"`
LockedUntil null.Time `db:"locked_until"`
LockedBy null.String `db:"locked_by"`
ID uuid.UUID `db:"id"`
Type string `db:"type"`
Status string `db:"status"`
ExecuteAt null.Time `db:"execute_at"`
CronSchedule null.String `db:"cron_schedule"`
HTTPJob []byte `db:"http_job"`
AMQPJob []byte `db:"amqp_job"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
NextRun null.Time `db:"next_run"`
LockedUntil null.Time `db:"locked_until"`
LockedBy null.String `db:"locked_by"`
Tags pq.StringArray `db:"tags"`
}

func toJobDB(j *model.Job) (*jobDB, error) {

dbJ := &jobDB{
ID: j.ID,
Type: string(j.Type),
Expand All @@ -36,6 +37,7 @@ func toJobDB(j *model.Job) (*jobDB, error) {
CreatedAt: j.CreatedAt,
UpdatedAt: j.UpdatedAt,
NextRun: j.NextRun,
Tags: j.Tags,
}

if j.HTTPJob != nil {
Expand Down Expand Up @@ -68,6 +70,7 @@ func (j *jobDB) ToJob() (*model.Job, error) {
CreatedAt: j.CreatedAt,
UpdatedAt: j.UpdatedAt,
NextRun: j.NextRun,
Tags: j.Tags,
}

if err := unmarshalNullableJSON(j.HTTPJob, &job.HTTPJob); err != nil {
Expand Down Expand Up @@ -99,7 +102,6 @@ type executionDB struct {
}

func (e *executionDB) ToModel() *model.JobExecution {

return &model.JobExecution{
ID: e.ID,
JobID: e.JobID,
Expand Down
Loading

0 comments on commit e7fb438

Please sign in to comment.