-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pkg: remove old duplicated task #8234
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
2d43787
remove old duplicated task
rleungx 02b6f89
add unit test
rleungx bfcceb3
remove context
rleungx 3772233
change back to slice
rleungx 2695c79
use general task id
rleungx d73aa2f
address comments
rleungx 746e7f1
address the comment
rleungx c10679d
Merge branch 'master' into dup
ti-chi-bot[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,16 +42,16 @@ const ( | |
|
||
// Runner is the interface for running tasks. | ||
type Runner interface { | ||
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error | ||
RunTask(id uint64, name string, f func(), opts ...TaskOption) error | ||
Start() | ||
Stop() | ||
} | ||
|
||
// Task is a task to be run. | ||
type Task struct { | ||
ctx context.Context | ||
id uint64 | ||
submittedAt time.Time | ||
f func(context.Context) | ||
f func() | ||
name string | ||
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration. | ||
retained bool | ||
|
@@ -60,17 +60,22 @@ type Task struct { | |
// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. | ||
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") | ||
|
||
// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. | ||
type taskID struct { | ||
id uint64 | ||
name string | ||
} | ||
|
||
type ConcurrentRunner struct { | ||
name string | ||
limiter *ConcurrencyLimiter | ||
maxPendingDuration time.Duration | ||
taskChan chan *Task | ||
pendingTasks []*Task | ||
pendingMu sync.Mutex | ||
stopChan chan struct{} | ||
wg sync.WaitGroup | ||
pendingTaskCount map[string]int64 | ||
pendingTaskCount map[string]int | ||
pendingTasks []*Task | ||
existTasks map[taskID]*Task | ||
maxWaitingDuration prometheus.Gauge | ||
} | ||
|
||
|
@@ -82,7 +87,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur | |
maxPendingDuration: maxPendingDuration, | ||
taskChan: make(chan *Task), | ||
pendingTasks: make([]*Task, 0, initialCapacity), | ||
pendingTaskCount: make(map[string]int64), | ||
pendingTaskCount: make(map[string]int), | ||
existTasks: make(map[taskID]*Task), | ||
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), | ||
} | ||
return s | ||
|
@@ -101,6 +107,7 @@ func (cr *ConcurrentRunner) Start() { | |
cr.stopChan = make(chan struct{}) | ||
cr.wg.Add(1) | ||
ticker := time.NewTicker(5 * time.Second) | ||
defer ticker.Stop() | ||
go func() { | ||
defer cr.wg.Done() | ||
for { | ||
|
@@ -139,7 +146,7 @@ func (cr *ConcurrentRunner) Start() { | |
|
||
func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { | ||
start := time.Now() | ||
task.f(task.ctx) | ||
task.f() | ||
if token != nil { | ||
cr.limiter.ReleaseToken(token) | ||
cr.processPendingTasks() | ||
|
@@ -157,6 +164,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { | |
case cr.taskChan <- task: | ||
cr.pendingTasks = cr.pendingTasks[1:] | ||
cr.pendingTaskCount[task.name]-- | ||
delete(cr.existTasks, taskID{id: task.id, name: task.name}) | ||
default: | ||
} | ||
return | ||
|
@@ -170,11 +178,12 @@ func (cr *ConcurrentRunner) Stop() { | |
} | ||
|
||
// RunTask runs the task asynchronously. | ||
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { | ||
func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(), opts ...TaskOption) error { | ||
task := &Task{ | ||
ctx: ctx, | ||
name: name, | ||
f: f, | ||
id: id, | ||
name: name, | ||
f: f, | ||
submittedAt: time.Now(), | ||
} | ||
for _, opt := range opts { | ||
opt(task) | ||
|
@@ -187,23 +196,29 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con | |
}() | ||
|
||
pendingTaskNum := len(cr.pendingTasks) | ||
tid := taskID{task.id, task.name} | ||
if pendingTaskNum > 0 { | ||
// Here we use a map to find the task with the same ID. | ||
// Then replace the old task with the new one. | ||
if t, ok := cr.existTasks[tid]; ok { | ||
t.f = f | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment to explain the function will be overwrite and it doesn't matter rest lgtm |
||
t.submittedAt = time.Now() | ||
return nil | ||
} | ||
if !task.retained { | ||
maxWait := time.Since(cr.pendingTasks[0].submittedAt) | ||
if maxWait > cr.maxPendingDuration { | ||
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() | ||
return ErrMaxWaitingTasksExceeded | ||
} | ||
} | ||
// We use the max task number to limit the memory usage. | ||
// It occupies around 1.5GB memory when there is 20000000 pending task. | ||
if len(cr.pendingTasks) > maxPendingTaskNum { | ||
if pendingTaskNum > maxPendingTaskNum { | ||
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() | ||
return ErrMaxWaitingTasksExceeded | ||
} | ||
} | ||
task.submittedAt = time.Now() | ||
cr.pendingTasks = append(cr.pendingTasks, task) | ||
cr.existTasks[tid] = task | ||
cr.pendingTaskCount[task.name]++ | ||
return nil | ||
} | ||
|
@@ -217,8 +232,8 @@ func NewSyncRunner() *SyncRunner { | |
} | ||
|
||
// RunTask runs the task synchronously. | ||
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error { | ||
f(ctx) | ||
func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error { | ||
f() | ||
return nil | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use taskID directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok and use less memory.