Skip to content

Commit

Permalink
feat/boredom
Browse files Browse the repository at this point in the history
  • Loading branch information
snadrus authored and magik6k committed Apr 16, 2024
1 parent 649a1b5 commit da92001
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 14 deletions.
37 changes: 37 additions & 0 deletions itests/harmonytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,40 @@ func TestTaskRetry(t *testing.T) {
{2, false, "error: intentional 'error'"}}, res)
})
}

func TestBoredom(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
harmonytask.POLL_DURATION = time.Millisecond * 100
var taskID harmonytask.TaskID
var ran bool
boredParty := &passthru{
dtl: harmonytask.TaskTypeDetails{
Name: "boredTest",
Max: -1,
Cost: resources.Resources{},
IAmBored: func(add harmonytask.AddTaskFunc) error {
add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
taskID = tID
return true, nil
})
return nil
},
},
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
require.Equal(t, harmonytask.WorkSourceIAmBored, e.WorkOrigin)
return &list[0], nil
},
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
require.Equal(t, taskID, tID)
ran = true
return true, nil
},
}
ht, err := harmonytask.New(cdb, []harmonytask.TaskInterface{boredParty}, "test:1")
require.NoError(t, err)
require.Eventually(t, func() bool { return ran }, time.Second, time.Millisecond*100)
ht.GracefullyTerminate(time.Hour)
})
}
57 changes: 46 additions & 11 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type TaskTypeDetails struct {
// NOTE: if refatoring tasks, see if your task is
// necessary. Ex: Is the sector state correct for your stage to run?
Follows map[string]func(TaskID, AddTaskFunc) (bool, error)

// IAmBored is called (when populated) when there's capacity but no work.
// Tasks added will be proposed to CanAccept() on this machine.
// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
IAmBored func(AddTaskFunc) error
}

// TaskInterface must be implemented in order to have a task used by harmonytask.
Expand Down Expand Up @@ -97,17 +103,21 @@ type TaskInterface interface {
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error))

type TaskEngine struct {
ctx context.Context
handlers []*taskTypeHandler
db *harmonydb.DB
reg *resources.Reg
grace context.CancelFunc
taskMap map[string]*taskTypeHandler
ownerID int
follows map[string][]followStruct
// Static After New()
ctx context.Context
handlers []*taskTypeHandler
db *harmonydb.DB
reg *resources.Reg
grace context.CancelFunc
taskMap map[string]*taskTypeHandler
ownerID int
follows map[string][]followStruct
hostAndPort string

// synchronous to the single-threaded poller
lastFollowTime time.Time
lastCleanup atomic.Value
hostAndPort string
WorkOrigin string
}
type followStruct struct {
f func(TaskID, AddTaskFunc) (bool, error)
Expand Down Expand Up @@ -177,7 +187,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
if !h.considerWork(WorkSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Errorw("Strange: Unable to accept previously owned task", "id", w.ID, "type", w.Name)
}
}
Expand Down Expand Up @@ -327,13 +337,38 @@ func (e *TaskEngine) pollerTryAllWork() bool {
continue
}
if len(unownedTasks) > 0 {
accepted := v.considerWork(workSourcePoller, unownedTasks)
accepted := v.considerWork(WorkSourcePoller, unownedTasks)
if accepted {
return true // accept new work slowly and in priority order
}
log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
}
}
// if no work was accepted, are we bored? Then find work in priority order.
for _, v := range e.handlers {
if v.AssertMachineHasCapacity() != nil {
continue
}
if v.TaskTypeDetails.IAmBored != nil {
var added []TaskID
err := v.TaskTypeDetails.IAmBored(func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error)) {
v.AddTask(func(tID TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
b, err := extraInfo(tID, tx)
if err == nil {
added = append(added, tID)
}
return b, err
})
})
if err != nil {
log.Error("IAmBored failed: ", err)
continue
}
if added != nil { // tiny chance a fail could make these bogus, but considerWork should then fail.
v.considerWork(WorkSourceIAmBored, added)
}
}
}

return false
}
Expand Down
12 changes: 9 additions & 3 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ retryAddTask:
}

const (
workSourcePoller = "poller"
workSourceRecover = "recovered"
WorkSourcePoller = "poller"
WorkSourceRecover = "recovered"
WorkSourceIAmBored = "bored"
)

// considerWork is called to attempt to start work on a task-id of this task type.
Expand Down Expand Up @@ -84,9 +85,14 @@ top:
return false
}

h.TaskEngine.WorkOrigin = from

// 3. What does the impl say?
canAcceptAgain:
tID, err := h.CanAccept(ids, h.TaskEngine)

h.TaskEngine.WorkOrigin = ""

if err != nil {
log.Error(err)
return false
Expand Down Expand Up @@ -123,7 +129,7 @@ canAcceptAgain:
}

// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
if from != WorkSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
Expand Down

0 comments on commit da92001

Please sign in to comment.