Skip to content

Commit

Permalink
refactor: queue pr 1
Browse files Browse the repository at this point in the history
  • Loading branch information
devhaozi committed Jan 5, 2025
1 parent 273efc8 commit 3817c33
Show file tree
Hide file tree
Showing 23 changed files with 950 additions and 752 deletions.
18 changes: 18 additions & 0 deletions contracts/queue/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package queue

import "time"

type Driver interface {
// Connection returns the connection name for the driver.
Connection() string
// Driver returns the driver name for the driver.
Driver() string
// Push pushes the job onto the queue.
Push(job Job, args []any, queue string) error
// Bulk pushes a slice of jobs onto the queue.
Bulk(jobs []Jobs, queue string) error
// Later pushes the job onto the queue after a delay.
Later(delay time.Duration, job Job, args []any, queue string) error
// Pop pops the next job off of the queue.
Pop(queue string) (Job, []any, error)
}
7 changes: 5 additions & 2 deletions contracts/queue/job.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package queue

import "time"

type Job interface {
// Signature set the unique signature of the job.
Signature() string
Expand All @@ -8,6 +10,7 @@ type Job interface {
}

type Jobs struct {
Job Job
Args []Arg
Job Job
Args []any
Delay time.Duration
}
14 changes: 6 additions & 8 deletions contracts/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package queue

type Queue interface {
Worker(args ...Args) Worker
Worker(payloads ...*Args) Worker
// Register register jobs
Register(jobs []Job)
Register(jobs []Job) error
// GetJobs get all jobs
GetJobs() []Job
// GetJob get job by signature
GetJob(signature string) (Job, error)
// Job add a job to queue
Job(job Job, args []Arg) Task
Job(job Job, args []any) Task
// Chain creates a chain of jobs to be processed one by one, passing
Chain(jobs []Jobs) Task
}

type Worker interface {
Run() error
Shutdown() error
}

type Args struct {
Expand All @@ -24,8 +27,3 @@ type Args struct {
// Concurrent num
Concurrent int
}

type Arg struct {
Type string
Value any
}
2 changes: 1 addition & 1 deletion contracts/queue/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Task interface {
// DispatchSync dispatches the task synchronously.
DispatchSync() error
// Delay dispatches the task after the given delay.
Delay(time time.Time) Task
Delay(time time.Duration) Task
// OnConnection sets the connection of the task.
OnConnection(connection string) Task
// OnQueue sets the queue of the task.
Expand Down
44 changes: 26 additions & 18 deletions queue/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,57 @@ package queue

import (
configcontract "github.com/goravel/framework/contracts/config"
"github.com/goravel/framework/contracts/log"
"github.com/goravel/framework/contracts/queue"
)

type Application struct {
config *Config
jobs []queue.Job
log log.Log
job *JobImpl
}

func NewApplication(config configcontract.Config, log log.Log) *Application {
func NewApplication(config configcontract.Config) *Application {
return &Application{
config: NewConfig(config),
log: log,
job: NewJobImpl(),
}
}

func (app *Application) Worker(args ...queue.Args) queue.Worker {
func (app *Application) Worker(payloads ...*queue.Args) queue.Worker {
defaultConnection := app.config.DefaultConnection()

if len(args) == 0 {
return NewWorker(app.config, app.log, 1, defaultConnection, app.jobs, app.config.Queue(defaultConnection, ""))
if len(payloads) == 0 || payloads[0] == nil {
return NewWorker(app.config, 1, defaultConnection, app.config.Queue(defaultConnection, ""), app.job)
}

if args[0].Connection == "" {
args[0].Connection = defaultConnection
if payloads[0].Connection == "" {
payloads[0].Connection = defaultConnection
}
if payloads[0].Concurrent == 0 {
payloads[0].Concurrent = 1
}

return NewWorker(app.config, app.log, args[0].Concurrent, args[0].Connection, app.jobs, app.config.Queue(args[0].Connection, args[0].Queue))
return NewWorker(app.config, payloads[0].Concurrent, payloads[0].Connection, app.config.Queue(payloads[0].Connection, payloads[0].Queue), app.job)
}

func (app *Application) Register(jobs []queue.Job) {
app.jobs = append(app.jobs, jobs...)
func (app *Application) Register(jobs []queue.Job) error {
if err := app.job.Register(jobs); err != nil {
return err
}

return nil
}

func (app *Application) GetJobs() []queue.Job {
return app.jobs
return app.job.GetJobs()
}

func (app *Application) GetJob(signature string) (queue.Job, error) {
return app.job.Get(signature)
}

func (app *Application) Job(job queue.Job, args []queue.Arg) queue.Task {
return NewTask(app.config, app.log, job, args)
func (app *Application) Job(job queue.Job, args []any) queue.Task {
return NewTask(app.config, job, args)
}

func (app *Application) Chain(jobs []queue.Jobs) queue.Task {
return NewChainTask(app.config, app.log, jobs)
return NewChainTask(app.config, jobs)
}
Loading

0 comments on commit 3817c33

Please sign in to comment.