From 326aa146e8ac3038e06c6300498d452976df0560 Mon Sep 17 00:00:00 2001 From: youcai Date: Sat, 15 Feb 2020 16:23:01 +0800 Subject: [PATCH 1/4] regist multi task handler; test wokers listen on diff queue --- extensions/asynctaskext/asynctaskext.go | 6 +- extensions/asynctaskext/asynctaskext_test.go | 71 ++++++++++++++------ 2 files changed, 52 insertions(+), 25 deletions(-) diff --git a/extensions/asynctaskext/asynctaskext.go b/extensions/asynctaskext/asynctaskext.go index 89d30ac6..11d70edb 100644 --- a/extensions/asynctaskext/asynctaskext.go +++ b/extensions/asynctaskext/asynctaskext.go @@ -60,9 +60,9 @@ func (t *AsyncTaskExt) Close() error { return nil } -//RegisterWorkerHandler add task handler to worker to process task messages -func (t *AsyncTaskExt) RegisterWorkerHandler(name string, handler interface{}) error { - return t.server.RegisterTask(name, handler) +//RegisterWorkerHandlers add task handlers to worker to process task messages +func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) error { + return t.server.RegisterTasks(handlers) } //StartWorker start a worker that consume task messages for queue diff --git a/extensions/asynctaskext/asynctaskext_test.go b/extensions/asynctaskext/asynctaskext_test.go index 0f59c945..422f9d79 100644 --- a/extensions/asynctaskext/asynctaskext_test.go +++ b/extensions/asynctaskext/asynctaskext_test.go @@ -31,40 +31,63 @@ func init() { } func TestPushConsume(t *testing.T) { - if err := task.RegisterWorkerHandler("add", TaskAdd); err != nil { + if err := task.RegisterWorkerHandlers(map[string]interface{}{"add": TaskAdd, "sub": TaskSub}); err != nil { t.Error(err) } go func() { - if err := task.StartWorker("gobay.task"); err != nil { + if err := task.StartWorker("gobay.task_add"); err != nil { + t.Error(err) + } + }() + go func() { + if err := task.StartWorker("gobay.task_sub"); err != nil { t.Error(err) } }() - sign := &tasks.Signature{ - Name: "add", - Args: []tasks.Arg{ - { - Type: "int64", - Value: 1, + signs := []*tasks.Signature{ + { + Name: "add", + RoutingKey: "gobay.task_add", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 1, + }, + { + Type: "int64", + Value: 2, + }, + { + Type: "int64", + Value: 3, + }, }, - { - Type: "int64", - Value: 2, - }, - { - Type: "int64", - Value: 3, + }, + { + Name: "sub", + RoutingKey: "gobay.task_sub", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 7, + }, + { + Type: "int64", + Value: 1, + }, }, }, } - if asyncResult, err := task.SendTask(sign); err != nil { - t.Error(err) - } else if results, err := asyncResult.Get(time.Millisecond * 5); err != nil { - t.Error(err) - } else if res, ok := results[0].Interface().(int64); !ok || res != 6 { - t.Error("result error") + for _, sign := range(signs) { + if asyncResult, err := task.SendTask(sign); err != nil { + t.Error(err) + } else if results, err := asyncResult.Get(time.Millisecond * 5); err != nil { + t.Error(err) + } else if res, ok := results[0].Interface().(int64); !ok || res != 6 { + t.Error("result error") + } } - } func TaskAdd(args ...int64) (int64, error) { sum := int64(0) @@ -73,3 +96,7 @@ func TaskAdd(args ...int64) (int64, error) { } return sum, nil } + +func TaskSub(arg1, arg2 int64) (int64, error) { + return arg1 - arg2, nil +} From e81472d5937f6206d79a9053b4dd637071a03134 Mon Sep 17 00:00:00 2001 From: youcai Date: Sat, 15 Feb 2020 16:41:58 +0800 Subject: [PATCH 2/4] set worker tag --- extensions/asynctaskext/asynctaskext.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/asynctaskext/asynctaskext.go b/extensions/asynctaskext/asynctaskext.go index 11d70edb..27a732ae 100644 --- a/extensions/asynctaskext/asynctaskext.go +++ b/extensions/asynctaskext/asynctaskext.go @@ -3,6 +3,7 @@ package asynctaskext import ( "errors" "os" + "fmt" "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/backends/result" @@ -10,7 +11,7 @@ import ( "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/tasks" "github.com/mitchellh/mapstructure" - + uuid "github.com/satori/go.uuid" "github.com/shanbay/gobay" ) @@ -71,7 +72,8 @@ func (t *AsyncTaskExt) StartWorker(queue string) error { if err != nil { log.ERROR.Printf("get host name failed: %v", err) } - worker := t.server.NewWorker(hostName, 0) + tag := fmt.Sprintf("%s-%s@%s", queue, uuid.NewV4().String()[:6],hostName) + worker := t.server.NewWorker(tag, 0) worker.Queue = queue t.workers = append(t.workers, worker) return worker.Launch() From 6ed87971e3a108b983b2d818e688d0fac4ca3f72 Mon Sep 17 00:00:00 2001 From: youcai Date: Sat, 15 Feb 2020 16:44:20 +0800 Subject: [PATCH 3/4] support concurrency --- extensions/asynctaskext/asynctaskext.go | 4 ++-- extensions/asynctaskext/asynctaskext_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/asynctaskext/asynctaskext.go b/extensions/asynctaskext/asynctaskext.go index 27a732ae..4f9d31a3 100644 --- a/extensions/asynctaskext/asynctaskext.go +++ b/extensions/asynctaskext/asynctaskext.go @@ -67,13 +67,13 @@ func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) e } //StartWorker start a worker that consume task messages for queue -func (t *AsyncTaskExt) StartWorker(queue string) error { +func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error { hostName, err := os.Hostname() if err != nil { log.ERROR.Printf("get host name failed: %v", err) } tag := fmt.Sprintf("%s-%s@%s", queue, uuid.NewV4().String()[:6],hostName) - worker := t.server.NewWorker(tag, 0) + worker := t.server.NewWorker(tag, concurrency) worker.Queue = queue t.workers = append(t.workers, worker) return worker.Launch() diff --git a/extensions/asynctaskext/asynctaskext_test.go b/extensions/asynctaskext/asynctaskext_test.go index 422f9d79..fb0fab1f 100644 --- a/extensions/asynctaskext/asynctaskext_test.go +++ b/extensions/asynctaskext/asynctaskext_test.go @@ -35,12 +35,12 @@ func TestPushConsume(t *testing.T) { t.Error(err) } go func() { - if err := task.StartWorker("gobay.task_add"); err != nil { + if err := task.StartWorker("gobay.task_add", 1); err != nil { t.Error(err) } }() go func() { - if err := task.StartWorker("gobay.task_sub"); err != nil { + if err := task.StartWorker("gobay.task_sub", 1); err != nil { t.Error(err) } }() From d63922e450ecbbaf5ca86726216d305fd55873bc Mon Sep 17 00:00:00 2001 From: youcai Date: Sat, 15 Feb 2020 17:08:30 +0800 Subject: [PATCH 4/4] go fmt; resolve comment --- extensions/asynctaskext/asynctaskext.go | 13 ++++++++++--- extensions/asynctaskext/asynctaskext_test.go | 10 +++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/extensions/asynctaskext/asynctaskext.go b/extensions/asynctaskext/asynctaskext.go index 4f9d31a3..6483d9a6 100644 --- a/extensions/asynctaskext/asynctaskext.go +++ b/extensions/asynctaskext/asynctaskext.go @@ -2,8 +2,8 @@ package asynctaskext import ( "errors" - "os" "fmt" + "os" "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/backends/result" @@ -11,7 +11,6 @@ import ( "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/tasks" "github.com/mitchellh/mapstructure" - uuid "github.com/satori/go.uuid" "github.com/shanbay/gobay" ) @@ -61,6 +60,11 @@ func (t *AsyncTaskExt) Close() error { return nil } +//RegisterWorkerHandler add task handler to worker to process task messages +func (t *AsyncTaskExt) RegisterWorkerHandler(name string, handler interface{}) error { + return t.server.RegisterTask(name, handler) +} + //RegisterWorkerHandlers add task handlers to worker to process task messages func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) error { return t.server.RegisterTasks(handlers) @@ -72,7 +76,10 @@ func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error { if err != nil { log.ERROR.Printf("get host name failed: %v", err) } - tag := fmt.Sprintf("%s-%s@%s", queue, uuid.NewV4().String()[:6],hostName) + if queue == "" { + queue = t.config.DefaultQueue + } + tag := fmt.Sprintf("%s@%s", queue, hostName) worker := t.server.NewWorker(tag, concurrency) worker.Queue = queue t.workers = append(t.workers, worker) diff --git a/extensions/asynctaskext/asynctaskext_test.go b/extensions/asynctaskext/asynctaskext_test.go index fb0fab1f..5df67f3d 100644 --- a/extensions/asynctaskext/asynctaskext_test.go +++ b/extensions/asynctaskext/asynctaskext_test.go @@ -35,7 +35,8 @@ func TestPushConsume(t *testing.T) { t.Error(err) } go func() { - if err := task.StartWorker("gobay.task_add", 1); err != nil { + // use default queue + if err := task.StartWorker("", 1); err != nil { t.Error(err) } }() @@ -48,8 +49,7 @@ func TestPushConsume(t *testing.T) { signs := []*tasks.Signature{ { Name: "add", - RoutingKey: "gobay.task_add", - Args: []tasks.Arg{ + Args: []tasks.Arg{ // use default queue { Type: "int64", Value: 1, @@ -65,7 +65,7 @@ func TestPushConsume(t *testing.T) { }, }, { - Name: "sub", + Name: "sub", RoutingKey: "gobay.task_sub", Args: []tasks.Arg{ { @@ -79,7 +79,7 @@ func TestPushConsume(t *testing.T) { }, }, } - for _, sign := range(signs) { + for _, sign := range signs { if asyncResult, err := task.SendTask(sign); err != nil { t.Error(err) } else if results, err := asyncResult.Get(time.Millisecond * 5); err != nil {