diff --git a/errand-routes.go b/errand-routes.go index 67a52c0..cc4cf8e 100644 --- a/errand-routes.go +++ b/errand-routes.go @@ -84,27 +84,7 @@ func (s *ErrandsServer) failedErrand(c *gin.Context) { } updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { - // Update this errand attributes: - if err := errand.AddToLogs("ERROR", failedReq.Reason); err != nil { - return err - } - errand.Failed = utils.GetTimestamp() - errand.Status = schemas.StatusFailed - errand.Progress = 0 - if errand.Options.Retries > 0 { - // If we should retry this errand: - if errand.Attempts <= errand.Options.Retries { - errand.Status = inactive - } else { - // If this errand is out of retries - metrics.ErrandFailed(errand.Type) - } - } else { - // If this errand was not configured with retries - metrics.ErrandFailed(errand.Type) - } - - return nil + return failErrand(errand, failedReq) }) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ @@ -122,6 +102,30 @@ func (s *ErrandsServer) failedErrand(c *gin.Context) { }) } +func failErrand(errand *schemas.Errand, failureRequest FailedRequest) error { + // Update this errand attributes: + if err := errand.AddToLogs("ERROR", failureRequest.Reason); err != nil { + return err + } + errand.Failed = utils.GetTimestamp() + errand.Status = schemas.StatusFailed + errand.Progress = 0 + if errand.Options.Retries > 0 { + // If we should retry this errand: + if errand.Attempts <= errand.Options.Retries { + errand.Status = inactive + } else { + // If this errand is out of retries + metrics.ErrandFailed(errand.Type) + } + } else { + // If this errand was not configured with retries + metrics.ErrandFailed(errand.Type) + } + + return nil +} + type CompletedRequest struct { Results *gin.H `json:"results"` } @@ -256,14 +260,14 @@ func (s *ErrandsServer) deleteErrandByID(id string) { } // UpdateErrandByID Lets you pass in a function which will be called allowing you to update the errand. If no error is returned, the errand will be saved in the DB with the new attributes. -func (s *ErrandsServer) UpdateErrandByID(id string, fn func(*schemas.Errand) error) (*schemas.Errand, error) { +func (s *ErrandsServer) UpdateErrandByID(id string, update func(*schemas.Errand) error) (*schemas.Errand, error) { errandObj, found := s.ErrandStore.Get(id) if !found { return nil, errors.New("errand with this ID not found") } errand := errandObj.(schemas.Errand) - if err := fn(&errand); err != nil { + if err := update(&errand); err != nil { return nil, fmt.Errorf("error in given update function (fn): %w", err) } @@ -273,3 +277,21 @@ func (s *ErrandsServer) UpdateErrandByID(id string, fn func(*schemas.Errand) err return &errand, nil } + +func (s *ErrandsServer) UpdateErrandsByFilter(filter func(*schemas.Errand) bool, update func(*schemas.Errand) error) error { + for _, itemObj := range s.ErrandStore.Items() { + errand := itemObj.Object.(schemas.Errand) + + if filter(&errand) { + if err := update(&errand); err != nil { + return fmt.Errorf("error in given update function (fn): %w", err) + } + + s.updateErrandInPipeline(&errand) + + s.ErrandStore.SetDefault(errand.ID, errand) + } + } + + return nil +} diff --git a/errands-server.go b/errands-server.go index 9c48f17..c6c0180 100644 --- a/errands-server.go +++ b/errands-server.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "net/http" "path" "reflect" @@ -77,6 +78,8 @@ func NewErrandsServer(cfg *Config) *ErrandsServer { go obj.periodicallySaveDB() + go obj.periodicallyCheckTTLs() + return obj } @@ -93,6 +96,37 @@ func (s *ErrandsServer) periodicallySaveDB() { } } +func (s *ErrandsServer) periodicallyCheckTTLs() { + t := time.NewTicker(time.Minute) + defer t.Stop() + + filter := func(errand *schemas.Errand) bool { + if errand.Options.TTL <= 0 { + return false + } + + started := time.Unix(0, errand.Started*1_000_000) // ms to ns + ttlDuration := time.Duration(errand.Options.TTL) * time.Minute // m to ns + + return time.Since(started) > ttlDuration + } + + update := func(errand *schemas.Errand) error { + if err := failErrand(errand, FailedRequest{Reason: "TTL Expired"}); err != nil { + return fmt.Errorf("unable to fail errand: %s; %w", errand.ID, err) + } + + s.AddNotification("failed", errand) + return nil + } + + for range t.C { + if err := s.UpdateErrandsByFilter(filter, update); err != nil { + log.WithError(err).Error("Unable to fail errand(s)") + } + } +} + func (s *ErrandsServer) saveDBs() { if err := s.ErrandStore.SaveFile(path.Join(s.StorageDir, errandsDBPathSuffix)); err != nil { log.Error("----- Error checkpoint saving the errand DB to file -----") diff --git a/schemas/errand.go b/schemas/errand.go index 2c47ea9..0086ff3 100644 --- a/schemas/errand.go +++ b/schemas/errand.go @@ -3,7 +3,6 @@ package schemas import ( "fmt" - "github.com/google/uuid" "github.com/polygon-io/errands-server/utils" @@ -51,6 +50,7 @@ type Errand struct { // ErrandOptions holds various options tied to an errand. //easyjson:json type ErrandOptions struct { + // TTL is measured in minutes. TTL int `json:"ttl,omitempty"` Retries int `json:"retries,omitempty"` Priority int `json:"priority,omitempty"`