Skip to content

Commit

Permalink
HW5 is completed
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Pogodaev <[email protected]>
  • Loading branch information
Pavel Pogodaev committed Jul 6, 2024
1 parent 411b524 commit 408b363
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 4 deletions.
Empty file removed hw05_parallel_execution/.sync
Empty file.
2 changes: 1 addition & 1 deletion hw05_parallel_execution/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/fixme_my_friend/hw05_parallel_execution
module github.com/pogpp/hw05_parallel_execution

go 1.22

Expand Down
13 changes: 10 additions & 3 deletions hw05_parallel_execution/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ import (
"errors"
)

var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
var (
ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
ErrErrorsNoWorkers = errors.New("no worker")
)

type Task func() error

// Run starts tasks in n goroutines and stops its work when receiving m errors from tasks.
func Run(tasks []Task, n, m int) error {
// Place your code here.
return nil
if n <= 0 {
return ErrErrorsNoWorkers
}
threadPoolExecutor := newPool(tasks, n, int64(m))

return threadPoolExecutor.run()
}
56 changes: 56 additions & 0 deletions hw05_parallel_execution/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,60 @@ func TestRun(t *testing.T) {
require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
})

t.Run("tasks less than workers no errors", func(t *testing.T) {
tasksCount := 9
tasks := make([]Task, 0, tasksCount)

var runTasksCount int32
var sumTime time.Duration

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
sumTime += taskSleep

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
atomic.AddInt32(&runTasksCount, 1)
return nil
})
}

workersCount := 10
maxErrorsCount := 1

start := time.Now()
err := Run(tasks, workersCount, maxErrorsCount)
require.NoError(t, err)

require.Equal(t, runTasksCount, int32(tasksCount))
require.LessOrEqual(t, int64(time.Since(start)), int64(sumTime/2))
})

t.Run("no tasks", func(t *testing.T) {
tasks := make([]Task, 0)

workersCount := 5
maxErrorsCount := 1

err := Run(tasks, workersCount, maxErrorsCount)
require.NoError(t, err)
})

t.Run("no workers", func(t *testing.T) {
tasksCount := 5
tasks := make([]Task, 0, tasksCount)

for i := 0; i < tasksCount; i++ {
taskSleep := time.Millisecond * time.Duration(rand.Intn(100))

tasks = append(tasks, func() error {
time.Sleep(taskSleep)
return nil
})
}

err := Run(tasks, 0, 1)
require.Equal(t, ErrErrorsNoWorkers, err)
})
}
31 changes: 31 additions & 0 deletions hw05_parallel_execution/tasks_solver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package hw05parallelexecution

import (
"sync"
)

type worker struct {
taskCh chan Task
}

func newTasksSolver(chTask chan Task) *worker {
return &worker{
taskCh: chTask,
}
}

func (w *worker) Start(wg *sync.WaitGroup, limiter *limiter) {
wg.Add(1)

go func() {
defer wg.Done()
for task := range w.taskCh {
if task() != nil {
limiter.increment()
if limiter.isLimitExceeded() {
return
}
}
}
}()
}
64 changes: 64 additions & 0 deletions hw05_parallel_execution/thread_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package hw05parallelexecution

import (
"sync"
"sync/atomic"
)

const chanSize = 1000

type limiter struct {
count int64
limit int64
}

type pool struct {
tasks []Task
routinesCount int
collector chan Task
wg sync.WaitGroup
limiter *limiter
}

func newLimiter(limit int64) *limiter {
return &limiter{
limit: limit,
}
}

func newPool(tasks []Task, rCount int, maxErrorCount int64) *pool {
return &pool{
tasks: tasks,
routinesCount: rCount,
collector: make(chan Task, chanSize),
limiter: newLimiter(maxErrorCount),
}
}

func (l *limiter) increment() {
atomic.AddInt64(&l.count, 1)
}

func (l *limiter) isLimitExceeded() bool {
return atomic.LoadInt64(&l.count) >= atomic.LoadInt64(&l.limit)
}

func (p *pool) run() error {
for i := 0; i < p.routinesCount; i++ {
w := newTasksSolver(p.collector)
w.Start(&p.wg, p.limiter)
}

for _, task := range p.tasks {
p.collector <- task
}
close(p.collector)

p.wg.Wait()

if p.limiter.isLimitExceeded() {
return ErrErrorsLimitExceeded
}

return nil
}

0 comments on commit 408b363

Please sign in to comment.