diff --git a/worker/job.go b/worker/job.go new file mode 100644 index 0000000..b67954b --- /dev/null +++ b/worker/job.go @@ -0,0 +1,43 @@ +package worker + +import ( + "reflect" +) + +type Job struct { + name string + fn interface{} + args []interface{} +} + +func (this *Job) Call() (err error) { + switch f := this.fn.(type) { + + case func() error: + return f() + + case func(): + f() + return nil + + default: // call by reflect + + vf := reflect.ValueOf(this.fn) + if vf.Kind() == reflect.Func { + out := callFunc(this.fn, this.args) + + if len(out) < 1 { + return + } + + if err, ok := out[0].(error); ok { + return err + } + + // ignore other outputs + return + } + } + + return +} diff --git a/worker/processer.go b/worker/processer.go new file mode 100644 index 0000000..a7a9bf0 --- /dev/null +++ b/worker/processer.go @@ -0,0 +1,128 @@ +package worker + +import ( + "fmt" + "reflect" + "sync" + + "github.com/pkg/errors" + "go.uber.org/atomic" +) + +type jobs []*Job + +//go:generate goption -p . -c processer +type Processer struct { + jobs jobs + + workers []*Worker + + wait sync.WaitGroup + + // controls + QueueMutex sync.RWMutex + started atomic.Bool + + Result Result +} + +func Default() *Processer { + return &Processer{ + jobs: make(jobs, 0, 4), + workers: make([]*Worker, 0, 4), + } +} + +type Result struct { + errs chan error +} + +func (this *Result) NextError() (err error) { + return <-this.errs +} + +// go func(){} +func (this *Processer) Go(jobName string, fn func()) *Processer { + return this.Call(jobName, fn) +} + +// go func(args){} +func (this *Processer) Call(jobName string, fn interface{}, args ...interface{}) *Processer { + + this.QueueMutex.Lock() + this.jobs = append(this.jobs, &Job{name: jobName, fn: fn}) + this.QueueMutex.Unlock() + + return this +} + +func (this *Processer) Run() *Processer { + this.Start() + return this.Wait() +} + +func (this *Processer) Wait() *Processer { + this.wait.Wait() + + return this +} + +func (this *Processer) Start() *Processer { + + if startOk := this.started.CAS(false, true); !startOk { + panic(`concurrent start`) + return this + } + + this.Result.errs = make(chan error, len(this.jobs)) + + for _, job := range this.jobs { + + this.wait.Add(1) + + // todo goroutine pool + go func(job *Job) { + var err error + + defer func() { + this.Result.errs <- err + + this.wait.Done() + }() + + defer func() { + if ex := recover(); ex != nil { + err = fmt.Errorf("%+v", ex) + err = errors.Wrapf(err, "JobPanic[%+v]", job.name) + } + }() + + err = job.Call() + if err != nil { + err = errors.Wrapf(err, "JobFailed[%+v]", job.name) + } + + }(job) + } + + return this +} + +func callFunc(f interface{}, args ...interface{}) (out []interface{}) { + vf := reflect.ValueOf(f) + if vf.Kind() != reflect.Func { + panic(123) + } + + in := make([]reflect.Value, len(args)) + for idx, arg := range args { + in[idx] = reflect.ValueOf(arg) + } + + outv := vf.Call(in) + for _, v := range outv { + out = append(out, v.Interface()) + } + + return +} diff --git a/worker/processer_test.go b/worker/processer_test.go new file mode 100644 index 0000000..63ccd32 --- /dev/null +++ b/worker/processer_test.go @@ -0,0 +1,35 @@ +package worker + +import ( + "fmt" + "testing" +) + +func TestProcesser_DemoStart(t *testing.T) { + var err error + wg := new(Processer) + + result := wg. + Go(`a func`, func() {}). + Call(`a func with error`, func() error { return nil }). + Call(`a func with args`, func(int) error { return nil }, 2). + Start(). + Wait() + + fmt.Println(result, err) +} + +func TestProcesser_Run(t *testing.T) { + var err error + wg := Default() + + done := make([]bool, 3) + + result := wg. + Go(`a func`, func() { done[1] = true }). + Call(`a func with error`, func() error { done[2] = true; return nil }). + Call(`a func with args`, func(int) error { done[0] = true; return nil }, 2). + Run() + + fmt.Println(result, done, err) +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..0abc3a4 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,3 @@ +package worker + +type Worker struct{}