diff --git a/v2/controller/controller.go b/v2/controller/controller.go index 162b71a..053adbb 100644 --- a/v2/controller/controller.go +++ b/v2/controller/controller.go @@ -1,10 +1,13 @@ package controller -import "time" +import ( + "time" +) type TaskCtl struct { RetryCount int RunTime time.Time + ExpireTime time.Time err error } @@ -45,3 +48,11 @@ func (t *TaskCtl) GetRunTime() time.Time { func (t *TaskCtl) IsZeroRunTime() bool { return t.RunTime.IsZero() } + +func (t *TaskCtl) SetExpireTime(_t time.Time) { + t.ExpireTime= _t +} + +func (t *TaskCtl) IsExpired() bool { + return !t.ExpireTime.IsZero() && time.Now().After(t.ExpireTime) +} diff --git a/v2/message/result.go b/v2/message/result.go index ca9793c..65a4255 100644 --- a/v2/message/result.go +++ b/v2/message/result.go @@ -11,6 +11,7 @@ type resultStatusChoice struct { Running int Success int Failure int + Expired int } var ResultStatus = resultStatusChoice{ @@ -20,6 +21,7 @@ var ResultStatus = resultStatusChoice{ Running: 3, Success: 4, Failure: 5, + Expired: 6, } type Result struct { @@ -106,7 +108,7 @@ func (r Result) IsSuccess() bool { } func (r Result) IsFinish() bool { - if r.Status == ResultStatus.Success || r.Status == ResultStatus.Failure { + if r.Status == ResultStatus.Success || r.Status == ResultStatus.Failure || r.Status == ResultStatus.Expired { return true } return false diff --git a/v2/server/client.go b/v2/server/client.go index 79c99ac..53e76f2 100644 --- a/v2/server/client.go +++ b/v2/server/client.go @@ -12,12 +12,14 @@ type ctlKeyChoices struct { RetryCount int RunAt int RunAfter int + ExpireTime int } var ctlKey = ctlKeyChoices{ RetryCount: 0, RunAt: 1, RunAfter: 2, + ExpireTime: 3, } type Client struct { @@ -72,8 +74,9 @@ func (c *Client) SetTaskCtl(name int, value interface{}) *Client { n := time.Now() cloneC.ctl.SetRunTime(n.Add(value.(time.Duration))) case ctlKey.RunAt: - cloneC.ctl.SetRunTime(value.(time.Time)) + case ctlKey.ExpireTime: + cloneC.ctl.SetExpireTime(value.(time.Time)) } return cloneC diff --git a/v2/server/inlineServerGoroutine.go b/v2/server/inlineServerGoroutine.go index b02452e..9eb8b90 100644 --- a/v2/server/inlineServerGoroutine.go +++ b/v2/server/inlineServerGoroutine.go @@ -76,14 +76,21 @@ func (t *InlineServer) WorkerGoroutine() { func (t *InlineServer) workerGoroutine_RunWorker(w worker.WorkerInterface, msg *message.Message, result *message.Result) { + var err error ctl := msg.TaskCtl RUN: + if ctl.IsExpired(){ + result.Status = message.ResultStatus.Expired + t.workerGoroutine_SaveResult(*result) + goto AFTER + } + result.SetStatusRunning() t.workerGoroutine_SaveResult(*result) - err := w.Run(&ctl, msg.FuncArgs, result) + err = w.Run(&ctl, msg.FuncArgs, result) if err == nil { result.Status = message.ResultStatus.Success @@ -105,7 +112,9 @@ RUN: t.workerGoroutine_SaveResult(*result) } + AFTER: + err = w.After(&ctl, msg.FuncArgs, result) if err != nil { log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "worker").Errorf("run worker[%s] callback error %s", msg.WorkerName, err) @@ -122,32 +131,3 @@ func (t *InlineServer) workerGoroutine_SaveResult(result message.Result) { log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "worker").Errorf("save result error ", err) } } - -func (t *InlineServer) GetDelayMessageGoroutine() { - log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Debug("start") - var msg message.Message - var err error - for range t.workerReadyChan { - - if t.IsStop() { - break - } - msg, err = t.Next(t.groupName) - - if err != nil { - go t.MakeWorkerReady() - if !yerrors.IsEqual(err, yerrors.ErrTypeEmptyQuery) { - log.YTaskLog.WithField("server", t.groupName). - WithField("goroutine", "get_next_message"). - Error("get msg error, ", err) - } - continue - } - log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Infof("new msg %+v", msg) - t.msgChan <- msg - } - - t.getMessageGoroutineStopChan <- struct{}{} - log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Debug("stop") - -} diff --git a/v2/test/ytask_2_test.go b/v2/test/ytask_2_test.go index 468d2e3..c4538e0 100644 --- a/v2/test/ytask_2_test.go +++ b/v2/test/ytask_2_test.go @@ -102,3 +102,42 @@ func TestResultExpires(t *testing.T) { } ser.Shutdown(context.TODO()) } + + +func TestWorkerExpires(t *testing.T) { + // 测试任务过期 + b := brokers.NewRedisBroker("127.0.0.1", "6379", "", 0, 0) + b2 := backends.NewRedisBackend("127.0.0.1", "6379", "", 0, 0) + + ser := server.NewServer( + config.NewConfig( + config.Broker(&b), + config.Backend(&b2), + config.Debug(false), + ), + ) + log.YTaskLog.Out = ioutil.Discard + + ser.Add("test_we", "w1", workerTestResultExpires) + ser.Run("test_we",1) + + client := ser.GetClient() + client.Send("test_we", "w1") + + // 这个任务能执行 + id, _ := client.SetTaskCtl(client.ExpireTime,time.Now().Add(4*time.Second)).Send("test_we", "w1") + // 这个任务应该过期 + id2, _ := client.SetTaskCtl(client.ExpireTime,time.Now().Add(2*time.Second)).Send("test_we", "w1") + + result, _ := client.GetResult(id, 6*time.Second, 300*time.Millisecond) + if !result.IsSuccess(){ + t.Fatal("!result.IsSuccess()") + + } + result, _ = client.GetResult(id2, 2*time.Second, 300*time.Millisecond) + if result.IsSuccess() || result.Status!=message.ResultStatus.Expired{ + t.Fatal("任务状态错误") + + } + ser.Shutdown(context.TODO()) +}