Skip to content

Commit

Permalink
Refactored Workers and Execution flows, improved Task
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Cosentino committed May 19, 2023
1 parent 2810a49 commit d2e12d8
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 352 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ $RECYCLE.BIN/
# Local example
*.bak
*.now
examples/test/res/*.txt
examples/multi/res/*.txt
.dccache
2 changes: 1 addition & 1 deletion examples/manual/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
task := worker.Task{
ID: uuid.New(),
Priority: 1,
Fn: func() (val interface{}, err error) { return "Hello, World from Task!", err },
Execute: func() (val interface{}, err error) { return "Hello, World from Task!", err },
}

res, err := srv.ExecuteTask(task.ID, time.Second*5)
Expand Down
39 changes: 27 additions & 12 deletions examples/middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

func main() {
tm := worker.NewTaskManager(context.TODO(), 4, 10, 5, time.Second*3, time.Second*30, 3)
tm := worker.NewTaskManager(context.TODO(), 4, 10, 5, time.Second*3, time.Second*3, 3)

// defer tm.Close()
// defer tm.Stop()

var srv worker.Service = tm
// apply middleware in the same order as you want to execute them
Expand All @@ -24,12 +24,12 @@ func main() {
},
)

// defer srv.Close()
// defer srv.Stop()

task := worker.Task{
ID: uuid.New(),
Priority: 1,
Fn: func() (val interface{}, err error) {
Execute: func() (val interface{}, err error) {
return func(a int, b int) (val interface{}, err error) {
return a + b, err
}(2, 5)
Expand All @@ -40,22 +40,23 @@ func main() {
task1 := worker.Task{
ID: uuid.New(),
Priority: 10,
// Fn: func() (val interface{}, err error) { return "Hello, World from Task 1!", err },
// Execute: func() (val interface{}, err error) { return "Hello, World from Task 1!", err },
}

task2 := worker.Task{
ID: uuid.New(),
Priority: 5,
Fn: func() (val interface{}, err error) {
Execute: func() (val interface{}, err error) {
time.Sleep(time.Second * 2)
return "Hello, World from Task 2!", err
},
Ctx: context.TODO(),
}

task3 := worker.Task{
ID: uuid.New(),
Priority: 90,
Fn: func() (val interface{}, err error) {
Execute: func() (val interface{}, err error) {
// Simulate a long running task
// time.Sleep(3 * time.Second)
return "Hello, World from Task 3!", err
Expand All @@ -65,7 +66,7 @@ func main() {
task4 := worker.Task{
ID: uuid.New(),
Priority: 150,
Fn: func() (val interface{}, err error) {
Execute: func() (val interface{}, err error) {
// Simulate a long running task
time.Sleep(1 * time.Second)
return "Hello, World from Task 4!", err
Expand All @@ -79,12 +80,26 @@ func main() {
srv.RegisterTask(context.TODO(), task4)

// Print results
for result := range srv.GetResults() {
fmt.Println(result)
}
// for result := range srv.GetResults() {
// fmt.Println(result)
// }
// tm.Wait(tm.Timeout)

tasks := srv.GetTasks()
for _, task := range tasks {
fmt.Println(task)
fmt.Print(task.ID, " ", task.Priority, " ", task.Status, " ", task.Error, " ", "\n")
}

fmt.Println("printing cancelled tasks")

// get the cancelled tasks
cancelledTasks := tm.GetCancelledTasks()

select {
case task := <-cancelledTasks:
fmt.Printf("Task %s was cancelled\n", task.ID.String())
default:
fmt.Println("No tasks have been cancelled yet")
}

}
129 changes: 129 additions & 0 deletions examples/multi/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"context"
"fmt"
"log"
"os"
"path"
"time"

"github.com/google/uuid"
worker "github.com/hyp3rd/go-worker"
)

func main() {
// create a new task manager
tm := worker.NewTaskManager(context.TODO(), 1, 50, 10, time.Second*60, time.Second*2, 5)

// register and execute 10 tasks in a separate goroutine
go func() {
for i := 0; i < 10; i++ {
j := i
// create a new task
id := uuid.New()
task := worker.Task{
ID: id,
Name: "Some task",
Description: "Here goes the description of the task",
Priority: 10,
Execute: func() (val interface{}, err error) {
emptyFile, error := os.Create(path.Join("examples", "multi", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))
if error != nil {
log.Fatal(error)
}
emptyFile.Close()
return fmt.Sprintf("** task number %v with id %s executed", j, id), err
},
Ctx: context.TODO(),
Retries: 5,
RetryDelay: 1,
}

// register the task
tm.RegisterTask(context.TODO(), task)
}
}()

// register and execute 10 tasks in a separate goroutine
go func() {
for i := 0; i < 10; i++ {
j := i
// create a new task
id := uuid.New()
task := worker.Task{
ID: id,
Execute: func() (val interface{}, err error) {
emptyFile, error := os.Create(path.Join("examples", "multi", "res", fmt.Sprintf("2nd__EmptyFile___%v.txt", j)))

if error != nil {
log.Fatal(error)
}
emptyFile.Close()
// time.Sleep(time.Millisecond * 100)
return fmt.Sprintf("**** task number %v with id %s executed", j, id), err
},
// Ctx: context.TODO(),
Retries: 5,
RetryDelay: 1,
}

// register the task
tm.RegisterTask(context.TODO(), task)
}
}()

for i := 0; i < 10; i++ {
j := i
// create a new task
id := uuid.New()
task := worker.Task{
ID: id,
Execute: func() (val interface{}, err error) {
emptyFile, error := os.Create(path.Join("examples", "multi", "res", fmt.Sprintf("3nd__EmptyFile___%v.txt", j)))

if error != nil {
log.Fatal(error)
}
emptyFile.Close()
// time.Sleep(time.Millisecond * 100)
return fmt.Sprintf("**** task number %v with id %s executed", j, id), err
},
Ctx: context.TODO(),
Retries: 5,
RetryDelay: 1,
}

// register the task
tm.RegisterTask(context.TODO(), task)
}

for i := 0; i < 10; i++ {
j := i
// create a new task
id := uuid.New()
task := worker.Task{
ID: id,
Execute: func() (val interface{}, err error) {
emptyFile, err := os.Create(path.Join("examples", "wrong-path", "res", fmt.Sprintf("4nd__EmptyFile___%v.txt", j)))

if err != nil {
log.Println(err)
}
emptyFile.Close()
// time.Sleep(time.Millisecond * 100)
return fmt.Sprintf("**** wrong task number %v with id %s executed", j, id), err
},
Ctx: context.TODO(),
Retries: 3,
}

// register the task
tm.RegisterTask(context.TODO(), task)
}

// wait for the tasks to finish and print the results
for id, result := range tm.GetResults() {
fmt.Println(id, result)
}
}
File renamed without changes.
87 changes: 0 additions & 87 deletions examples/test/test.go

This file was deleted.

6 changes: 3 additions & 3 deletions middleware/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ func (mw *loggerMiddleware) GetResults() []worker.Result {
return mw.next.GetResults()
}

// GetCancelled streams the cancelled tasks channel
func (mw *loggerMiddleware) GetCancelled() <-chan worker.Task {
return mw.next.GetCancelled()
// GetCancelledTasks streams the cancelled tasks channel
func (mw *loggerMiddleware) GetCancelledTasks() <-chan worker.Task {
return mw.next.GetCancelledTasks()
}

// GetTask gets a task by its ID
Expand Down
4 changes: 2 additions & 2 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Service interface {
StreamResults() <-chan Result
// GetResults retruns the `Result` channel
GetResults() []Result
// GetCancelled gets the cancelled tasks channel
GetCancelled() <-chan Task
// GetCancelledTasks gets the cancelled tasks channel
GetCancelledTasks() <-chan Task
// GetTask gets a task by its ID
GetTask(id uuid.UUID) (task *Task, err error)
// GetTasks gets all tasks
Expand Down
Loading

0 comments on commit d2e12d8

Please sign in to comment.