Migrate task worker types to new package
jasonjoo2010 committed May 17, 2021
1 parent eecc06a commit 2d1cfe3
Showing 6 changed files with 55 additions and 48 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -8,7 +8,7 @@ GoSchedule is an in-process scheduler. It's modularized, efficient, high availab

A web based console [goschedule-console](https://github.com/jasonjoo2010/goschedule-console) is provided as an easy to use operating panel to review runtimes/statistics, manage tasks/strategies/schedulers and manage data of storage.

The first version of GoSchedule is implemented based on `tbschedule` which comes from Taobao®. Tbschedule became opensouce in 2011~2013 and stopped updating then. For an alternated please refer to [tbschedule](https://github.com/jasonjoo2010/tbschedule).
The first version of GoSchedule is implemented based on `tbschedule` which comes from Taobao®. Tbschedule became opensource in 2011~2013 and stopped updating then. For an alternated please refer to [tbschedule](https://github.com/jasonjoo2010/tbschedule).

The overview of design:

@@ -111,7 +111,7 @@ FuncWorker works perfectly in scenarios implementing simple and repeated logic w

TaskWorker is a more complicated and powerful framework for select()->execute() like jobs. Partitioning can be easily configured.

For more detail on design or explaination please refer to [Workers](WORKERS.md).
For more detail on design or explanation please refer to [Workers](WORKERS.md).

#### TaskItem of TaskWorker

@@ -129,7 +129,7 @@ For more details please refer to [MODELS](MODELS.md).

### Load balancing

Your workers are distributed between nodes that can be scheduled on. The `balancing` has a meaning in two dimentions: In same strategy and over strategies.
Your workers are distributed between nodes that can be scheduled on. The `balancing` has a meaning in two dimensions: In same strategy and over strategies.

In the same strategy, requested count of worker are well distributed based on nodes. But if you have more single-worker strategy there may be still unbalanced. So a shuffling is introduced when rescheduling to optimize balancing over strategies.

6 changes: 3 additions & 3 deletions WORKERS.md
@@ -10,15 +10,15 @@ We can use a figure to get an overview of them:

Simple worker acts like a thread with start-stop lifecycle. You can use it in start-stop scenarios like consumers of queue like RocketMQ/Kafka, goroutine loop, etc. .

Besides start/stop hooks it also supports parameter, cron expressions of begin/end. They are clustered togethor and defined as `strategy`.
Besides start/stop hooks it also supports parameter, cron expressions of begin/end. They are clustered together and defined as `strategy`.

## Func Worker

Compared to `Simple` worker `Func` worker doesn't care about the lifecyle and it focuses on business in single loop. The single loop logic can be scheduled in fixed rate, or fixed time driven by cron expression of begin, or invoked repeatedly in specified time segments driven by cron expressions. It acts more like a legacy `scheduled task`.
Compared to `Simple` worker `Func` worker doesn't care about the lifecycle and it focuses on business in single loop. The single loop logic can be scheduled in fixed rate, or fixed time driven by cron expression of begin, or invoked repeatedly in specified time segments driven by cron expressions. It acts more like a legacy `scheduled task`.

## Task Worker

`Task` worker is more complicated. A task worker can act quite differently in different scenarios. It supports partitioning, parellelism, batch processing, distributing and evironment definition. For simple worker which runs in single instance globally an arbitary partition is given and enough. But for heavier jobs in which partitions are necessary you can carefully define the partitions and they can be distributed among all worker instances well:
`Task` worker is more complicated. A task worker can act quite differently in different scenarios. It supports partitioning, parallelism, batch processing, distributing and environment definition. For simple worker which runs in single instance globally an arbitrary partition is given and enough. But for heavier jobs in which partitions are necessary you can carefully define the partitions and they can be distributed among all worker instances well:

![Partitioning in task](doc/partition.png)
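
The partition distribution described above can be sketched as follows. This is a minimal illustration only: the function name `AssignPartitions` and the round-robin rule are assumptions made for the example, not GoSchedule's actual assignment algorithm.

```go
package main

import "fmt"

// AssignPartitions spreads partition names over a fixed number of worker
// instances in round-robin order. Hypothetical helper for illustration;
// the real scheduler may use a different rule.
func AssignPartitions(partitions []string, workers int) [][]string {
	if workers <= 0 {
		return nil
	}
	out := make([][]string, workers)
	for i, p := range partitions {
		// Partition i goes to worker i modulo the worker count.
		out[i%workers] = append(out[i%workers], p)
	}
	return out
}

func main() {
	assigned := AssignPartitions([]string{"0", "1", "2", "3", "4"}, 2)
	for w, ps := range assigned {
		fmt.Printf("worker %d -> %v\n", w, ps)
	}
}
```

With a single worker instance every partition lands on that instance, which matches the "arbitrary partition is given and enough" case for globally single workers.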

3 changes: 2 additions & 1 deletion core/worker/task_worker/executor_batch.go
@@ -8,12 +8,13 @@ import (
"sync"
"time"

"github.com/jasonjoo2010/goschedule/types"
"github.com/sirupsen/logrus"
)

type BatchExecutor struct {
worker *TaskWorker
task TaskBatch
task types.TaskBatch
pool sync.Pool
}

3 changes: 2 additions & 1 deletion core/worker/task_worker/executor_single.go
@@ -7,12 +7,13 @@ package task_worker
import (
"time"

"github.com/jasonjoo2010/goschedule/types"
"github.com/sirupsen/logrus"
)

type SingleExecutor struct {
worker *TaskWorker
task TaskSingle
task types.TaskSingle
}

func (m *SingleExecutor) execute(item interface{}) {
54 changes: 14 additions & 40 deletions core/worker/task_worker/task_worker.go
@@ -25,37 +25,11 @@ var (
taskRegistryMap sync.Map
)

// TaskBase defines the task used in scheduling.
type TaskBase interface {
// Select returns tasks to be dealed later.
// It will be guaranteed in serial model.
// parameter, items, eachFetchNum are from definition of task
// ownSign is from name of strategy binded in the form of 'name$ownsign'
// It's a kind of relation to strategy but generally task doesn't care about strategy in user's view.
Select(parameter, ownSign string, items []definition.TaskItem, eachFetchNum int) []interface{}
}

// TaskSingle represents one task one time(routine) model
type TaskSingle interface {
TaskBase
// return true if succ false otherwise, but things will still go on
Execute(task interface{}, ownSign string) bool
}

// TaskBatch represents multiple tasks one time(routine) model
type TaskBatch interface {
TaskBase
// return true if succ false otherwise, but things will still go on
Execute(tasks []interface{}, ownSign string) bool
}

type TaskComparable interface {
Less(a, b interface{}) bool
}

// TaskWorker implements a task-driven worker.
// Strategy.Bind should be the identifier of task(on console panel).
type TaskWorker struct {
types.Worker

mu sync.Mutex
selectLock sync.Mutex
parameter string
@@ -72,7 +46,7 @@ type TaskWorker struct {
queuedData []interface{}
model TaskModel
executor TaskExecutor
task TaskBase
task types.TaskBase
executors int32
schedStart cron.Schedule
schedEnd cron.Schedule
@@ -89,15 +63,15 @@ type TaskWorker struct {
Statistics definition.Statistics
}

func getTaskFromType(t reflect.Type) TaskBase {
if v, ok := reflect.New(t).Interface().(TaskBase); ok {
func getTaskFromType(t reflect.Type) types.TaskBase {
if v, ok := reflect.New(t).Interface().(types.TaskBase); ok {
return v
}
logrus.Warn("Entry registered is not a convertable type: ", t)
return nil
}

func getTask(name string) TaskBase {
func getTask(name string) types.TaskBase {
var (
ok bool
v interface{}
@@ -110,7 +84,7 @@ func getTask(name string) TaskBase {
if ok {
return getTaskFromType(t)
}
val, ok := v.(TaskBase)
val, ok := v.(types.TaskBase)
if ok {
return val
}
@@ -119,15 +93,15 @@ func getTask(name string) TaskBase {
}

// RegisterTaskType registers a task type with key inferred by its type
func RegisterTaskType(task TaskBase) {
func RegisterTaskType(task types.TaskBase) {
if task == nil {
panic("Could not register a task using nil as value")
}
RegisterTaskTypeName(utils.TypeName(utils.Dereference(task)), task)
}

// RegisterTaskTypeName registers a task type with key
func RegisterTaskTypeName(name string, task TaskBase) {
func RegisterTaskTypeName(name string, task types.TaskBase) {
if name == "" {
panic("Could not register a task using empty name")
}
@@ -140,20 +114,20 @@ func RegisterTaskTypeName(name string, task TaskBase) {
}

// RegisterTaskInst registers a task in single instance model with key inferred by its type
func RegisterTaskInst(task TaskBase) {
func RegisterTaskInst(task types.TaskBase) {
RegisterTaskInstName(utils.TypeName(task), task)
}

// RegisterTaskInstName registers a task in single instance model with given key
func RegisterTaskInstName(name string, task TaskBase) {
func RegisterTaskInstName(name string, task types.TaskBase) {
taskRegistryMap.Store(name, task)
logrus.Info("Register a task instance: ", name)
}

// NewTask creates a new task and initials necessary fields
// Please don't initial TaskWorker manually
func NewTask(strategy definition.Strategy, task definition.Task, store store.Store, schedulerId string) (types.Worker, error) {
var inst TaskBase
var inst types.TaskBase
sequence, err := store.Sequence()
if err != nil {
logrus.Error("Generate sequence from storage failed: ", err.Error())
@@ -202,7 +176,7 @@ func NewTask(strategy definition.Strategy, task definition.Task, store store.Sto
w.model = NewNormalModel(w)
}
if w.taskDefine.BatchCount > 1 {
t, ok := inst.(TaskBatch)
t, ok := inst.(types.TaskBatch)
if !ok {
return nil, errors.New("Specific bind is not a TaskBatch: " + task.Bind)
}
@@ -216,7 +190,7 @@ func NewTask(strategy definition.Strategy, task definition.Task, store store.Sto
},
}
} else {
t, ok := inst.(TaskSingle)
t, ok := inst.(types.TaskSingle)
if !ok {
return nil, errors.New("Specific bind is not a TaskSingle: " + task.Bind)
}
31 changes: 31 additions & 0 deletions types/task.go
@@ -0,0 +1,31 @@
package types

import "github.com/jasonjoo2010/goschedule/definition"

// TaskBase defines the task used in scheduling.
type TaskBase interface {
// Select returns tasks to be dealed later.
// It will be guaranteed in serial model.
// parameter, items, eachFetchNum are from definition of task
// ownSign is from name of strategy bond in the form of 'name$ownsign'
// It's a kind of relation to strategy but generally task doesn't care about strategy in user's view.
Select(parameter, ownSign string, items []definition.TaskItem, eachFetchNum int) []interface{}
}

// TaskSingle represents one task one time(routine) model
type TaskSingle interface {
TaskBase
// return true if succ false otherwise, but things will still go on
Execute(task interface{}, ownSign string) bool
}

// TaskBatch represents multiple tasks one time(routine) model
type TaskBatch interface {
TaskBase
// return true if succ false otherwise, but things will still go on
Execute(tasks []interface{}, ownSign string) bool
}

type TaskComparable interface {
Less(a, b interface{}) bool
}
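
As a rough sketch of how the relocated interfaces might be used, the example below implements the single-item model and drives one select()->execute() round. To stay self-contained it declares local copies of the interfaces instead of importing `github.com/jasonjoo2010/goschedule/types`. `TaskItem` here is a simplified stand-in for `definition.TaskItem`, and `EchoTask` and `RunOnce` are hypothetical names that are not part of the commit.

```go
package main

import "fmt"

// TaskItem is a simplified stand-in for definition.TaskItem (assumption:
// only an identifier is needed for this illustration).
type TaskItem struct{ ID string }

// Local copies of the interfaces from types/task.go, using the stand-in type.
type TaskBase interface {
	Select(parameter, ownSign string, items []TaskItem, eachFetchNum int) []interface{}
}

type TaskSingle interface {
	TaskBase
	Execute(task interface{}, ownSign string) bool
}

// EchoTask is a hypothetical task that records every item it executes.
type EchoTask struct{ Processed []string }

// Compile-time check that *EchoTask satisfies TaskSingle.
var _ TaskSingle = (*EchoTask)(nil)

// Select returns at most eachFetchNum work items derived from the partitions.
func (t *EchoTask) Select(parameter, ownSign string, items []TaskItem, eachFetchNum int) []interface{} {
	out := make([]interface{}, 0, len(items))
	for _, it := range items {
		if len(out) >= eachFetchNum {
			break
		}
		out = append(out, parameter+":"+it.ID)
	}
	return out
}

// Execute handles one work item and reports whether it succeeded.
func (t *EchoTask) Execute(task interface{}, ownSign string) bool {
	s, ok := task.(string)
	if !ok {
		return false
	}
	t.Processed = append(t.Processed, s)
	return true
}

// RunOnce performs one select/execute round and returns the success count.
// The type assertion mirrors the TaskSingle check done in NewTask.
func RunOnce(task TaskBase, parameter, ownSign string, items []TaskItem, eachFetchNum int) (int, error) {
	single, ok := task.(TaskSingle)
	if !ok {
		return 0, fmt.Errorf("bind is not a TaskSingle: %T", task)
	}
	succ := 0
	for _, item := range single.Select(parameter, ownSign, items, eachFetchNum) {
		if single.Execute(item, ownSign) {
			succ++
		}
	}
	return succ, nil
}

func main() {
	task := &EchoTask{}
	items := []TaskItem{{ID: "0"}, {ID: "1"}, {ID: "2"}}
	n, err := RunOnce(task, "demo", "BASE", items, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(n, task.Processed)
}
```

Because `TaskSingle.Execute` and `TaskBatch.Execute` differ only in their first parameter type, one Go type cannot satisfy both interfaces. That is consistent with `NewTask` choosing between them based on `BatchCount`.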
