Skip to content

Commit

Permalink
任务过期
Browse files Browse the repository at this point in the history
  • Loading branch information
gojuukaze committed Jul 12, 2021
1 parent 5de2b2c commit f53f0ec
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
13 changes: 12 additions & 1 deletion v2/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package controller

import "time"
import (
"time"
)

type TaskCtl struct {
RetryCount int
RunTime time.Time
ExpireTime time.Time
err error
}

Expand Down Expand Up @@ -45,3 +48,11 @@ func (t *TaskCtl) GetRunTime() time.Time {
func (t *TaskCtl) IsZeroRunTime() bool {
return t.RunTime.IsZero()
}

func (t *TaskCtl) SetExpireTime(_t time.Time) {
t.ExpireTime= _t
}

func (t *TaskCtl) IsExpired() bool {
return !t.ExpireTime.IsZero() && time.Now().After(t.ExpireTime)
}
4 changes: 3 additions & 1 deletion v2/message/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type resultStatusChoice struct {
Running int
Success int
Failure int
Expired int
}

var ResultStatus = resultStatusChoice{
Expand All @@ -20,6 +21,7 @@ var ResultStatus = resultStatusChoice{
Running: 3,
Success: 4,
Failure: 5,
Expired: 6,
}

type Result struct {
Expand Down Expand Up @@ -106,7 +108,7 @@ func (r Result) IsSuccess() bool {
}

func (r Result) IsFinish() bool {
if r.Status == ResultStatus.Success || r.Status == ResultStatus.Failure {
if r.Status == ResultStatus.Success || r.Status == ResultStatus.Failure || r.Status == ResultStatus.Expired {
return true
}
return false
Expand Down
5 changes: 4 additions & 1 deletion v2/server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ type ctlKeyChoices struct {
RetryCount int
RunAt int
RunAfter int
ExpireTime int
}

var ctlKey = ctlKeyChoices{
RetryCount: 0,
RunAt: 1,
RunAfter: 2,
ExpireTime: 3,
}

type Client struct {
Expand Down Expand Up @@ -72,8 +74,9 @@ func (c *Client) SetTaskCtl(name int, value interface{}) *Client {
n := time.Now()
cloneC.ctl.SetRunTime(n.Add(value.(time.Duration)))
case ctlKey.RunAt:

cloneC.ctl.SetRunTime(value.(time.Time))
case ctlKey.ExpireTime:
cloneC.ctl.SetExpireTime(value.(time.Time))

}
return cloneC
Expand Down
40 changes: 10 additions & 30 deletions v2/server/inlineServerGoroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,21 @@ func (t *InlineServer) WorkerGoroutine() {

func (t *InlineServer) workerGoroutine_RunWorker(w worker.WorkerInterface, msg *message.Message, result *message.Result) {

var err error
ctl := msg.TaskCtl

RUN:

if ctl.IsExpired(){
result.Status = message.ResultStatus.Expired
t.workerGoroutine_SaveResult(*result)
goto AFTER
}

result.SetStatusRunning()
t.workerGoroutine_SaveResult(*result)

err := w.Run(&ctl, msg.FuncArgs, result)
err = w.Run(&ctl, msg.FuncArgs, result)

if err == nil {
result.Status = message.ResultStatus.Success
Expand All @@ -105,7 +112,9 @@ RUN:
t.workerGoroutine_SaveResult(*result)

}

AFTER:

err = w.After(&ctl, msg.FuncArgs, result)
if err != nil {
log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "worker").Errorf("run worker[%s] callback error %s", msg.WorkerName, err)
Expand All @@ -122,32 +131,3 @@ func (t *InlineServer) workerGoroutine_SaveResult(result message.Result) {
log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "worker").Errorf("save result error ", err)
}
}

func (t *InlineServer) GetDelayMessageGoroutine() {
log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Debug("start")
var msg message.Message
var err error
for range t.workerReadyChan {

if t.IsStop() {
break
}
msg, err = t.Next(t.groupName)

if err != nil {
go t.MakeWorkerReady()
if !yerrors.IsEqual(err, yerrors.ErrTypeEmptyQuery) {
log.YTaskLog.WithField("server", t.groupName).
WithField("goroutine", "get_next_message").
Error("get msg error, ", err)
}
continue
}
log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Infof("new msg %+v", msg)
t.msgChan <- msg
}

t.getMessageGoroutineStopChan <- struct{}{}
log.YTaskLog.WithField("server", t.groupName).WithField("goroutine", "get_next_message").Debug("stop")

}
39 changes: 39 additions & 0 deletions v2/test/ytask_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,42 @@ func TestResultExpires(t *testing.T) {
}
ser.Shutdown(context.TODO())
}


func TestWorkerExpires(t *testing.T) {
// 测试任务过期
b := brokers.NewRedisBroker("127.0.0.1", "6379", "", 0, 0)
b2 := backends.NewRedisBackend("127.0.0.1", "6379", "", 0, 0)

ser := server.NewServer(
config.NewConfig(
config.Broker(&b),
config.Backend(&b2),
config.Debug(false),
),
)
log.YTaskLog.Out = ioutil.Discard

ser.Add("test_we", "w1", workerTestResultExpires)
ser.Run("test_we",1)

client := ser.GetClient()
client.Send("test_we", "w1")

// 这个任务能执行
id, _ := client.SetTaskCtl(client.ExpireTime,time.Now().Add(4*time.Second)).Send("test_we", "w1")
// 这个任务应该过期
id2, _ := client.SetTaskCtl(client.ExpireTime,time.Now().Add(2*time.Second)).Send("test_we", "w1")

result, _ := client.GetResult(id, 6*time.Second, 300*time.Millisecond)
if !result.IsSuccess(){
t.Fatal("!result.IsSuccess()")

}
result, _ = client.GetResult(id2, 2*time.Second, 300*time.Millisecond)
if result.IsSuccess() || result.Status!=message.ResultStatus.Expired{
t.Fatal("任务状态错误")

}
ser.Shutdown(context.TODO())
}

0 comments on commit f53f0ec

Please sign in to comment.