From da92001c71a7a1a61a987d916271b4da2063c50d Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 1 Feb 2024 15:03:09 -0600 Subject: [PATCH] feat/boredom --- itests/harmonytask_test.go | 37 +++++++++++++ lib/harmony/harmonytask/harmonytask.go | 57 ++++++++++++++++---- lib/harmony/harmonytask/task_type_handler.go | 12 +++-- 3 files changed, 92 insertions(+), 14 deletions(-) diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index beef04c8d88..d4aa7550cbd 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -264,3 +264,40 @@ func TestTaskRetry(t *testing.T) { {2, false, "error: intentional 'error'"}}, res) }) } + +func TestBoredom(t *testing.T) { + //t.Parallel() + withDbSetup(t, func(m *kit.TestMiner) { + cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB + harmonytask.POLL_DURATION = time.Millisecond * 100 + var taskID harmonytask.TaskID + var ran bool + boredParty := &passthru{ + dtl: harmonytask.TaskTypeDetails{ + Name: "boredTest", + Max: -1, + Cost: resources.Resources{}, + IAmBored: func(add harmonytask.AddTaskFunc) error { + add(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { + taskID = tID + return true, nil + }) + return nil + }, + }, + canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + require.Equal(t, harmonytask.WorkSourceIAmBored, e.WorkOrigin) + return &list[0], nil + }, + do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + require.Equal(t, taskID, tID) + ran = true + return true, nil + }, + } + ht, err := harmonytask.New(cdb, []harmonytask.TaskInterface{boredParty}, "test:1") + require.NoError(t, err) + require.Eventually(t, func() bool { return ran }, time.Second, time.Millisecond*100) + ht.GracefullyTerminate(time.Hour) + }) +} diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 2d537868da9..4f5daa9636a 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -39,6 +39,12 @@ type TaskTypeDetails struct { // NOTE: if refatoring tasks, see if your task is // necessary. Ex: Is the sector state correct for your stage to run? Follows map[string]func(TaskID, AddTaskFunc) (bool, error) + + // IAmBored is called (when populated) when there's capacity but no work. + // Tasks added will be proposed to CanAccept() on this machine. + // CanAccept() can read taskEngine's WorkOrigin string to learn about a task. + // Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states. + IAmBored func(AddTaskFunc) error } // TaskInterface must be implemented in order to have a task used by harmonytask. @@ -97,17 +103,21 @@ type TaskInterface interface { type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error)) type TaskEngine struct { - ctx context.Context - handlers []*taskTypeHandler - db *harmonydb.DB - reg *resources.Reg - grace context.CancelFunc - taskMap map[string]*taskTypeHandler - ownerID int - follows map[string][]followStruct + // Static After New() + ctx context.Context + handlers []*taskTypeHandler + db *harmonydb.DB + reg *resources.Reg + grace context.CancelFunc + taskMap map[string]*taskTypeHandler + ownerID int + follows map[string][]followStruct + hostAndPort string + + // synchronous to the single-threaded poller lastFollowTime time.Time lastCleanup atomic.Value - hostAndPort string + WorkOrigin string } type followStruct struct { f func(TaskID, AddTaskFunc) (bool, error) @@ -177,7 +187,7 @@ func New( continue // not really fatal, but not great } } - if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) { + if !h.considerWork(WorkSourceRecover, []TaskID{TaskID(w.ID)}) { log.Errorw("Strange: Unable to accept previously owned task", "id", w.ID, "type", w.Name) } } @@ -327,13 +337,38 @@ func (e *TaskEngine) pollerTryAllWork() bool { continue } if len(unownedTasks) > 0 { - accepted := v.considerWork(workSourcePoller, unownedTasks) + accepted := v.considerWork(WorkSourcePoller, unownedTasks) if accepted { return true // accept new work slowly and in priority order } log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)") } } + // if no work was accepted, are we bored? Then find work in priority order. + for _, v := range e.handlers { + if v.AssertMachineHasCapacity() != nil { + continue + } + if v.TaskTypeDetails.IAmBored != nil { + var added []TaskID + err := v.TaskTypeDetails.IAmBored(func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error)) { + v.AddTask(func(tID TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + b, err := extraInfo(tID, tx) + if err == nil { + added = append(added, tID) + } + return b, err + }) + }) + if err != nil { + log.Error("IAmBored failed: ", err) + continue + } + if added != nil { // tiny chance a fail could make these bogus, but considerWork should then fail. + v.considerWork(WorkSourceIAmBored, added) + } + } + } return false } diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 7aecd380f2c..a8c6e58b8fc 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -53,8 +53,9 @@ retryAddTask: } const ( - workSourcePoller = "poller" - workSourceRecover = "recovered" + WorkSourcePoller = "poller" + WorkSourceRecover = "recovered" + WorkSourceIAmBored = "bored" ) // considerWork is called to attempt to start work on a task-id of this task type. @@ -84,9 +85,14 @@ top: return false } + h.TaskEngine.WorkOrigin = from + // 3. What does the impl say? canAcceptAgain: tID, err := h.CanAccept(ids, h.TaskEngine) + + h.TaskEngine.WorkOrigin = "" + if err != nil { log.Error(err) return false @@ -123,7 +129,7 @@ canAcceptAgain: } // if recovering we don't need to try to claim anything because those tasks are already claimed by us - if from != workSourceRecover { + if from != WorkSourceRecover { // 4. Can we claim the work for our hostname? ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) if err != nil {