Skip to content

Commit

Permalink
Merge pull request #68 from HeathLee/master
Browse files Browse the repository at this point in the history
improve asynctask
  • Loading branch information
HeathLee authored Feb 15, 2020
2 parents ae879c5 + d63922e commit 0d19f9a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 25 deletions.
15 changes: 12 additions & 3 deletions extensions/asynctaskext/asynctaskext.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package asynctaskext

import (
"errors"
"fmt"
"os"

"github.com/RichardKnop/machinery/v1"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/mitchellh/mapstructure"

"github.com/shanbay/gobay"
)

Expand Down Expand Up @@ -65,13 +65,22 @@ func (t *AsyncTaskExt) RegisterWorkerHandler(name string, handler interface{}) e
return t.server.RegisterTask(name, handler)
}

//RegisterWorkerHandlers add task handlers to worker to process task messages
func (t *AsyncTaskExt) RegisterWorkerHandlers(handlers map[string]interface{}) error {
return t.server.RegisterTasks(handlers)
}

//StartWorker start a worker that consume task messages for queue
func (t *AsyncTaskExt) StartWorker(queue string) error {
func (t *AsyncTaskExt) StartWorker(queue string, concurrency int) error {
hostName, err := os.Hostname()
if err != nil {
log.ERROR.Printf("get host name failed: %v", err)
}
worker := t.server.NewWorker(hostName, 0)
if queue == "" {
queue = t.config.DefaultQueue
}
tag := fmt.Sprintf("%s@%s", queue, hostName)
worker := t.server.NewWorker(tag, concurrency)
worker.Queue = queue
t.workers = append(t.workers, worker)
return worker.Launch()
Expand Down
71 changes: 49 additions & 22 deletions extensions/asynctaskext/asynctaskext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,63 @@ func init() {
}

func TestPushConsume(t *testing.T) {
if err := task.RegisterWorkerHandler("add", TaskAdd); err != nil {
if err := task.RegisterWorkerHandlers(map[string]interface{}{"add": TaskAdd, "sub": TaskSub}); err != nil {
t.Error(err)
}
go func() {
if err := task.StartWorker("gobay.task"); err != nil {
// use default queue
if err := task.StartWorker("", 1); err != nil {
t.Error(err)
}
}()
go func() {
if err := task.StartWorker("gobay.task_sub", 1); err != nil {
t.Error(err)
}
}()

sign := &tasks.Signature{
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
signs := []*tasks.Signature{
{
Name: "add",
Args: []tasks.Arg{ // use default queue
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 2,
},
{
Type: "int64",
Value: 3,
},
},
{
Type: "int64",
Value: 2,
},
{
Type: "int64",
Value: 3,
},
{
Name: "sub",
RoutingKey: "gobay.task_sub",
Args: []tasks.Arg{
{
Type: "int64",
Value: 7,
},
{
Type: "int64",
Value: 1,
},
},
},
}
if asyncResult, err := task.SendTask(sign); err != nil {
t.Error(err)
} else if results, err := asyncResult.Get(time.Millisecond * 5); err != nil {
t.Error(err)
} else if res, ok := results[0].Interface().(int64); !ok || res != 6 {
t.Error("result error")
for _, sign := range signs {
if asyncResult, err := task.SendTask(sign); err != nil {
t.Error(err)
} else if results, err := asyncResult.Get(time.Millisecond * 5); err != nil {
t.Error(err)
} else if res, ok := results[0].Interface().(int64); !ok || res != 6 {
t.Error("result error")
}
}

}
func TaskAdd(args ...int64) (int64, error) {
sum := int64(0)
Expand All @@ -73,3 +96,7 @@ func TaskAdd(args ...int64) (int64, error) {
}
return sum, nil
}

func TaskSub(arg1, arg2 int64) (int64, error) {
return arg1 - arg2, nil
}

0 comments on commit 0d19f9a

Please sign in to comment.