Skip to content

Commit

Permalink
respect ttl (#12)
Browse files Browse the repository at this point in the history
* respect ttl

* tidy go mod

* don't use ptime

* lint

* gofmt

* minutes to nanoseconds

* pr feedback
  • Loading branch information
jbonzo authored Oct 19, 2021
1 parent dc9aff2 commit c43b7ba
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 24 deletions.
68 changes: 45 additions & 23 deletions errand-routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,7 @@ func (s *ErrandsServer) failedErrand(c *gin.Context) {
}

updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error {
// Update this errand attributes:
if err := errand.AddToLogs("ERROR", failedReq.Reason); err != nil {
return err
}
errand.Failed = utils.GetTimestamp()
errand.Status = schemas.StatusFailed
errand.Progress = 0
if errand.Options.Retries > 0 {
// If we should retry this errand:
if errand.Attempts <= errand.Options.Retries {
errand.Status = inactive
} else {
// If this errand is out of retries
metrics.ErrandFailed(errand.Type)
}
} else {
// If this errand was not configured with retries
metrics.ErrandFailed(errand.Type)
}

return nil
return failErrand(errand, failedReq)
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
Expand All @@ -122,6 +102,30 @@ func (s *ErrandsServer) failedErrand(c *gin.Context) {
})
}

func failErrand(errand *schemas.Errand, failureRequest FailedRequest) error {
// Update this errand attributes:
if err := errand.AddToLogs("ERROR", failureRequest.Reason); err != nil {
return err
}
errand.Failed = utils.GetTimestamp()
errand.Status = schemas.StatusFailed
errand.Progress = 0
if errand.Options.Retries > 0 {
// If we should retry this errand:
if errand.Attempts <= errand.Options.Retries {
errand.Status = inactive
} else {
// If this errand is out of retries
metrics.ErrandFailed(errand.Type)
}
} else {
// If this errand was not configured with retries
metrics.ErrandFailed(errand.Type)
}

return nil
}

type CompletedRequest struct {
Results *gin.H `json:"results"`
}
Expand Down Expand Up @@ -256,14 +260,14 @@ func (s *ErrandsServer) deleteErrandByID(id string) {
}

// UpdateErrandByID Lets you pass in a function which will be called allowing you to update the errand. If no error is returned, the errand will be saved in the DB with the new attributes.
func (s *ErrandsServer) UpdateErrandByID(id string, fn func(*schemas.Errand) error) (*schemas.Errand, error) {
func (s *ErrandsServer) UpdateErrandByID(id string, update func(*schemas.Errand) error) (*schemas.Errand, error) {
errandObj, found := s.ErrandStore.Get(id)
if !found {
return nil, errors.New("errand with this ID not found")
}

errand := errandObj.(schemas.Errand)
if err := fn(&errand); err != nil {
if err := update(&errand); err != nil {
return nil, fmt.Errorf("error in given update function (fn): %w", err)
}

Expand All @@ -273,3 +277,21 @@ func (s *ErrandsServer) UpdateErrandByID(id string, fn func(*schemas.Errand) err

return &errand, nil
}

func (s *ErrandsServer) UpdateErrandsByFilter(filter func(*schemas.Errand) bool, update func(*schemas.Errand) error) error {
for _, itemObj := range s.ErrandStore.Items() {
errand := itemObj.Object.(schemas.Errand)

if filter(&errand) {
if err := update(&errand); err != nil {
return fmt.Errorf("error in given update function (fn): %w", err)
}

s.updateErrandInPipeline(&errand)

s.ErrandStore.SetDefault(errand.ID, errand)
}
}

return nil
}
34 changes: 34 additions & 0 deletions errands-server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"net/http"
"path"
"reflect"
Expand Down Expand Up @@ -77,6 +78,8 @@ func NewErrandsServer(cfg *Config) *ErrandsServer {

go obj.periodicallySaveDB()

go obj.periodicallyCheckTTLs()

return obj
}

Expand All @@ -93,6 +96,37 @@ func (s *ErrandsServer) periodicallySaveDB() {
}
}

func (s *ErrandsServer) periodicallyCheckTTLs() {
t := time.NewTicker(time.Minute)
defer t.Stop()

filter := func(errand *schemas.Errand) bool {
if errand.Options.TTL <= 0 {
return false
}

started := time.Unix(0, errand.Started*1_000_000) // ms to ns
ttlDuration := time.Duration(errand.Options.TTL) * time.Minute // m to ns

return time.Since(started) > ttlDuration
}

update := func(errand *schemas.Errand) error {
if err := failErrand(errand, FailedRequest{Reason: "TTL Expired"}); err != nil {
return fmt.Errorf("unable to fail errand: %s; %w", errand.ID, err)
}

s.AddNotification("failed", errand)
return nil
}

for range t.C {
if err := s.UpdateErrandsByFilter(filter, update); err != nil {
log.WithError(err).Error("Unable to fail errand(s)")
}
}
}

func (s *ErrandsServer) saveDBs() {
if err := s.ErrandStore.SaveFile(path.Join(s.StorageDir, errandsDBPathSuffix)); err != nil {
log.Error("----- Error checkpoint saving the errand DB to file -----")
Expand Down
2 changes: 1 addition & 1 deletion schemas/errand.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package schemas

import (
"fmt"

"github.com/google/uuid"

"github.com/polygon-io/errands-server/utils"
Expand Down Expand Up @@ -51,6 +50,7 @@ type Errand struct {
// ErrandOptions holds various options tied to an errand.
//easyjson:json
type ErrandOptions struct {
// TTL is measured in minutes.
TTL int `json:"ttl,omitempty"`
Retries int `json:"retries,omitempty"`
Priority int `json:"priority,omitempty"`
Expand Down

0 comments on commit c43b7ba

Please sign in to comment.