diff --git a/contracts/queue/driver.go b/contracts/queue/driver.go new file mode 100644 index 000000000..29ef603bc --- /dev/null +++ b/contracts/queue/driver.go @@ -0,0 +1,18 @@ +package queue + +import "time" + +type Driver interface { + // Connection returns the connection name for the driver. + Connection() string + // Driver returns the driver name for the driver. + Driver() string + // Push pushes the job onto the queue. + Push(job Job, args []any, queue string) error + // Bulk pushes a slice of jobs onto the queue. + Bulk(jobs []Jobs, queue string) error + // Later pushes the job onto the queue after a delay. + Later(delay time.Duration, job Job, args []any, queue string) error + // Pop pops the next job off of the queue. + Pop(queue string) (Job, []any, error) +} diff --git a/contracts/queue/job.go b/contracts/queue/job.go index 9cfcf9f3c..8eff008cc 100644 --- a/contracts/queue/job.go +++ b/contracts/queue/job.go @@ -1,5 +1,7 @@ package queue +import "time" + type Job interface { // Signature set the unique signature of the job. Signature() string @@ -8,6 +10,7 @@ type Job interface { } type Jobs struct { - Job Job - Args []Arg + Job Job + Args []any + Delay time.Duration } diff --git a/contracts/queue/queue.go b/contracts/queue/queue.go index 1662509b4..1a7111429 100644 --- a/contracts/queue/queue.go +++ b/contracts/queue/queue.go @@ -1,19 +1,22 @@ package queue type Queue interface { - Worker(args ...Args) Worker + Worker(payloads ...*Args) Worker // Register register jobs - Register(jobs []Job) + Register(jobs []Job) error // GetJobs get all jobs GetJobs() []Job + // GetJob get job by signature + GetJob(signature string) (Job, error) // Job add a job to queue - Job(job Job, args []Arg) Task + Job(job Job, args []any) Task // Chain creates a chain of jobs to be processed one by one, passing Chain(jobs []Jobs) Task } type Worker interface { Run() error + Shutdown() error } type Args struct { @@ -24,8 +27,3 @@ type Args struct { // Concurrent num Concurrent int } - -type Arg struct { - Type string - Value any -} diff --git a/contracts/queue/task.go b/contracts/queue/task.go index ff7a1df5b..46ff8f2fc 100644 --- a/contracts/queue/task.go +++ b/contracts/queue/task.go @@ -10,7 +10,7 @@ type Task interface { // DispatchSync dispatches the task synchronously. DispatchSync() error // Delay dispatches the task after the given delay. - Delay(time time.Time) Task + Delay(time time.Duration) Task // OnConnection sets the connection of the task. OnConnection(connection string) Task // OnQueue sets the queue of the task. diff --git a/queue/application.go b/queue/application.go index 52764f06e..7c13c4792 100644 --- a/queue/application.go +++ b/queue/application.go @@ -2,49 +2,57 @@ package queue import ( configcontract "github.com/goravel/framework/contracts/config" - "github.com/goravel/framework/contracts/log" "github.com/goravel/framework/contracts/queue" ) type Application struct { config *Config - jobs []queue.Job - log log.Log + job *JobImpl } -func NewApplication(config configcontract.Config, log log.Log) *Application { +func NewApplication(config configcontract.Config) *Application { return &Application{ config: NewConfig(config), - log: log, + job: NewJobImpl(), } } -func (app *Application) Worker(args ...queue.Args) queue.Worker { +func (app *Application) Worker(payloads ...*queue.Args) queue.Worker { defaultConnection := app.config.DefaultConnection() - if len(args) == 0 { - return NewWorker(app.config, app.log, 1, defaultConnection, app.jobs, app.config.Queue(defaultConnection, "")) + if len(payloads) == 0 || payloads[0] == nil { + return NewWorker(app.config, 1, defaultConnection, app.config.Queue(defaultConnection, ""), app.job) } - - if args[0].Connection == "" { - args[0].Connection = defaultConnection + if payloads[0].Connection == "" { + payloads[0].Connection = defaultConnection + } + if payloads[0].Concurrent == 0 { + payloads[0].Concurrent = 1 } - return NewWorker(app.config, app.log, args[0].Concurrent, args[0].Connection, app.jobs, app.config.Queue(args[0].Connection, args[0].Queue)) + return NewWorker(app.config, payloads[0].Concurrent, payloads[0].Connection, app.config.Queue(payloads[0].Connection, payloads[0].Queue), app.job) } -func (app *Application) Register(jobs []queue.Job) { - app.jobs = append(app.jobs, jobs...) +func (app *Application) Register(jobs []queue.Job) error { + if err := app.job.Register(jobs); err != nil { + return err + } + + return nil } func (app *Application) GetJobs() []queue.Job { - return app.jobs + return app.job.GetJobs() +} + +func (app *Application) GetJob(signature string) (queue.Job, error) { + return app.job.Get(signature) } -func (app *Application) Job(job queue.Job, args []queue.Arg) queue.Task { - return NewTask(app.config, app.log, job, args) +func (app *Application) Job(job queue.Job, args []any) queue.Task { + return NewTask(app.config, job, args) } func (app *Application) Chain(jobs []queue.Jobs) queue.Task { - return NewChainTask(app.config, app.log, jobs) + return NewChainTask(app.config, jobs) } diff --git a/queue/application_test.go b/queue/application_test.go deleted file mode 100644 index 3384dd6aa..000000000 --- a/queue/application_test.go +++ /dev/null @@ -1,481 +0,0 @@ -package queue - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/spf13/cast" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - - "github.com/goravel/framework/contracts/queue" - configmock "github.com/goravel/framework/mocks/config" - logmock "github.com/goravel/framework/mocks/log" - "github.com/goravel/framework/support/carbon" - testingdocker "github.com/goravel/framework/support/docker" - "github.com/goravel/framework/support/env" -) - -var ( - testSyncJob = 0 - testAsyncJob = 0 - testAsyncJobOfDisableDebug = 0 - testDelayAsyncJob = 0 - testCustomAsyncJob = 0 - testErrorAsyncJob = 0 - testChainAsyncJob = 0 - testChainSyncJob = 0 - testChainAsyncJobError = 0 - testChainSyncJobError = 0 -) - -type QueueTestSuite struct { - suite.Suite - app *Application - mockConfig *configmock.Config - mockLog *logmock.Log - port int -} - -func TestQueueTestSuite(t *testing.T) { - if env.IsWindows() { - t.Skip("Skip test that using Docker") - } - - redisDocker := testingdocker.NewRedis() - assert.Nil(t, redisDocker.Build()) - - suite.Run(t, &QueueTestSuite{ - port: redisDocker.Config().Port, - }) - - assert.Nil(t, redisDocker.Shutdown()) -} - -func (s *QueueTestSuite) SetupTest() { - s.mockConfig = &configmock.Config{} - s.mockLog = &logmock.Log{} - s.app = NewApplication(s.mockConfig, s.mockLog) -} - -func (s *QueueTestSuite) TestSyncQueue() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Once() - s.Nil(s.app.Job(&TestSyncJob{}, []queue.Arg{ - {Type: "string", Value: "TestSyncQueue"}, - {Type: "int", Value: 1}, - }).DispatchSync()) - s.Equal(1, testSyncJob) -} - -func (s *QueueTestSuite) TestDefaultAsyncQueue_EnableDebug() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Twice() - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(true).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.mockLog.On("Infof", "Launching a worker with the following settings:").Once() - s.mockLog.On("Infof", "- Broker: %s", "://").Once() - s.mockLog.On("Infof", "- DefaultQueue: %s", "goravel_queues:debug").Once() - s.mockLog.On("Infof", "- ResultBackend: %s", "://").Once() - s.mockLog.On("Info", "[*] Waiting for messages. To exit press CTRL+C").Once() - s.mockLog.On("Debugf", "Received new message: %s", mock.Anything).Once() - s.mockLog.On("Debugf", "Processed task %s. Results = %s", mock.Anything, mock.Anything).Once() - s.app.jobs = []queue.Job{&TestAsyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Queue: "debug", - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - time.Sleep(2 * time.Second) - s.Nil(s.app.Job(&TestAsyncJob{}, []queue.Arg{ - {Type: "string", Value: "TestDefaultAsyncQueue_EnableDebug"}, - {Type: "int", Value: 1}, - }).OnQueue("debug").Dispatch()) - time.Sleep(2 * time.Second) - s.Equal(1, testAsyncJob) - - s.mockConfig.AssertExpectations(s.T()) - s.mockLog.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestDefaultAsyncQueue_DisableDebug() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Twice() - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.app.jobs = []queue.Job{&TestAsyncJobOfDisableDebug{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker().Run()) - - for range ctx.Done() { - return - } - }(ctx) - time.Sleep(2 * time.Second) - s.Nil(s.app.Job(&TestAsyncJobOfDisableDebug{}, []queue.Arg{ - {Type: "string", Value: "TestDefaultAsyncQueue_DisableDebug"}, - {Type: "int", Value: 1}, - }).Dispatch()) - time.Sleep(2 * time.Second) - s.Equal(1, testAsyncJobOfDisableDebug) - - s.mockConfig.AssertExpectations(s.T()) - s.mockLog.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestDelayAsyncQueue() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2) - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice() - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.app.jobs = []queue.Job{&TestDelayAsyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Queue: "delay", - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - time.Sleep(2 * time.Second) - s.Nil(s.app.Job(&TestDelayAsyncJob{}, []queue.Arg{ - {Type: "string", Value: "TestDelayAsyncQueue"}, - {Type: "int", Value: 1}, - }).OnQueue("delay").Delay(carbon.Now().AddSeconds(3).StdTime()).Dispatch()) - time.Sleep(2 * time.Second) - s.Equal(0, testDelayAsyncJob) - time.Sleep(3 * time.Second) - s.Equal(1, testDelayAsyncJob) - - s.mockConfig.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestCustomAsyncQueue() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Twice() - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.custom.queue", "default").Return("default").Twice() - s.mockConfig.On("GetString", "queue.connections.custom.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.custom.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.app.jobs = []queue.Job{&TestCustomAsyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Connection: "custom", - Queue: "custom1", - Concurrent: 2, - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - time.Sleep(2 * time.Second) - s.Nil(s.app.Job(&TestCustomAsyncJob{}, []queue.Arg{ - {Type: "string", Value: "TestCustomAsyncQueue"}, - {Type: "int", Value: 1}, - }).OnConnection("custom").OnQueue("custom1").Dispatch()) - time.Sleep(2 * time.Second) - s.Equal(1, testCustomAsyncJob) - - s.mockConfig.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestErrorAsyncQueue() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Twice() - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice() - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.app.jobs = []queue.Job{&TestErrorAsyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Queue: "error", - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - time.Sleep(2 * time.Second) - s.Nil(s.app.Job(&TestErrorAsyncJob{}, []queue.Arg{ - {Type: "string", Value: "TestErrorAsyncQueue"}, - {Type: "int", Value: 1}, - }).OnConnection("redis").OnQueue("error1").Dispatch()) - time.Sleep(2 * time.Second) - s.Equal(0, testErrorAsyncJob) - - s.mockConfig.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestChainAsyncQueue() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2) - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice() - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.app.jobs = []queue.Job{&TestChainAsyncJob{}, &TestChainSyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Queue: "chain", - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - - time.Sleep(2 * time.Second) - s.Nil(s.app.Chain([]queue.Jobs{ - { - Job: &TestChainAsyncJob{}, - Args: []queue.Arg{ - {Type: "string", Value: "TestChainAsyncQueue"}, - {Type: "int", Value: 1}, - }, - }, - { - Job: &TestChainSyncJob{}, - Args: []queue.Arg{ - {Type: "string", Value: "TestChainSyncQueue"}, - {Type: "int", Value: 1}, - }, - }, - }).OnQueue("chain").Dispatch()) - - time.Sleep(2 * time.Second) - s.Equal(1, testChainAsyncJob) - s.Equal(1, testChainSyncJob) - - s.mockConfig.AssertExpectations(s.T()) -} - -func (s *QueueTestSuite) TestChainAsyncQueue_Error() { - s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2) - s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4) - s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2) - s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice() - s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3) - s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice() - s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice() - s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice() - s.mockConfig.On("GetInt", "database.redis.default.port").Return(s.port).Twice() - s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice() - s.mockLog.On("Errorf", "Failed processing task %s. Error = %v", mock.Anything, errors.New("error")).Once() - s.app.jobs = []queue.Job{&TestChainAsyncJob{}, &TestChainSyncJob{}} - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func(ctx context.Context) { - s.Nil(s.app.Worker(queue.Args{ - Queue: "chain", - }).Run()) - - for range ctx.Done() { - return - } - }(ctx) - - time.Sleep(2 * time.Second) - s.Nil(s.app.Chain([]queue.Jobs{ - { - Job: &TestChainAsyncJob{}, - Args: []queue.Arg{ - {Type: "bool", Value: true}, - }, - }, - { - Job: &TestChainSyncJob{}, - Args: []queue.Arg{}, - }, - }).OnQueue("chain").Dispatch()) - - time.Sleep(2 * time.Second) - s.Equal(1, testChainAsyncJobError) - s.Equal(0, testChainSyncJobError) - - s.mockConfig.AssertExpectations(s.T()) - s.mockLog.AssertExpectations(s.T()) -} - -type TestAsyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestAsyncJob) Signature() string { - return "test_async_job" -} - -// Handle Execute the job. -func (receiver *TestAsyncJob) Handle(args ...any) error { - testAsyncJob++ - - return nil -} - -type TestAsyncJobOfDisableDebug struct { -} - -// Signature The name and signature of the job. -func (receiver *TestAsyncJobOfDisableDebug) Signature() string { - return "test_async_job_of_disable_debug" -} - -// Handle Execute the job. -func (receiver *TestAsyncJobOfDisableDebug) Handle(args ...any) error { - testAsyncJobOfDisableDebug++ - - return nil -} - -type TestDelayAsyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestDelayAsyncJob) Signature() string { - return "test_delay_async_job" -} - -// Handle Execute the job. -func (receiver *TestDelayAsyncJob) Handle(args ...any) error { - testDelayAsyncJob++ - - return nil -} - -type TestSyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestSyncJob) Signature() string { - return "test_sync_job" -} - -// Handle Execute the job. -func (receiver *TestSyncJob) Handle(args ...any) error { - testSyncJob++ - - return nil -} - -type TestCustomAsyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestCustomAsyncJob) Signature() string { - return "test_async_job" -} - -// Handle Execute the job. -func (receiver *TestCustomAsyncJob) Handle(args ...any) error { - testCustomAsyncJob++ - - return nil -} - -type TestErrorAsyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestErrorAsyncJob) Signature() string { - return "test_async_job" -} - -// Handle Execute the job. -func (receiver *TestErrorAsyncJob) Handle(args ...any) error { - testErrorAsyncJob++ - - return nil -} - -type TestChainAsyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestChainAsyncJob) Signature() string { - return "test_async_job" -} - -// Handle Execute the job. -func (receiver *TestChainAsyncJob) Handle(args ...any) error { - if len(args) > 0 && cast.ToBool(args[0]) { - testChainAsyncJobError++ - - return errors.New("error") - } - - testChainAsyncJob++ - - return nil -} - -type TestChainSyncJob struct { -} - -// Signature The name and signature of the job. -func (receiver *TestChainSyncJob) Signature() string { - return "test_sync_job" -} - -// Handle Execute the job. -func (receiver *TestChainSyncJob) Handle(args ...any) error { - testChainSyncJob++ - - return nil -} diff --git a/queue/config.go b/queue/config.go index f26c13b6c..b749086e5 100644 --- a/queue/config.go +++ b/queue/config.go @@ -4,6 +4,7 @@ import ( "fmt" configcontract "github.com/goravel/framework/contracts/config" + "github.com/goravel/framework/contracts/database/orm" ) type Config struct { @@ -32,17 +33,19 @@ func (r *Config) Queue(connection, queue string) string { queue = r.config.GetString(fmt.Sprintf("queue.connections.%s.queue", connection), "default") } - return fmt.Sprintf("%s_%s:%s", appName, "queues", queue) + return fmt.Sprintf("%s_queues:%s", appName, queue) } func (r *Config) Driver(connection string) string { if connection == "" { - connection = r.config.GetString("queue.default") + connection = r.DefaultConnection() } return r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection)) } +// Redis returns the Redis configuration for a given connection. +// TODO: Will be removed in v1.17 func (r *Config) Redis(queueConnection string) (dsn string, database int, queue string) { connection := r.config.GetString(fmt.Sprintf("queue.connections.%s.connection", queueConnection)) queue = r.Queue(queueConnection, "") @@ -59,3 +62,25 @@ func (r *Config) Redis(queueConnection string) (dsn string, database int, queue return } + +func (r *Config) Size(connection string) int { + if connection == "" { + connection = r.DefaultConnection() + } + + return r.config.GetInt(fmt.Sprintf("queue.connections.%s.size", connection), 100) +} + +func (r *Config) Via(connection string) any { + if connection == "" { + connection = r.DefaultConnection() + } + + return r.config.Get(fmt.Sprintf("queue.connections.%s.via", connection)) +} + +func (r *Config) FailedJobsQuery() orm.Query { + connection := r.config.GetString("queue.failed.database") + table := r.config.GetString("queue.failed.table") + return OrmFacade.Connection(connection).Query().Table(table) +} diff --git a/queue/driver.go b/queue/driver.go new file mode 100644 index 000000000..4d1640623 --- /dev/null +++ b/queue/driver.go @@ -0,0 +1,55 @@ +package queue + +import ( + "fmt" + + "github.com/goravel/framework/contracts/queue" +) + +const DriverSync string = "sync" +const DriverASync string = "async" +const DriverMachinery string = "machinery" // TODO: Will be removed in v1.17 +const DriverCustom string = "custom" + +type Driver interface { + New(store string) (queue.Driver, error) +} + +type DriverImpl struct { + connection string + config *Config +} + +func NewDriverImpl(connection string, config *Config) *DriverImpl { + return &DriverImpl{ + connection: connection, + config: config, + } +} + +func (d *DriverImpl) New() (queue.Driver, error) { + switch d.config.Driver(d.connection) { + case DriverSync: + return NewSync(d.connection), nil + case DriverASync: + return NewASync(d.connection, d.config.Size(d.connection)), nil + case DriverMachinery: + return NewMachinery(d.connection, d.config, LogFacade), nil // TODO: Will be removed in v1.17 + case DriverCustom: + return d.custom(d.connection) + default: + return nil, fmt.Errorf("invalid driver: %s, only support sync, async, custom\n", d.connection) + } +} + +func (d *DriverImpl) custom(connection string) (queue.Driver, error) { + custom := d.config.Via(connection) + if driver, ok := custom.(queue.Driver); ok { + return driver, nil + } + if driver, ok := custom.(func() (queue.Driver, error)); ok { + return driver() + } + + return nil, fmt.Errorf("%s doesn't implement contracts/queue/driver\n", connection) +} diff --git a/queue/driver_async.go b/queue/driver_async.go new file mode 100644 index 000000000..b1ac44959 --- /dev/null +++ b/queue/driver_async.go @@ -0,0 +1,86 @@ +package queue + +import ( + "fmt" + "sync" + "time" + + contractsqueue "github.com/goravel/framework/contracts/queue" +) + +var asyncQueues sync.Map + +type ASync struct { + connection string + size int +} + +func NewASync(connection string, size int) *ASync { + return &ASync{ + connection: connection, + size: size, + } +} + +func (r *ASync) Connection() string { + return r.connection +} + +func (r *ASync) Driver() string { + return DriverASync +} + +func (r *ASync) Push(job contractsqueue.Job, args []any, queue string) error { + r.getQueue(queue) <- contractsqueue.Jobs{Job: job, Args: args} + return nil +} + +func (r *ASync) Bulk(jobs []contractsqueue.Jobs, queue string) error { + for _, job := range jobs { + if job.Delay > 0 { + go func(j contractsqueue.Jobs) { + time.Sleep(j.Delay) + r.getQueue(queue) <- j + }(job) + continue + } + + r.getQueue(queue) <- job + } + + return nil +} + +func (r *ASync) Later(delay time.Duration, job contractsqueue.Job, args []any, queue string) error { + go func() { + time.Sleep(delay) + r.getQueue(queue) <- contractsqueue.Jobs{Job: job, Args: args} + }() + + return nil +} + +func (r *ASync) Pop(queue string) (contractsqueue.Job, []any, error) { + ch, ok := asyncQueues.Load(queue) + if !ok { + return nil, nil, fmt.Errorf("no queue found: %s", queue) + } + + queueChan := ch.(chan contractsqueue.Jobs) + select { + case job := <-queueChan: + return job.Job, job.Args, nil + default: + return nil, nil, fmt.Errorf("no job found in %s queue", queue) + } +} + +func (r *ASync) getQueue(queue string) chan contractsqueue.Jobs { + ch, ok := asyncQueues.Load(queue) + if !ok { + ch = make(chan contractsqueue.Jobs, r.size) + actual, _ := asyncQueues.LoadOrStore(queue, ch) + return actual.(chan contractsqueue.Jobs) + } + return ch.(chan contractsqueue.Jobs) +} diff --git a/queue/driver_async_test.go b/queue/driver_async_test.go new file mode 100644 index 000000000..715cca969 --- /dev/null +++ b/queue/driver_async_test.go @@ -0,0 +1,284 @@ +package queue + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/goravel/framework/contracts/queue" + configmock "github.com/goravel/framework/mocks/config" + ormmock "github.com/goravel/framework/mocks/database/orm" + queuemock "github.com/goravel/framework/mocks/queue" +) + +var ( + testAsyncJob = 0 + testDelayAsyncJob = 0 + testCustomAsyncJob = 0 + testErrorAsyncJob = 0 + testChainAsyncJob = 0 +) + +type DriverAsyncTestSuite struct { + suite.Suite + app *Application + mockConfig *configmock.Config + mockQueue *queuemock.Queue +} + +func TestDriverAsyncTestSuite(t *testing.T) { + mockConfig := &configmock.Config{} + mockQueue := &queuemock.Queue{} + app := NewApplication(mockConfig) + + mockOrm := &ormmock.Orm{} + mockQuery := &ormmock.Query{} + mockOrm.On("Connection", "database").Return(mockOrm) + mockOrm.On("Query").Return(mockQuery) + mockQuery.On("Table", "failed_jobs").Return(mockQuery) + + OrmFacade = mockOrm + + assert.Nil(t, app.Register([]queue.Job{&TestAsyncJob{}, &TestDelayAsyncJob{}, &TestCustomAsyncJob{}, &TestErrorAsyncJob{}, &TestChainAsyncJob{}})) + suite.Run(t, &DriverAsyncTestSuite{ + app: app, + mockConfig: mockConfig, + mockQueue: mockQueue, + }) +} + +func (s *DriverAsyncTestSuite) SetupTest() { + testAsyncJob = 0 +} + +func (s *DriverAsyncTestSuite) TestDefaultAsyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("async").Times(4) + s.mockConfig.On("GetString", "app.name").Return("goravel").Times(2) + s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Twice() + s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Times(2) + s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once() + s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func(ctx context.Context) { + worker := s.app.Worker(nil) + s.Nil(worker.Run()) + + <-ctx.Done() + s.Nil(worker.Shutdown()) + }(ctx) + time.Sleep(1 * time.Second) + s.Nil(s.app.Job(&TestAsyncJob{}, []any{"TestDefaultAsyncQueue", 1}).Dispatch()) + time.Sleep(2 * time.Second) + s.Equal(1, testAsyncJob) + + s.mockConfig.AssertExpectations(s.T()) + s.mockQueue.AssertExpectations(s.T()) +} + +func (s *DriverAsyncTestSuite) TestDelayAsyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("async").Times(4) + s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3) + s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once() + s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Times(2) + s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once() + s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go func(ctx context.Context) { + worker := s.app.Worker(&queue.Args{ + Queue: "delay", + }) + s.Nil(worker.Run()) + + <-ctx.Done() + s.Nil(worker.Shutdown()) + }(ctx) + time.Sleep(1 * time.Second) + s.Nil(s.app.Job(&TestDelayAsyncJob{}, []any{"TestDelayAsyncQueue", 1}).OnQueue("delay").Delay(3).Dispatch()) + time.Sleep(2 * time.Second) + s.Equal(0, testDelayAsyncJob) + time.Sleep(3 * time.Second) + s.Equal(1, testDelayAsyncJob) + + s.mockConfig.AssertExpectations(s.T()) + s.mockQueue.AssertExpectations(s.T()) +} + +func (s *DriverAsyncTestSuite) TestCustomAsyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("custom").Times(4) + s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3) + s.mockConfig.On("GetString", "queue.connections.custom.queue", "default").Return("default").Once() + s.mockConfig.On("GetString", "queue.connections.custom.driver").Return("async").Times(2) + s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once() + s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func(ctx context.Context) { + worker := s.app.Worker(&queue.Args{ + Connection: "custom", + Queue: "custom1", + Concurrent: 2, + }) + s.Nil(worker.Run()) + + <-ctx.Done() + s.Nil(worker.Shutdown()) + }(ctx) + time.Sleep(1 * time.Second) + s.Nil(s.app.Job(&TestCustomAsyncJob{}, []any{"TestCustomAsyncQueue", 1}).OnConnection("custom").OnQueue("custom1").Dispatch()) + time.Sleep(2 * time.Second) + s.Equal(1, testCustomAsyncJob) + + s.mockConfig.AssertExpectations(s.T()) + s.mockQueue.AssertExpectations(s.T()) +} + +func (s *DriverAsyncTestSuite) TestErrorAsyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("async").Times(4) + s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3) + s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once() + s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Once() + s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("").Once() + s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once() + s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func(ctx context.Context) { + worker := s.app.Worker(&queue.Args{ + Queue: "error", + }) + s.Nil(worker.Run()) + + <-ctx.Done() + s.Nil(worker.Shutdown()) + }(ctx) + time.Sleep(1 * time.Second) + s.Error(s.app.Job(&TestErrorAsyncJob{}, []any{"TestErrorAsyncQueue", 1}).OnConnection("redis").OnQueue("error1").Dispatch()) + time.Sleep(2 * time.Second) + s.Equal(0, testErrorAsyncJob) + + s.mockConfig.AssertExpectations(s.T()) + s.mockQueue.AssertExpectations(s.T()) +} + +func (s *DriverAsyncTestSuite) TestChainAsyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("async").Times(4) + s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3) + s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once() + s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Times(2) + s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once() + s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func(ctx context.Context) { + worker := s.app.Worker(&queue.Args{ + Queue: "chain", + }) + s.Nil(worker.Run()) + + <-ctx.Done() + s.Nil(worker.Shutdown()) + }(ctx) + + time.Sleep(1 * time.Second) + s.Nil(s.app.Chain([]queue.Jobs{ + { + Job: &TestChainAsyncJob{}, + Args: []any{"TestChainAsyncJob", 1}, + }, + { + Job: &TestAsyncJob{}, + Args: []any{"TestAsyncJob", 1}, + }, + }).OnQueue("chain").Dispatch()) + + time.Sleep(2 * time.Second) + s.Equal(1, testChainAsyncJob) + s.Equal(1, testAsyncJob) + + s.mockConfig.AssertExpectations(s.T()) +} + +type TestAsyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestAsyncJob) Signature() string { + return "test_async_job" +} + +// Handle Execute the job. +func (receiver *TestAsyncJob) Handle(args ...any) error { + testAsyncJob++ + + return nil +} + +type TestDelayAsyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestDelayAsyncJob) Signature() string { + return "test_delay_async_job" +} + +// Handle Execute the job. +func (receiver *TestDelayAsyncJob) Handle(args ...any) error { + testDelayAsyncJob++ + + return nil +} + +type TestCustomAsyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestCustomAsyncJob) Signature() string { + return "test_custom_async_job" +} + +// Handle Execute the job. +func (receiver *TestCustomAsyncJob) Handle(args ...any) error { + testCustomAsyncJob++ + + return nil +} + +type TestErrorAsyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestErrorAsyncJob) Signature() string { + return "test_error_async_job" +} + +// Handle Execute the job. +func (receiver *TestErrorAsyncJob) Handle(args ...any) error { + testErrorAsyncJob++ + + return nil +} + +type TestChainAsyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestChainAsyncJob) Signature() string { + return "test_chain_async_job" +} + +// Handle Execute the job. +func (receiver *TestChainAsyncJob) Handle(args ...any) error { + testChainAsyncJob++ + + return nil +} diff --git a/queue/driver_machinery.go b/queue/driver_machinery.go new file mode 100644 index 000000000..ce78bbf52 --- /dev/null +++ b/queue/driver_machinery.go @@ -0,0 +1,85 @@ +// TODO: Will be removed in v1.17 + +package queue + +import ( + "time" + + "github.com/RichardKnop/machinery/v2" + redisbackend "github.com/RichardKnop/machinery/v2/backends/redis" + redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis" + "github.com/RichardKnop/machinery/v2/config" + "github.com/RichardKnop/machinery/v2/locks/eager" + "github.com/RichardKnop/machinery/v2/log" + + logcontract "github.com/goravel/framework/contracts/log" + "github.com/goravel/framework/contracts/queue" +) + +type Machinery struct { + connection string + config *Config + log logcontract.Log +} + +func NewMachinery(connection string, config *Config, log logcontract.Log) *Machinery { + return &Machinery{ + connection: connection, + config: config, + log: log, + } +} + +func (m *Machinery) Connection() string { + return m.connection +} + +func (m *Machinery) Driver() string { + //TODO implement me + panic("implement me") +} + +func (m *Machinery) Push(job queue.Job, args []any, queue string) error { + //TODO implement me + panic("implement me") +} + +func (m *Machinery) Bulk(jobs []queue.Jobs, queue string) error { + //TODO implement me + panic("implement me") +} + +func (m *Machinery) Later(delay time.Duration, job queue.Job, args []any, queue string) error { + //TODO implement me + panic("implement me") +} + +func (m *Machinery) Pop(queue string) (queue.Job, []any, error) { + //TODO implement me + panic("implement me") +} + +func (m *Machinery) server(queue string) *machinery.Server { + redisConfig, database, defaultQueue := m.config.Redis(m.connection) + if queue == "" { + queue = defaultQueue + } + + cnf := &config.Config{ + DefaultQueue: queue, + Redis: &config.RedisConfig{}, + } + + broker := redisbroker.NewGR(cnf, []string{redisConfig}, database) + backend := redisbackend.NewGR(cnf, []string{redisConfig}, database) + lock := eager.New() + + debug := m.config.config.GetBool("app.debug") + log.DEBUG = NewDebug(debug, m.log) + log.INFO = NewInfo(debug, m.log) + log.WARNING = NewWarning(debug, m.log) + log.ERROR = NewError(debug, m.log) + log.FATAL = NewFatal(debug, m.log) + + return machinery.NewServer(cnf, broker, backend, lock) +} diff --git a/queue/log.go b/queue/driver_machinery_log.go similarity index 99% rename from queue/log.go rename to queue/driver_machinery_log.go index 898b5bb54..5d26a2d27 100644 --- a/queue/log.go +++ b/queue/driver_machinery_log.go @@ -1,3 +1,5 @@ +// TODO: Will be removed in v1.17 + package queue import ( diff --git a/queue/machinery_test.go b/queue/driver_machinery_test.go similarity index 76% rename from queue/machinery_test.go rename to queue/driver_machinery_test.go index ae59b556f..105198d50 100644 --- a/queue/machinery_test.go +++ b/queue/driver_machinery_test.go @@ -1,3 +1,5 @@ +// TODO: Will be removed in v1.17 + package queue import ( @@ -23,7 +25,6 @@ func TestMachineryTestSuite(t *testing.T) { func (s *MachineryTestSuite) SetupTest() { s.mockConfig = &configmock.Config{} s.mockLog = &logmock.Log{} - s.machinery = NewMachinery(NewConfig(s.mockConfig), s.mockLog) } func (s *MachineryTestSuite) TestServer() { @@ -35,13 +36,6 @@ func (s *MachineryTestSuite) TestServer() { expectServer bool expectErr bool }{ - { - name: "sync", - connection: "sync", - setup: func() { - s.mockConfig.On("GetString", "queue.connections.sync.driver").Return("sync").Once() - }, - }, { name: "redis", connection: "redis", @@ -58,23 +52,14 @@ func (s *MachineryTestSuite) TestServer() { }, expectServer: true, }, - { - name: "error", - connection: "custom", - setup: func() { - s.mockConfig.On("GetString", "queue.connections.custom.driver").Return("custom").Once() - - }, - expectErr: true, - }, } for _, test := range tests { s.Run(test.name, func() { + s.machinery = NewMachinery(test.connection, NewConfig(s.mockConfig), s.mockLog) test.setup() - server, err := s.machinery.Server(test.connection, test.queue) + server := s.machinery.server(test.queue) s.Equal(test.expectServer, server != nil) - s.Equal(test.expectErr, err != nil) s.mockConfig.AssertExpectations(s.T()) }) } diff --git a/queue/driver_sync.go b/queue/driver_sync.go new file mode 100644 index 000000000..f4a43b9c6 --- /dev/null +++ b/queue/driver_sync.go @@ -0,0 +1,52 @@ +package queue + +import ( + "time" + + "github.com/goravel/framework/contracts/queue" +) + +type Sync struct { + connection string +} + +func NewSync(connection string) *Sync { + return &Sync{ + connection: connection, + } +} + +func (r *Sync) Connection() string { + return r.connection +} + +func (r *Sync) Driver() string { + return DriverSync +} + +func (r *Sync) Push(job queue.Job, args []any, _ string) error { + return job.Handle(args...) +} + +func (r *Sync) Bulk(jobs []queue.Jobs, _ string) error { + for _, job := range jobs { + if job.Delay > 0 { + time.Sleep(job.Delay) + } + if err := job.Job.Handle(job.Args...); err != nil { + return err + } + } + + return nil +} + +func (r *Sync) Later(delay time.Duration, job queue.Job, args []any, _ string) error { + time.Sleep(delay) + return job.Handle(args...) +} + +func (r *Sync) Pop(_ string) (queue.Job, []any, error) { + // sync driver does not support pop + return nil, nil, nil +} diff --git a/queue/driver_sync_test.go b/queue/driver_sync_test.go new file mode 100644 index 000000000..5c407157f --- /dev/null +++ b/queue/driver_sync_test.go @@ -0,0 +1,107 @@ +package queue + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/goravel/framework/contracts/queue" + configmock "github.com/goravel/framework/mocks/config" + queuemock "github.com/goravel/framework/mocks/queue" +) + +var ( + testSyncJob = 0 + testChainSyncJob = 0 +) + +type DriverSyncTestSuite struct { + suite.Suite + app *Application + mockConfig *configmock.Config + mockQueue *queuemock.Queue +} + +func TestDriverSyncTestSuite(t *testing.T) { + mockConfig := &configmock.Config{} + mockQueue := &queuemock.Queue{} + app := NewApplication(mockConfig) + + assert.Nil(t, app.Register([]queue.Job{&TestSyncJob{}, &TestChainSyncJob{}})) + suite.Run(t, &DriverSyncTestSuite{ + app: app, + mockConfig: mockConfig, + mockQueue: mockQueue, + }) +} + +func (s *DriverSyncTestSuite) SetupTest() { + testSyncJob = 0 + testChainSyncJob = 0 +} + +func (s *DriverSyncTestSuite) TestSyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("sync").Times(3) + s.mockConfig.On("GetString", "app.name").Return("goravel").Once() + s.mockConfig.On("GetString", "queue.connections.sync.queue", "default").Return("default").Once() + + s.Nil(s.app.Job(&TestSyncJob{}, []any{"TestSyncQueue", 1}).DispatchSync()) + s.Equal(1, testSyncJob) + + s.mockConfig.AssertExpectations(s.T()) +} + +func (s *DriverSyncTestSuite) TestChainSyncQueue() { + s.mockConfig.On("GetString", "queue.default").Return("sync").Times(3) + s.mockConfig.On("GetString", "app.name").Return("goravel").Twice() + s.mockConfig.On("GetString", "queue.connections.sync.queue", "default").Return("default").Once() + s.mockConfig.On("GetString", "queue.connections.sync.driver").Return("sync").Once() + + s.Nil(s.app.Chain([]queue.Jobs{ + { + Job: &TestChainSyncJob{}, + Args: []any{"TestChainSyncJob", 1}, + }, + { + Job: &TestSyncJob{}, + Args: []any{"TestSyncJob", 1}, + }, + }).OnQueue("chain").Dispatch()) + + time.Sleep(2 * time.Second) + s.Equal(1, testChainSyncJob) + + s.mockConfig.AssertExpectations(s.T()) +} + +type TestSyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestSyncJob) Signature() string { + return "test_sync_job" +} + +// Handle Execute the job. +func (receiver *TestSyncJob) Handle(args ...any) error { + testSyncJob++ + + return nil +} + +type TestChainSyncJob struct { +} + +// Signature The name and signature of the job. +func (receiver *TestChainSyncJob) Signature() string { + return "test_chain_sync_job" +} + +// Handle Execute the job. +func (receiver *TestChainSyncJob) Handle(args ...any) error { + testChainSyncJob++ + + return nil +} diff --git a/queue/job.go b/queue/job.go new file mode 100644 index 000000000..6607683cb --- /dev/null +++ b/queue/job.go @@ -0,0 +1,71 @@ +package queue + +import ( + "sync" + + contractsqueue "github.com/goravel/framework/contracts/queue" + "github.com/goravel/framework/support/carbon" +) + +type FailedJob struct { + ID uint `gorm:"primaryKey"` // The unique ID of the job. + Queue string `gorm:"not null"` // The name of the queue the job belongs to. + Signature string `gorm:"not null"` // The signature of the handler for this job. + Payloads []any `gorm:"not null;serializer:json"` // The arguments passed to the job. + Exception string `gorm:"not null"` // The exception that caused the job to fail. + FailedAt carbon.DateTime `gorm:"not null"` // The timestamp when the job failed. +} + +type Job interface { + Register(jobs []contractsqueue.Job) error + Call(signature string, args []any) error + Get(signature string) (contractsqueue.Job, error) + GetJobs() []contractsqueue.Job +} + +type JobImpl struct { + jobs sync.Map +} + +func NewJobImpl() *JobImpl { + return &JobImpl{} +} + +// Register registers jobs to the job manager +func (r *JobImpl) Register(jobs []contractsqueue.Job) error { + for _, job := range jobs { + r.jobs.Store(job.Signature(), job) + } + + return nil +} + +// Call calls a registered job using its signature +func (r *JobImpl) Call(signature string, args []any) error { + job, err := r.Get(signature) + if err != nil { + return err + } + + return job.Handle(args...) +} + +// Get gets a registered job using its signature +func (r *JobImpl) Get(signature string) (contractsqueue.Job, error) { + if job, ok := r.jobs.Load(signature); ok { + return job.(contractsqueue.Job), nil + } + + return nil, nil +} + +// GetJobs gets all registered jobs +func (r *JobImpl) GetJobs() []contractsqueue.Job { + var jobs []contractsqueue.Job + r.jobs.Range(func(_, value any) bool { + jobs = append(jobs, value.(contractsqueue.Job)) + return true + }) + + return jobs +} diff --git a/queue/machinery.go b/queue/machinery.go deleted file mode 100644 index df2e6bf06..000000000 --- a/queue/machinery.go +++ /dev/null @@ -1,63 +0,0 @@ -package queue - -import ( - "github.com/RichardKnop/machinery/v2" - redisbackend "github.com/RichardKnop/machinery/v2/backends/redis" - redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis" - "github.com/RichardKnop/machinery/v2/config" - "github.com/RichardKnop/machinery/v2/locks/eager" - "github.com/RichardKnop/machinery/v2/log" - - logcontract "github.com/goravel/framework/contracts/log" - "github.com/goravel/framework/errors" - "github.com/goravel/framework/support/color" -) - -type Machinery struct { - config *Config - log logcontract.Log -} - -func NewMachinery(config *Config, log logcontract.Log) *Machinery { - return &Machinery{config: config, log: log} -} - -func (m *Machinery) Server(connection string, queue string) (*machinery.Server, error) { - driver := m.config.Driver(connection) - - switch driver { - case DriverSync: - color.Warningln("Queue sync driver doesn't need to be run") - - return nil, nil - case DriverRedis: - return m.redisServer(connection, queue), nil - } - - return nil, errors.QueueDriverNotSupported.Args(driver) -} - -func (m *Machinery) redisServer(connection string, queue string) *machinery.Server { - redisConfig, database, defaultQueue := m.config.Redis(connection) - if queue == "" { - queue = defaultQueue - } - - cnf := &config.Config{ - DefaultQueue: queue, - Redis: &config.RedisConfig{}, - } - - broker := redisbroker.NewGR(cnf, []string{redisConfig}, database) - backend := redisbackend.NewGR(cnf, []string{redisConfig}, database) - lock := eager.New() - - debug := m.config.config.GetBool("app.debug") - log.DEBUG = NewDebug(debug, m.log) - log.INFO = NewInfo(debug, m.log) - log.WARNING = NewWarning(debug, m.log) - log.ERROR = NewError(debug, m.log) - log.FATAL = NewFatal(debug, m.log) - - return machinery.NewServer(cnf, broker, backend, lock) -} diff --git a/queue/service_provider.go b/queue/service_provider.go index fb6e41421..5e9117e7c 100644 --- a/queue/service_provider.go +++ b/queue/service_provider.go @@ -2,13 +2,20 @@ package queue import ( "github.com/goravel/framework/contracts/console" + "github.com/goravel/framework/contracts/database/orm" "github.com/goravel/framework/contracts/foundation" + "github.com/goravel/framework/contracts/log" "github.com/goravel/framework/errors" - queueConsole "github.com/goravel/framework/queue/console" + queueconsole "github.com/goravel/framework/queue/console" ) const Binding = "goravel.queue" +var ( + LogFacade log.Log // TODO: Will be removed in v1.17 + OrmFacade orm.Orm +) + type ServiceProvider struct { } @@ -19,17 +26,19 @@ func (receiver *ServiceProvider) Register(app foundation.Application) { return nil, errors.ConfigFacadeNotSet.SetModule(errors.ModuleQueue) } - log := app.MakeLog() - if log == nil { - return nil, errors.LogFacadeNotSet.SetModule(errors.ModuleQueue) - } - - return NewApplication(config, log), nil + return NewApplication(app.MakeConfig()), nil }) } func (receiver *ServiceProvider) Boot(app foundation.Application) { - app.Commands([]console.Command{ - &queueConsole.JobMakeCommand{}, + LogFacade = app.MakeLog() // TODO: Will be removed in v1.17 + OrmFacade = app.MakeOrm() + + receiver.registerCommands(app) +} + +func (receiver *ServiceProvider) registerCommands(app foundation.Application) { + app.MakeArtisan().Register([]console.Command{ + &queueconsole.JobMakeCommand{}, }) } diff --git a/queue/task.go b/queue/task.go index 8c8714227..3c0999d5a 100644 --- a/queue/task.go +++ b/queue/task.go @@ -3,84 +3,75 @@ package queue import ( "time" - "github.com/RichardKnop/machinery/v2" - "github.com/RichardKnop/machinery/v2/tasks" - - "github.com/goravel/framework/contracts/log" "github.com/goravel/framework/contracts/queue" - "github.com/goravel/framework/errors" ) type Task struct { config *Config connection string chain bool - delay *time.Time - machinery *Machinery + delay time.Duration + driver *DriverImpl jobs []queue.Jobs queue string - server *machinery.Server } -func NewTask(config *Config, log log.Log, job queue.Job, args []queue.Arg) *Task { +func NewTask(config *Config, job queue.Job, args []any) *Task { return &Task{ config: config, connection: config.DefaultConnection(), - machinery: NewMachinery(config, log), + driver: NewDriverImpl(config.DefaultConnection(), config), jobs: []queue.Jobs{ { Job: job, Args: args, }, }, + queue: config.Queue(config.DefaultConnection(), ""), } } -func NewChainTask(config *Config, log log.Log, jobs []queue.Jobs) *Task { +func NewChainTask(config *Config, jobs []queue.Jobs) *Task { return &Task{ config: config, connection: config.DefaultConnection(), chain: true, - machinery: NewMachinery(config, log), + driver: NewDriverImpl(config.DefaultConnection(), config), jobs: jobs, + queue: config.Queue(config.DefaultConnection(), ""), } } -func (receiver *Task) Delay(delay time.Time) queue.Task { - receiver.delay = &delay +// Delay sets a delay time for the task +func (receiver *Task) Delay(delay time.Duration) queue.Task { + receiver.delay = delay return receiver } +// Dispatch dispatches the task func (receiver *Task) Dispatch() error { - driver := receiver.config.Driver(receiver.connection) - if driver == "" { - return errors.QueueDriverNotSupported.Args(driver) - } - if driver == DriverSync { - return receiver.DispatchSync() - } - - server, err := receiver.machinery.Server(receiver.connection, receiver.queue) + driver, err := receiver.driver.New() if err != nil { return err } - receiver.server = server - if receiver.chain { - return receiver.handleChain(receiver.jobs) + return driver.Bulk(receiver.jobs, receiver.queue) } else { job := receiver.jobs[0] - - return receiver.handleAsync(job.Job, job.Args) + if receiver.delay > 0 { + return driver.Later(receiver.delay, job.Job, job.Args, receiver.queue) + } + return driver.Push(job.Job, job.Args, receiver.queue) } } +// DispatchSync dispatches the task synchronously func (receiver *Task) DispatchSync() error { if receiver.chain { for _, job := range receiver.jobs { - if err := receiver.handleSync(job.Job, job.Args); err != nil { + if err := job.Job.Handle(job.Args...); err != nil { return err } } @@ -89,76 +80,21 @@ func (receiver *Task) DispatchSync() error { } else { job := receiver.jobs[0] - return receiver.handleSync(job.Job, job.Args) + return job.Job.Handle(job.Args...) } } +// OnConnection sets the connection name func (receiver *Task) OnConnection(connection string) queue.Task { receiver.connection = connection + receiver.driver = NewDriverImpl(connection, receiver.config) return receiver } +// OnQueue sets the queue name func (receiver *Task) OnQueue(queue string) queue.Task { receiver.queue = receiver.config.Queue(receiver.connection, queue) return receiver } - -func (receiver *Task) handleChain(jobs []queue.Jobs) error { - var signatures []*tasks.Signature - for _, job := range jobs { - var realArgs []tasks.Arg - for _, arg := range job.Args { - realArgs = append(realArgs, tasks.Arg{ - Type: arg.Type, - Value: arg.Value, - }) - } - - signatures = append(signatures, &tasks.Signature{ - Name: job.Job.Signature(), - Args: realArgs, - ETA: receiver.delay, - }) - } - - chain, err := tasks.NewChain(signatures...) - if err != nil { - return err - } - - _, err = receiver.server.SendChain(chain) - - return err -} - -func (receiver *Task) handleAsync(job queue.Job, args []queue.Arg) error { - var realArgs []tasks.Arg - for _, arg := range args { - realArgs = append(realArgs, tasks.Arg{ - Type: arg.Type, - Value: arg.Value, - }) - } - - _, err := receiver.server.SendTask(&tasks.Signature{ - Name: job.Signature(), - Args: realArgs, - ETA: receiver.delay, - }) - if err != nil { - return err - } - - return nil -} - -func (receiver *Task) handleSync(job queue.Job, args []queue.Arg) error { - var realArgs []any - for _, arg := range args { - realArgs = append(realArgs, arg.Value) - } - - return job.Handle(realArgs...) -} diff --git a/queue/task_test.go b/queue/task_test.go index e82967195..d84e26678 100644 --- a/queue/task_test.go +++ b/queue/task_test.go @@ -1,7 +1,6 @@ package queue import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -21,31 +20,26 @@ func (receiver *Test) Signature() string { // Handle Execute the job. func (receiver *Test) Handle(args ...any) error { - if len(args) == 0 { - return fmt.Errorf("no arguments provided") - } - - arg, ok := args[0].(string) - if !ok { - return fmt.Errorf("expected a string argument") - } - - return file.Create("test.txt", arg) + return file.Create("test.txt", args[0].(string)) } func TestDispatchSync(t *testing.T) { task := &Task{ jobs: []queue.Jobs{ { - Job: &Test{}, - Args: []queue.Arg{ - {Type: "uint64", Value: "test"}, - }, + Job: &Test{}, + Args: []any{"test"}, }, }, } - err := task.DispatchSync() + jobs := NewJobImpl() + err := jobs.Register([]queue.Job{ + &Test{}, + }) + assert.Nil(t, err) + + err = task.DispatchSync() assert.Nil(t, err) assert.True(t, file.Exists("test.txt")) assert.True(t, testingfile.GetLineNum("test.txt") == 1) diff --git a/queue/utils.go b/queue/utils.go index 4a82c9433..cb7233806 100644 --- a/queue/utils.go +++ b/queue/utils.go @@ -1,9 +1,11 @@ package queue import ( + "errors" + "fmt" + "github.com/goravel/framework/contracts/event" "github.com/goravel/framework/contracts/queue" - "github.com/goravel/framework/errors" ) func jobs2Tasks(jobs []queue.Job) (map[string]any, error) { @@ -11,11 +13,11 @@ func jobs2Tasks(jobs []queue.Job) (map[string]any, error) { for _, job := range jobs { if job.Signature() == "" { - return nil, errors.QueueEmptyJobSignature + return nil, errors.New("the Signature of job can't be empty") } if tasks[job.Signature()] != nil { - return nil, errors.QueueDuplicateJobSignature.Args(job.Signature()) + return nil, fmt.Errorf("job signature duplicate: %s, the names of Job and Listener cannot be duplicated", job.Signature()) } tasks[job.Signature()] = job.Handle @@ -30,7 +32,7 @@ func eventsToTasks(events map[event.Event][]event.Listener) (map[string]any, err for _, listeners := range events { for _, listener := range listeners { if listener.Signature() == "" { - return nil, errors.QueueEmptyListenerSignature + return nil, errors.New("the Signature of listener can't be empty") } if tasks[listener.Signature()] != nil { diff --git a/queue/utils_test.go b/queue/utils_test.go index 64aa0499b..fdd448c2c 100644 --- a/queue/utils_test.go +++ b/queue/utils_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/goravel/framework/contracts/event" - queuecontract "github.com/goravel/framework/contracts/queue" + contractsqueue "github.com/goravel/framework/contracts/queue" ) type TestJob struct { @@ -43,20 +43,20 @@ func (receiver *TestJobEmpty) Handle(args ...any) error { } func TestJobs2Tasks(t *testing.T) { - _, err := jobs2Tasks([]queuecontract.Job{ + _, err := jobs2Tasks([]contractsqueue.Job{ &TestJob{}, }) assert.Nil(t, err, "success") - _, err = jobs2Tasks([]queuecontract.Job{ + _, err = jobs2Tasks([]contractsqueue.Job{ &TestJob{}, &TestJobDuplicate{}, }) assert.NotNil(t, err, "Signature duplicate") - _, err = jobs2Tasks([]queuecontract.Job{ + _, err = jobs2Tasks([]contractsqueue.Job{ &TestJobEmpty{}, }) diff --git a/queue/worker.go b/queue/worker.go index 19a494d88..fb06faedd 100644 --- a/queue/worker.go +++ b/queue/worker.go @@ -1,61 +1,83 @@ package queue import ( - "github.com/goravel/framework/contracts/log" - "github.com/goravel/framework/contracts/queue" -) + "fmt" + "time" -const ( - DriverSync string = "sync" - DriverRedis string = "redis" + "github.com/goravel/framework/contracts/database/orm" + "github.com/goravel/framework/support/carbon" ) type Worker struct { - concurrent int - connection string - machinery *Machinery - jobs []queue.Job - queue string + concurrent int + driver *DriverImpl + job *JobImpl + failedJobs orm.Query + queue string + failedJobChan chan FailedJob + isShutdown bool } -func NewWorker(config *Config, log log.Log, concurrent int, connection string, jobs []queue.Job, queue string) *Worker { +func NewWorker(config *Config, concurrent int, connection string, queue string, job *JobImpl) *Worker { return &Worker{ - concurrent: concurrent, - connection: connection, - machinery: NewMachinery(config, log), - jobs: jobs, - queue: queue, + concurrent: concurrent, + driver: NewDriverImpl(connection, config), + job: job, + failedJobs: config.FailedJobsQuery(), + queue: queue, + failedJobChan: make(chan FailedJob), } } -func (receiver *Worker) Run() error { - server, err := receiver.machinery.Server(receiver.connection, receiver.queue) +func (r *Worker) Run() error { + r.isShutdown = false + + driver, err := r.driver.New() if err != nil { return err } - if server == nil { - return nil + if driver.Driver() == DriverSync { + return fmt.Errorf("queue %s driver not need run", r.queue) } - jobTasks, err := jobs2Tasks(receiver.jobs) - if err != nil { - return err - } + for i := 0; i < r.concurrent; i++ { + go func() { + for { + if r.isShutdown { + return + } - if err := server.RegisterTasks(jobTasks); err != nil { - return err - } + job, args, err := driver.Pop(r.queue) + if err != nil { + // This error not need to be reported. + // It is usually caused by the queue being empty. + time.Sleep(1 * time.Second) + continue + } - if receiver.queue == "" { - receiver.queue = server.GetConfig().DefaultQueue - } - if receiver.concurrent == 0 { - receiver.concurrent = 1 - } - worker := server.NewWorker(receiver.queue, receiver.concurrent) - if err := worker.Launch(); err != nil { - return err + if err = r.job.Call(job.Signature(), args); err != nil { + r.failedJobChan <- FailedJob{ + Queue: r.queue, + Signature: job.Signature(), + Payloads: args, + Exception: err.Error(), + FailedAt: carbon.DateTime{Carbon: carbon.Now()}, + } + } + } + }() } + go func() { + for job := range r.failedJobChan { + _ = r.failedJobs.Create(&job) + } + }() + + return nil +} + +func (r *Worker) Shutdown() error { + r.isShutdown = true return nil }