Skip to content

Commit

Permalink
updated README and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Cosentino committed Mar 6, 2023
1 parent ef39960 commit 0e7cc19
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 51 deletions.
98 changes: 61 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,59 @@ Each `Task` represents a function scheduled by priority.
- Middleware: You can apply middleware to the `TaskManager` to add additional functionality.
- Results: You can access the results of the tasks via the `Results` channel.
- Rate limiting: You can rate limit the tasks schedule by setting a maximum number of jobs per second.
- Cancellation: Tasks can be canceled before or while they are running.
- Cancellation: You can cancel Tasks before or while they are running.

## API

### Initialization

Create a new `TaskManager` by calling the `NewTaskManager()` function and passing in the maximum number of tasks and the rate limit as parameters.
Create a new `TaskManager` by calling the `NewTaskManager()` function with the following parameters:

- `maxWorkers` is the number of workers to start. If 0 is specified, it will default to the number of available CPUs
- `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
- `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 1
- `timeout` is the default timeout for tasks, defaults to 5 minute
- `retryDelay` is the default delay between retries, defaults to 1 second
- `maxRetries` is the default maximum number of retries, defaults to 3

```go
tm := worker.NewTaskManager(10, 5)
tm := worker.NewTaskManager(4, 10, 5, time.Second*30, time.Second*30, 3)
```

### Registering Tasks

Register new tasks by calling the `RegisterTask()` method of the `TaskManager` struct and passing in the tasks.
Register new tasks by calling the `RegisterTasks()` method of the `TaskManager` struct and passing in a variadic number of tasks.

```go
id := uuid.New()
task := worker.Task{
ID: uuid.New(),
Priority: 1,
Fn: func() interface{} { return "Hello, World!" },
ID: id,
Name: "Some task",
Description: "Here goes the description of the task",
Priority: 10,
Fn: func() (interface{}, error) {
emptyFile, err := os.Create(path.Join("examples", "test", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))
if err != nil {
log.Fatal(err)
}
emptyFile.Close()
time.Sleep(time.Second)
return fmt.Sprintf("** task number %v with id %s executed", j, id), err
},
Retries: 10,
RetryDelay: 3,
}


task2 := worker.Task{
ID: uuid.New(),
Priority: 10,
Fn: func() interface{} { return "Hello, World!" },
Fn: func() (val interface{}, err error){ return "Hello, World!", err },
}

tm.RegisterTask(task, task2)
tm.RegisterTasks(context.Background(), task, task2)
```

### Starting and Stopping

You can start the task manager and its goroutines by calling the `Start()` method of the `TaskManager` struct and passing in the number of workers.

```go
tm.Start(5)
```
### Stopping the Task Manager

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.

Expand Down Expand Up @@ -103,6 +116,7 @@ tm = worker.RegisterMiddleware(tm,
package main

import (
"context"
"fmt"
"time"

Expand All @@ -111,70 +125,80 @@ import (
"github.com/hyp3rd/go-worker/middleware"
)

func main() {
tm := worker.NewTaskManager(5, 10)
func main() {
tm := worker.NewTaskManager(4, 10, 5, time.Second*3, time.Second*30, 3)

defer tm.Close()

var srv worker.Service = tm
// apply middleware in the same order as you want to execute them
tm = worker.RegisterMiddleware(tm,
srv = worker.RegisterMiddleware(tm,
// middleware.YourMiddleware,
func(next worker.Service) worker.Service {
return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
},
)

defer srv.Close()

task := worker.Task{
ID: uuid.New(),
Priority: 1,
Fn: func() interface{} {
return func(a int, b int) interface{} {
return a + b
Fn: func() (val interface{}, err error) {
return func(a int, b int) (val interface{}, err error) {
return a + b, err
}(2, 5)
},
}

// Invalid task, it doesn't have a function
task1 := worker.Task{
ID: uuid.New(),
Priority: 1,
Priority: 10,
// Fn: func() (val interface{}, err error) { return "Hello, World from Task 1!", err },
}

task2 := worker.Task{
ID: uuid.New(),
Priority: 5,
Fn: func() interface{} { return "Hello, World from Task 2!" },
Fn: func() (val interface{}, err error) {
time.Sleep(time.Second * 2)
return "Hello, World from Task 2!", err
},
}

task3 := worker.Task{
ID: uuid.New(),
Priority: 90,
Fn: func() interface{} {
Fn: func() (val interface{}, err error) {
// Simulate a long running task
time.Sleep(3 * time.Second)
return "Hello, World from Task 3!"
// time.Sleep(3 * time.Second)
return "Hello, World from Task 3!", err
},
}

task4 := worker.Task{
ID: uuid.New(),
Priority: 15,
Fn: func() interface{} {
Priority: 150,
Fn: func() (val interface{}, err error) {
// Simulate a long running task
time.Sleep(5 * time.Second)
return "Hello, World from Task 4!"
time.Sleep(1 * time.Second)
return "Hello, World from Task 4!", err
},
}

tm.RegisterTask(task, task1, task2, task3, task4)
tm.Start(5)
srv.RegisterTasks(context.Background(), task, task1, task2, task3)

srv.CancelTask(task3.ID)

tm.CancelTask(task3.ID)
srv.RegisterTask(context.Background(), task4)

// Print results
for result := range tm.GetResults() {
for result := range srv.GetResults() {
fmt.Println(result)
}

tasks := tm.GetTasks()
tasks := srv.GetTasks()
for _, task := range tasks {
fmt.Println(task)
}
Expand Down
11 changes: 0 additions & 11 deletions examples/middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,12 @@ func main() {
}

srv.RegisterTasks(context.Background(), task, task1, task2, task3)
// srv.RegisterTask(context.Background(), task)
// srv.RegisterTask(context.Background(), task1)
// srv.RegisterTask(context.Background(), task2)
// srv.RegisterTask(context.Background(), task3)
// srv.RegisterTask(context.Background(), task4)

srv.CancelTask(task3.ID)

srv.RegisterTask(context.Background(), task4)

// srv.Start(1)

// Print results
// for result := range srv.GetCancelled() {
// fmt.Println(result)
// }
// // Print results
for result := range srv.GetResults() {
fmt.Println(result)
}
Expand Down
10 changes: 7 additions & 3 deletions examples/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func main() {
// create a new task manager with a rate limit of 1 task per second
// create a new task manager
tm := worker.NewTaskManager(4, 10, 5, time.Second*30, time.Second*30, 3)
// close the task manager

Expand All @@ -24,17 +24,21 @@ func main() {
// create a new task
id := uuid.New()
task := worker.Task{
ID: id,
ID: id,
Name: "Some task",
Description: "Here goes the description of the task",
Priority: 10,
Fn: func() (val interface{}, err error) {
emptyFile, error := os.Create(path.Join("examples", "test", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))

if error != nil {
log.Fatal(error)
}
emptyFile.Close()
time.Sleep(time.Second)
return fmt.Sprintf("** task number %v with id %s executed", j, id), err
},
Retries: 10,
RetryDelay: 3,
}

// register the task
Expand Down

0 comments on commit 0e7cc19

Please sign in to comment.