diff --git a/.golangci.yml b/.golangci.yml index d38eca5..ac19f22 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -22,7 +22,7 @@ linters: disable-all: true enable: - asciicheck - - depguard +# - depguard - dogsled - dupl - bodyclose diff --git a/hw05_parallel_execution/.sync b/hw05_parallel_execution/.sync deleted file mode 100644 index e69de29..0000000 diff --git a/hw05_parallel_execution/go.mod b/hw05_parallel_execution/go.mod index f1aa31c..ed5bb6c 100644 --- a/hw05_parallel_execution/go.mod +++ b/hw05_parallel_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw05_parallel_execution +module github.com/pogpp/hw05_parallel_execution go 1.22 diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 9e3193f..79d993d 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -4,12 +4,19 @@ import ( "errors" ) -var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") +var ( + ErrErrorsLimitExceeded = errors.New("errors limit exceeded") + ErrErrorsNoWorkers = errors.New("no worker") +) type Task func() error // Run starts tasks in n goroutines and stops its work when receiving m errors from tasks. func Run(tasks []Task, n, m int) error { - // Place your code here. - return nil + if n <= 0 { + return ErrErrorsNoWorkers + } + threadPoolExecutor := newPool(tasks, n, int64(m)) + + return threadPoolExecutor.run() } diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go index a7069d7..aa03ded 100644 --- a/hw05_parallel_execution/run_test.go +++ b/hw05_parallel_execution/run_test.go @@ -67,4 +67,60 @@ func TestRun(t *testing.T) { require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed") require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?") }) + + t.Run("tasks less than workers no errors", func(t *testing.T) { + tasksCount := 9 + tasks := make([]Task, 0, tasksCount) + + var runTasksCount int32 + var sumTime time.Duration + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + sumTime += taskSleep + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + atomic.AddInt32(&runTasksCount, 1) + return nil + }) + } + + workersCount := 10 + maxErrorsCount := 1 + + start := time.Now() + err := Run(tasks, workersCount, maxErrorsCount) + require.NoError(t, err) + + require.Equal(t, runTasksCount, int32(tasksCount)) + require.LessOrEqual(t, int64(time.Since(start)), int64(sumTime/2)) + }) + + t.Run("no tasks", func(t *testing.T) { + tasks := make([]Task, 0) + + workersCount := 5 + maxErrorsCount := 1 + + err := Run(tasks, workersCount, maxErrorsCount) + require.NoError(t, err) + }) + + t.Run("no workers", func(t *testing.T) { + tasksCount := 5 + tasks := make([]Task, 0, tasksCount) + + for i := 0; i < tasksCount; i++ { + taskSleep := time.Millisecond * time.Duration(rand.Intn(100)) + + tasks = append(tasks, func() error { + time.Sleep(taskSleep) + return nil + }) + } + + err := Run(tasks, 0, 1) + require.Equal(t, ErrErrorsNoWorkers, err) + }) } diff --git a/hw05_parallel_execution/tasks_solver.go b/hw05_parallel_execution/tasks_solver.go new file mode 100644 index 0000000..778cf66 --- /dev/null +++ b/hw05_parallel_execution/tasks_solver.go @@ -0,0 +1,31 @@ +package hw05parallelexecution + +import ( + "sync" +) + +type worker struct { + taskCh chan Task +} + +func newTasksSolver(chTask chan Task) *worker { + return &worker{ + taskCh: chTask, + } +} + +func (w *worker) Start(wg *sync.WaitGroup, limiter *limiter) { + wg.Add(1) + + go func() { + defer wg.Done() + for task := range w.taskCh { + if task() != nil { + limiter.increment() + if limiter.isLimitExceeded() { + return + } + } + } + }() +} diff --git a/hw05_parallel_execution/thread_pool.go b/hw05_parallel_execution/thread_pool.go new file mode 100644 index 0000000..c35bd8d --- /dev/null +++ b/hw05_parallel_execution/thread_pool.go @@ -0,0 +1,64 @@ +package hw05parallelexecution + +import ( + "sync" + "sync/atomic" +) + +const chanSize = 1000 + +type limiter struct { + count int64 + limit int64 +} + +type pool struct { + tasks []Task + routinesCount int + collector chan Task + wg sync.WaitGroup + limiter *limiter +} + +func newLimiter(limit int64) *limiter { + return &limiter{ + limit: limit, + } +} + +func newPool(tasks []Task, rCount int, maxErrorCount int64) *pool { + return &pool{ + tasks: tasks, + routinesCount: rCount, + collector: make(chan Task, chanSize), + limiter: newLimiter(maxErrorCount), + } +} + +func (l *limiter) increment() { + atomic.AddInt64(&l.count, 1) +} + +func (l *limiter) isLimitExceeded() bool { + return atomic.LoadInt64(&l.count) >= atomic.LoadInt64(&l.limit) +} + +func (p *pool) run() error { + for i := 0; i < p.routinesCount; i++ { + w := newTasksSolver(p.collector) + w.Start(&p.wg, p.limiter) + } + + for _, task := range p.tasks { + p.collector <- task + } + close(p.collector) + + p.wg.Wait() + + if p.limiter.isLimitExceeded() { + return ErrErrorsLimitExceeded + } + + return nil +}