-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgoncurrent.go
91 lines (80 loc) · 2.22 KB
/
goncurrent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package goncurrent
import (
"errors"
"sync"
)
// ThreadPool runs submitted tasks on a fixed set of worker goroutines.
//
// Build one with NewThreadPool. Because it contains a sync.WaitGroup, a
// ThreadPool must be used through a pointer and never copied.
type ThreadPool struct {
	// workerCount is the number of goroutines launched by Start.
	workerCount int

	// taskQueue carries tasks from Submit to the worker goroutines.
	taskQueue chan func()
	// closed is closed by Shutdown to tell workers and Submit that the pool
	// is stopping.
	closed chan struct{}

	// wg tracks the running worker goroutines; Wait blocks on it.
	wg sync.WaitGroup
}
// Execute runs every function in tasks on a pool of threads worker goroutines
// and returns once all of them have finished.
//
// It returns an error if threads is less than 1, and nil otherwise. An empty
// or nil tasks slice is a no-op.
func Execute(threads int, tasks []func()) error {
	if threads < 1 {
		return errors.New("invalid number of threads")
	}
	if len(tasks) == 0 {
		return nil // Nothing to execute.
	}

	pool := NewThreadPool(threads)
	pool.Start()

	// Track completion here instead of relying on the pool's shutdown
	// sequence: once the pool is told to stop, a worker may exit while tasks
	// are still queued, which would silently drop them.
	var pending sync.WaitGroup
	pending.Add(len(tasks))
	for _, task := range tasks {
		task := task // Per-iteration copy; required before Go 1.22.
		pool.Submit(func() {
			defer pending.Done()
			task()
		})
	}
	pending.Wait()

	// Every task has run; now stop the workers and wait for them to exit so
	// no goroutine outlives this call.
	pool.Shutdown()
	pool.Wait()
	return nil
}
// NewThreadPool creates a ThreadPool configured for workerCount worker
// goroutines. The workers are not running yet; call Start to launch them.
//
// A workerCount below 1 is treated as 1. A negative value would otherwise
// panic when sizing the queue, and zero would produce a pool with no workers
// and an unbuffered queue, on which Submit blocks forever.
func NewThreadPool(workerCount int) *ThreadPool {
	if workerCount < 1 {
		workerCount = 1
	}
	// The queue is buffered with one slot per worker, so Submit can hand over
	// up to workerCount tasks without waiting for a worker to receive them.
	return &ThreadPool{
		workerCount: workerCount,
		taskQueue:   make(chan func(), workerCount),
		closed:      make(chan struct{}),
	}
}
// Start launches workerCount worker goroutines and returns immediately.
//
// Each worker runs tasks received from taskQueue until the pool is shut down.
// When a worker sees the shutdown signal it first runs whatever tasks are
// still buffered in the queue and only then exits, so tasks accepted before
// shutdown are not dropped. Every worker is registered with the pool's
// WaitGroup, which is what Wait blocks on.
func (tp *ThreadPool) Start() {
	for i := 0; i < tp.workerCount; i++ {
		tp.wg.Add(1)
		go func() {
			defer tp.wg.Done()
			for {
				select {
				case task, ok := <-tp.taskQueue:
					if !ok {
						return // Queue closed and fully drained.
					}
					task()
				case <-tp.closed:
					// select picks at random when both channels are ready,
					// so tasks may still be queued when this case wins.
					tp.drain()
					return
				}
			}
		}()
	}
}

// drain runs tasks that are already buffered in taskQueue and returns as soon
// as the queue is empty or closed. It never blocks waiting for new tasks.
func (tp *ThreadPool) drain() {
	for {
		select {
		case task, ok := <-tp.taskQueue:
			if !ok {
				return
			}
			task()
		default:
			return
		}
	}
}
// Submit hands task to the pool, blocking while the queue is full.
//
// Once the pool has been shut down, Submit returns without queuing the task.
// A Submit that is still in flight while Shutdown runs may or may not get its
// task queued; callers should finish submitting before shutting the pool down.
func (tp *ThreadPool) Submit(task func()) {
	// Check the shutdown signal on its own first. In a combined select the
	// runtime picks at random among ready cases, so after shutdown the send
	// could still be chosen: queuing a task nobody will run or, if the queue
	// has been closed, panicking with "send on closed channel".
	select {
	case <-tp.closed:
		return
	default:
	}

	select {
	case tp.taskQueue <- task:
	case <-tp.closed:
		// Pool shut down while waiting for queue space; drop the task.
	}
}
// Wait blocks until every worker goroutine launched by Start has returned.
// It waits on the pool's WaitGroup, which a worker releases only when it
// exits, so this is "workers have stopped" rather than "the queue is empty".
// NOTE(review): workers exit only in response to shutdown, so calling Wait
// without a prior Shutdown looks like it would block indefinitely — confirm
// that Shutdown-then-Wait is the intended usage.
func (tp *ThreadPool) Wait() {
tp.wg.Wait()
}
// Shutdown signals the pool to stop by closing the closed channel, which both
// the worker goroutines and Submit observe. It does not block; call Wait
// afterwards to wait for the workers to exit.
//
// Calling Shutdown again after it has returned is a no-op. Concurrent calls
// to Shutdown from several goroutines are not synchronized.
func (tp *ThreadPool) Shutdown() {
	select {
	case <-tp.closed:
		return // Already shut down; closing again would panic.
	default:
	}
	// taskQueue is deliberately left open: Shutdown is not its only sender,
	// and closing it would make a Submit that is blocked on a full queue
	// panic with "send on closed channel". The closed channel alone carries
	// the stop signal.
	close(tp.closed)
}