diff --git a/client.go b/client.go index 66c14fd..b750c52 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,5 @@ - package main - import ( // "io" "errors" @@ -10,42 +8,33 @@ import ( gin "github.com/gin-gonic/gin" ) - - - - - - - type Client struct { - ErrandsServer *ErrandsServer - Notifications chan *Notification - Gin *gin.Context - EventSubs []string + ErrandsServer *ErrandsServer + Notifications chan *Notification + Gin *gin.Context + EventSubs []string } -func ( s *ErrandsServer ) RemoveClient( c *Client ){ - close( c.Notifications ) +func (s *ErrandsServer) RemoveClient(c *Client) { + close(c.Notifications) s.UnregisterClient <- c } -func ( s *ErrandsServer ) NewClient( c *gin.Context ) ( *Client, error ) { +func (s *ErrandsServer) NewClient(c *gin.Context) (*Client, error) { obj := &Client{ Notifications: make(chan *Notification, 10), ErrandsServer: s, - Gin: c, + Gin: c, } events := c.DefaultQuery("events", "*") obj.EventSubs = strings.Split(events, ",") - if len( obj.EventSubs ) < 1 { + if len(obj.EventSubs) < 1 { return obj, errors.New("Must have at least 1 event subscription") } s.RegisterClient <- obj return obj, nil } -func ( c *Client ) Gone(){ - c.ErrandsServer.RemoveClient( c ) +func (c *Client) Gone() { + c.ErrandsServer.RemoveClient(c) } - - diff --git a/errand-routes.go b/errand-routes.go index b8a7cae..a1aa854 100644 --- a/errand-routes.go +++ b/errand-routes.go @@ -1,35 +1,31 @@ - package main - import ( // "fmt" "errors" "net/http" + gin "github.com/gin-gonic/gin" - badger "github.com/dgraph-io/badger" - utils "github.com/polygon-io/errands-server/utils" schemas "github.com/polygon-io/errands-server/schemas" + utils "github.com/polygon-io/errands-server/utils" ) - - - type UpdateRequest struct { - Progress float64 `json:"progress"` - Logs []string `json:"logs"` + Progress float64 `json:"progress"` + Logs []string `json:"logs"` } -func ( s *ErrandsServer ) updateErrand( c *gin.Context ){ + +func (s *ErrandsServer) updateErrand(c *gin.Context) { var updatedErrand *schemas.Errand var updateReq UpdateRequest if err := c.ShouldBind(&updateReq); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Parameters", - "error": err.Error(), + "error": err.Error(), }) return } - updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func( errand *schemas.Errand ) error { + updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { if errand.Status != "active" { return errors.New("Errand must be in active state to update progress") } @@ -38,43 +34,39 @@ func ( s *ErrandsServer ) updateErrand( c *gin.Context ){ if updateReq.Progress < 0 || updateReq.Progress >= 101 { return errors.New("Progress must be between 0 - 100") } - errand.Progress = float64( updateReq.Progress ) + errand.Progress = float64(updateReq.Progress) } return nil }) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } - s.AddNotification( "updated", updatedErrand ) + s.AddNotification("updated", updatedErrand) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": updatedErrand, }) } - - - - - type FailedRequest struct { - Reason string `json:"reason" binding:"required"` + Reason string `json:"reason" binding:"required"` } -func ( s *ErrandsServer ) failedErrand( c *gin.Context ){ + +func (s *ErrandsServer) failedErrand(c *gin.Context) { var updatedErrand *schemas.Errand var failedReq FailedRequest if err := c.ShouldBind(&failedReq); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Parameters", - "error": err.Error(), + "error": err.Error(), }) return } - updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func( errand *schemas.Errand ) error { + updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { // if errand.Status != "active" { // return errors.New("Errand must be in active state to fail") // } @@ -96,38 +88,33 @@ func ( s *ErrandsServer ) failedErrand( c *gin.Context ){ if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } - s.AddNotification( "failed", updatedErrand ) + s.AddNotification("failed", updatedErrand) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": updatedErrand, }) } - - - type CompletedRequest struct { - Results *gin.H `json:"results"` + Results *gin.H `json:"results"` } -func ( s *ErrandsServer ) completeErrand( c *gin.Context ){ + +func (s *ErrandsServer) completeErrand(c *gin.Context) { var updatedErrand *schemas.Errand var compReq CompletedRequest if err := c.ShouldBind(&compReq); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Parameters", - "error": err.Error(), + "error": err.Error(), }) return } shouldDelete := false - updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func( errand *schemas.Errand ) error { - // if errand.Status != "active" { - // return errors.New("Errand must be in active state to complete") - // } + updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { // Update this errand attributes: if err := errand.AddToLogs("INFO", "Completed!"); err != nil { return err @@ -135,7 +122,7 @@ func ( s *ErrandsServer ) completeErrand( c *gin.Context ){ errand.Completed = utils.GetTimestamp() errand.Status = "completed" errand.Progress = 100 - errand.Results = compReq.Results + // errand.Results = compReq.Results // If we should delete this errand upon completion: if errand.Options.DeleteOnCompleted == true { shouldDelete = true @@ -143,30 +130,28 @@ func ( s *ErrandsServer ) completeErrand( c *gin.Context ){ return nil }) if err == nil && shouldDelete == true && updatedErrand.ID != "" { - err = s.deleteErrandByID( updatedErrand.ID ) + err = s.deleteErrandByID(updatedErrand.ID) } - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "message": "Internal Server Error!", - "error": err.Error(), - }) - return + + if shouldDelete == true && updatedErrand.ID != "" { + if err := s.deleteErrandByID(updatedErrand.ID); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "message": "Internal Server Error!", + "error": err.Error(), + }) + return + } } - s.AddNotification( "completed", updatedErrand ) + s.AddNotification("completed", updatedErrand) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": updatedErrand, }) } - - - - - -func ( s *ErrandsServer ) retryErrand( c *gin.Context ){ +func (s *ErrandsServer) retryErrand(c *gin.Context) { var updatedErrand *schemas.Errand - updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func( errand *schemas.Errand ) error { + updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { if errand.Status == "inactive" { return errors.New("Cannot retry errand which is in inactive state") } @@ -181,30 +166,27 @@ func ( s *ErrandsServer ) retryErrand( c *gin.Context ){ if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } - s.AddNotification( "retry", updatedErrand ) + s.AddNotification("retry", updatedErrand) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": updatedErrand, }) } - - - -func ( s *ErrandsServer ) logToErrand( c *gin.Context ){ +func (s *ErrandsServer) logToErrand(c *gin.Context) { var logReq schemas.Log if err := c.ShouldBind(&logReq); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Parameters", - "error": err.Error(), + "error": err.Error(), }) return } - updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func( errand *schemas.Errand ) error { + updatedErrand, err := s.UpdateErrandByID(c.Param("id"), func(errand *schemas.Errand) error { if errand.Status != "active" { return errors.New("Errand must be in active state to log to") } @@ -214,26 +196,23 @@ func ( s *ErrandsServer ) logToErrand( c *gin.Context ){ if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": updatedErrand, }) } - - - - -func ( s *ErrandsServer ) deleteErrand( c *gin.Context ){ - err := s.deleteErrandByID( c.Param("id") ) +func (s *ErrandsServer) deleteErrand(c *gin.Context) { + s.Store.Delete(c.Param("id")) + err := s.deleteErrandByID(c.Param("id")) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } @@ -242,48 +221,25 @@ func ( s *ErrandsServer ) deleteErrand( c *gin.Context ){ }) } - - - - - - - -func ( s *ErrandsServer ) deleteErrandByID( id string ) error { - return s.DB.Update(func(txn *badger.Txn) error { - return txn.Delete([]byte( id )) - }) +func (s *ErrandsServer) deleteErrandByID(id string) error { + s.Store.Delete(id) + return nil } - - /* - Pass in a function which will be called allowing you to update the errand. - If no error is returned, the errand will be saved in the DB with the new + UpdateErrandByID Lets you pass in a function which will be called allowing you to update the errand. + If no error is returned, the errand will be saved in the DB with the new attributes. - */ -func ( s *ErrandsServer ) UpdateErrandByID( id string, fn func( *schemas.Errand ) error ) ( *schemas.Errand, error ) { - var updatedErrand *schemas.Errand - err := s.DB.Update(func(txn *badger.Txn) error { - item, err := txn.Get([]byte( id )); if err != nil { - return err - } - err = item.Value(func(v []byte) error { - errand := &schemas.Errand{} - err := errand.UnmarshalJSON( v ); if err != nil { - return err - } - err = fn( errand ); if err != nil { - return err - } - updatedErrand = errand - err = s.saveErrand( txn, errand ); if err != nil { - return err - } - return nil - }) - return err - }) - return updatedErrand, err +*/ +func (s *ErrandsServer) UpdateErrandByID(id string, fn func(*schemas.Errand) error) (*schemas.Errand, error) { + errandObj, found := s.Store.Get(id) + if !found { + return nil, errors.New("Errand with this ID not found") + } + errand := errandObj.(schemas.Errand) + if err := fn(&errand); err != nil { + return nil, errors.New("Error in given update function (fn)") + } + s.Store.SetDefault(id, errand) + return &errand, nil } - diff --git a/errands-routes.go b/errands-routes.go index 1fbb54e..d9cc111 100644 --- a/errands-routes.go +++ b/errands-routes.go @@ -1,33 +1,29 @@ - package main - import ( "io" // "fmt" - "log" "sort" + + log "github.com/sirupsen/logrus" // "time" + "encoding/json" "errors" "net/http" - "encoding/json" + gin "github.com/gin-gonic/gin" - badger "github.com/dgraph-io/badger" - utils "github.com/polygon-io/errands-server/utils" schemas "github.com/polygon-io/errands-server/schemas" + utils "github.com/polygon-io/errands-server/utils" ) - - - - -func ( s *ErrandsServer ) errandNotifications( c *gin.Context ){ - client, err := s.NewClient( c ); if err != nil { +func (s *ErrandsServer) errandNotifications(c *gin.Context) { + client, err := s.NewClient(c) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Error Creating Subscription", "error": err.Error(), }) - return + return } w := client.Gin.Writer w.Header().Set("Cache-Control", "no-cache") @@ -40,15 +36,15 @@ func ( s *ErrandsServer ) errandNotifications( c *gin.Context ){ client.Gin.Stream(func(wr io.Writer) bool { for { select { - case <- clientGone: + case <-clientGone: client.Gone() return false case t, ok := <-client.Notifications: if ok { // If we are subscribed to this event type: if utils.Contains(client.EventSubs, t.Event) || client.EventSubs[0] == "*" { - jsonData, _ := json.Marshal( t ) - client.Gin.SSEvent("message", string( jsonData )) + jsonData, _ := json.Marshal(t) + client.Gin.SSEvent("message", string(jsonData)) w.Flush() } return true @@ -60,16 +56,7 @@ func ( s *ErrandsServer ) errandNotifications( c *gin.Context ){ }) } - - - - - - - - - -func ( s *ErrandsServer ) createErrand( c *gin.Context ){ +func (s *ErrandsServer) createErrand(c *gin.Context) { log.Println("creating errand") var item schemas.Errand if err := c.ShouldBindJSON(&item); err != nil { @@ -77,69 +64,51 @@ func ( s *ErrandsServer ) createErrand( c *gin.Context ){ "message": "Errand validation failed!", "error": err.Error(), }) - return - } - item.SetDefaults() - err := s.DB.Update(func( txn *badger.Txn ) error { - return s.saveErrand( txn, &item ) - }) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "message": "Internal Server Error!", - "error": err.Error(), - }) return } - s.AddNotification( "created", &item ) + item.SetDefaults() + s.Store.SetDefault(item.ID, item) + s.AddNotification("created", &item) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": item, }) } - - -func ( s *ErrandsServer ) saveErrand( txn *badger.Txn, errand *schemas.Errand ) error { +func (s *ErrandsServer) saveErrand(errand *schemas.Errand) error { if !utils.Contains(schemas.ErrandStatuses, errand.Status) { return errors.New("Invalid errand status state") } - bytes, err := errand.MarshalJSON(); if err != nil { - return err - } - return txn.Set([]byte(errand.ID), bytes) + s.Store.SetDefault(errand.ID, *errand) + return nil } - - - -func ( s *ErrandsServer ) getAllErrands( c *gin.Context ){ - errands, err := s.GetErrandsBy(func( errand *schemas.Errand ) bool { +func (s *ErrandsServer) getAllErrands(c *gin.Context) { + errands, err := s.GetErrandsBy(func(errand *schemas.Errand) bool { return true }) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": errands, }) } - - -func ( s *ErrandsServer ) getFilteredErrands( c *gin.Context ){ +func (s *ErrandsServer) getFilteredErrands(c *gin.Context) { key := c.Param("key") value := c.Param("val") - errands, err := s.GetErrandsBy(func( errand *schemas.Errand ) bool { + errands, err := s.GetErrandsBy(func(errand *schemas.Errand) bool { switch key { case "status": - return ( errand.Status == value ) + return (errand.Status == value) case "type": - return ( errand.Type == value ) + return (errand.Type == value) default: return false } @@ -147,40 +116,38 @@ func ( s *ErrandsServer ) getFilteredErrands( c *gin.Context ){ if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": errands, }) } - - - type filteredUpdateReq struct { - Status string `json:"status"` - Delete bool `json:"delete"` + Status string `json:"status"` + Delete bool `json:"delete"` } -func ( s *ErrandsServer ) updateFilteredErrands( c *gin.Context ){ + +func (s *ErrandsServer) updateFilteredErrands(c *gin.Context) { key := c.Param("key") value := c.Param("val") var updateReq filteredUpdateReq if err := c.ShouldBind(&updateReq); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "message": "Invalid Parameters", - "error": err.Error(), + "error": err.Error(), }) return } - errands, err := s.GetErrandsBy(func( errand *schemas.Errand ) bool { + errands, err := s.GetErrandsBy(func(errand *schemas.Errand) bool { switch key { case "status": - return ( errand.Status == value ) + return (errand.Status == value) case "type": - return ( errand.Type == value ) + return (errand.Type == value) default: return false } @@ -188,12 +155,13 @@ func ( s *ErrandsServer ) updateFilteredErrands( c *gin.Context ){ if err == nil { for _, errand := range errands { if updateReq.Delete == true { - err = s.deleteErrandByID( errand.ID ); if err != nil { + err = s.deleteErrandByID(errand.ID) + if err != nil { break } - }else { + } else { if updateReq.Status != "" { - _, err = s.UpdateErrandByID( errand.ID, func( e *schemas.Errand ) error { + _, err = s.UpdateErrandByID(errand.ID, func(e *schemas.Errand) error { e.Status = updateReq.Status return nil }) @@ -207,138 +175,70 @@ func ( s *ErrandsServer ) updateFilteredErrands( c *gin.Context ){ if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "message": "Internal Server Error!", - "error": err.Error(), + "error": err.Error(), }) return } c.JSON(http.StatusOK, gin.H{ "status": "OK", - "count": len( errands ), + "count": len(errands), }) } - - - - - -func ( s *ErrandsServer ) processErrand( c *gin.Context ){ - var procErrand *schemas.Errand - errands := make([]*schemas.Errand, 0) - hasFound := false +func (s *ErrandsServer) processErrand(c *gin.Context) { + var procErrand schemas.Errand + errands := make([]schemas.Errand, 0) typeFilter := c.Param("type") - err := s.DB.Update(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 50 - it := txn.NewIterator( opts ) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func( v []byte ) error { - - errand := &schemas.Errand{} - err := errand.UnmarshalJSON( v ); if err != nil { - return err - } - if errand.Status != "inactive" { - return nil - } - if errand.Type != typeFilter { - return nil - } - // Add to list of errands we could possibly process: - errands = append( errands, errand ) - return nil - }) - if err != nil { - return err - } + for _, itemObj := range s.Store.Items() { + item := itemObj.Object.(schemas.Errand) + if item.Status != "inactive" { + continue } - - // Of the possible errands to process, sort them by date & priority: - if len( errands ) > 0 { - sort.SliceStable(errands, func(i, j int) bool { - return errands[i].Created < errands[j].Created - }) - sort.SliceStable(errands, func(i, j int) bool { - return errands[i].Options.Priority > errands[j].Options.Priority - }) - procErrand = errands[0] - // We are processing this errand: - procErrand.Started = utils.GetTimestamp() - procErrand.Attempts += 1 - procErrand.Status = "active" - procErrand.Progress = 0.0 - if err := procErrand.AddToLogs("INFO", "Started!"); err != nil { - return err - } - err := s.saveErrand( txn, procErrand ); if err != nil { - return err - } - hasFound = true + if item.Type != typeFilter { + continue } - - return nil - }) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "message": "Internal Server Error!", - "error": err.Error(), - }) - return + // Add to list of errands we could possibly process: + errands = append(errands, item) } - if !hasFound { + + if len(errands) == 0 { c.JSON(http.StatusNotFound, gin.H{ "message": "No jobs", }) return } - s.AddNotification( "processing", procErrand ) + + // Of the possible errands to process, sort them by date & priority: + sort.SliceStable(errands, func(i, j int) bool { + return errands[i].Created < errands[j].Created + }) + sort.SliceStable(errands, func(i, j int) bool { + return errands[i].Options.Priority > errands[j].Options.Priority + }) + procErrand = errands[0] + // We are processing this errand: + procErrand.Started = utils.GetTimestamp() + procErrand.Attempts += 1 + procErrand.Status = "active" + procErrand.Progress = 0.0 + _ = procErrand.AddToLogs("INFO", "Started!") + s.saveErrand(&procErrand) + + s.AddNotification("processing", &procErrand) c.JSON(http.StatusOK, gin.H{ - "status": "OK", + "status": "OK", "results": procErrand, }) } - - - - - - - -func ( s *ErrandsServer ) GetErrandsBy( fn func ( *schemas.Errand ) bool ) ( []*schemas.Errand, error ) { - errands := make([]*schemas.Errand, 0) - err := s.DB.View(func( txn *badger.Txn ) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 50 - it := txn.NewIterator( opts ) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func( v []byte ) error { - errand := &schemas.Errand{} - err := errand.UnmarshalJSON( v ); if err != nil { - return err - } - if( fn( errand ) ){ - errands = append( errands, errand ) - } - return nil - }) - if err != nil { - return err - } +func (s *ErrandsServer) GetErrandsBy(fn func(*schemas.Errand) bool) ([]schemas.Errand, error) { + errands := make([]schemas.Errand, 0) + for _, itemObj := range s.Store.Items() { + errand := itemObj.Object.(schemas.Errand) + if fn(&errand) { + errands = append(errands, errand) } - return nil - }) - return errands, err + } + return errands, nil } - - - - - - - diff --git a/errands-server.go b/errands-server.go index 6bcea80..1747073 100644 --- a/errands-server.go +++ b/errands-server.go @@ -1,96 +1,100 @@ - package main - import ( - "log" + "time" + + log "github.com/sirupsen/logrus" // "time" - "reflect" "net/http" - gin "github.com/gin-gonic/gin" + "reflect" + cors "github.com/gin-contrib/cors" + gin "github.com/gin-gonic/gin" // gzip "github.com/gin-contrib/gzip" - badger "github.com/dgraph-io/badger" + binding "github.com/gin-gonic/gin/binding" - validator "gopkg.in/go-playground/validator.v8" + store "github.com/polygon-io/errands-server/memorydb" schemas "github.com/polygon-io/errands-server/schemas" + validator "gopkg.in/go-playground/validator.v8" ) - - - - - - //easyjson:json type Notification struct { - Event string `json:"event"` - Errand schemas.Errand `json:"errand,omitempty"` + Event string `json:"event"` + Errand schemas.Errand `json:"errand,omitempty"` } - type ErrandsServer struct { - StorageDir string - Port string - DB *badger.DB - Server *http.Server - API *gin.Engine - ErrandsRoutes *gin.RouterGroup - ErrandRoutes *gin.RouterGroup - Notifications chan *Notification - StreamClients []*Client - RegisterClient chan *Client - UnregisterClient chan *Client + StorageDir string + Port string + Store *store.MemoryStore + Server *http.Server + API *gin.Engine + ErrandsRoutes *gin.RouterGroup + ErrandRoutes *gin.RouterGroup + Notifications chan *Notification + StreamClients []*Client + RegisterClient chan *Client + UnregisterClient chan *Client + periodicSave bool } - - - - - -func NewErrandsServer( cfg *Config ) *ErrandsServer { +func NewErrandsServer(cfg *Config) *ErrandsServer { obj := &ErrandsServer{ - StorageDir: cfg.Storage, - Port: cfg.Port, - StreamClients: make([]*Client, 0 ), - RegisterClient: make( chan *Client, 10 ), - UnregisterClient: make( chan *Client, 10 ), - Notifications: make(chan *Notification, 100), + StorageDir: cfg.Storage, + Port: cfg.Port, + StreamClients: make([]*Client, 0), + RegisterClient: make(chan *Client, 10), + UnregisterClient: make(chan *Client, 10), + Notifications: make(chan *Notification, 100), + Store: store.New(), + periodicSave: true, } go obj.createAPI() go obj.broadcastLoop() - obj.createDB() + if err := obj.Store.LoadFile(cfg.Storage); err != nil { + log.Warning("Could not load data from previous DB file.") + log.Warning("If this is your first time running, this is normal.") + log.Warning("If not please check the contents of your file: ", cfg.Storage) + } + go obj.periodicallySaveDB() return obj } -func ( s *ErrandsServer ) AddNotification( event string, errand *schemas.Errand ){ +func (s *ErrandsServer) periodicallySaveDB() { + for { + time.Sleep(60 * time.Second) + if !s.periodicSave { + return + } + log.Info("Checkpoint saving DB to file...") + if err := s.Store.SaveFile(cfg.Storage); err != nil { + log.Error("----- Error checkpoint saving the DB to file -----") + log.Error(err) + } + } +} + +func (s *ErrandsServer) AddNotification(event string, errand *schemas.Errand) { obj := &Notification{ - Event: event, + Event: event, Errand: *errand, } s.Notifications <- obj } -func ( s *ErrandsServer ) broadcastLoop(){ - // go func(){ - // for { - // s.Notifications <- &Notification{ - // Event: "heartbeat", - // } - // time.Sleep(2 * time.Second) - // } - // }() +func (s *ErrandsServer) broadcastLoop() { for { select { - case client := <- s.RegisterClient: - s.StreamClients = append( s.StreamClients, client ) - case client := <- s.UnregisterClient: + case client := <-s.RegisterClient: + s.StreamClients = append(s.StreamClients, client) + case client := <-s.UnregisterClient: for i, c := range s.StreamClients { if c == client { s.StreamClients = append(s.StreamClients[:i], s.StreamClients[i+1:]...) } } - case not := <- s.Notifications: + case not := <-s.Notifications: for _, client := range s.StreamClients { notificationCopy := &Notification{} *notificationCopy = *not @@ -100,25 +104,26 @@ func ( s *ErrandsServer ) broadcastLoop(){ } } - -func ( s *ErrandsServer ) kill(){ +func (s *ErrandsServer) kill() { s.killAPI() for _, client := range s.StreamClients { client.Gone() } s.killDB() } -func ( s *ErrandsServer) killAPI(){ + +func (s *ErrandsServer) killAPI() { log.Println("Closing the HTTP Server") s.Server.Close() } -func ( s *ErrandsServer ) killDB(){ + +func (s *ErrandsServer) killDB() { log.Println("Closing the DB") - s.DB.Close() + if err := s.Store.SaveFile(cfg.Storage); err != nil { + log.Fatal(err) + } } - - func UserStructLevelValidation(v *validator.Validate, structLevel *validator.StructLevel) { errand := structLevel.CurrentStruct.Interface().(schemas.Errand) if errand.Options.TTL < 5 && errand.Options.TTL != 0 { @@ -128,25 +133,12 @@ func UserStructLevelValidation(v *validator.Validate, structLevel *validator.Str } } - - -func ( s *ErrandsServer ) createDB(){ - opts := badger.DefaultOptions - opts.Dir = s.StorageDir - opts.ValueDir = s.StorageDir - var err error - s.DB, err = badger.Open( opts ); if err != nil { - log.Fatal( err ) - } -} - - -func ( s *ErrandsServer) createAPI(){ +func (s *ErrandsServer) createAPI() { s.API = gin.Default() CORSconfig := cors.DefaultConfig() - CORSconfig.AllowOriginFunc = func( origin string ) bool { + CORSconfig.AllowOriginFunc = func(origin string) bool { // fmt.Println("Connection from", origin) return true } @@ -172,7 +164,6 @@ func ( s *ErrandsServer) createAPI(){ s.ErrandRoutes.POST("/:id/retry", s.retryErrand) } - // Errands Routes s.ErrandsRoutes = s.API.Group("/v1/errands") { @@ -191,8 +182,8 @@ func ( s *ErrandsServer) createAPI(){ } s.Server = &http.Server{ - Addr: s.Port, - Handler: s.API, + Addr: s.Port, + Handler: s.API, } log.Println("Starting server on port:", s.Port) @@ -201,7 +192,3 @@ func ( s *ErrandsServer) createAPI(){ } } - - - - diff --git a/errands.db b/errands.db new file mode 100644 index 0000000..8781788 Binary files /dev/null and b/errands.db differ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..47be941 --- /dev/null +++ b/go.mod @@ -0,0 +1,25 @@ +module github.com/polygon-io/errands-server + +go 1.12 + +require ( + github.com/gin-contrib/cors v0.0.0-20170318125340-cf4846e6a636 + github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect + github.com/gin-gonic/gin v1.3.0 + github.com/golang/protobuf v1.2.0 // indirect + github.com/google/uuid v1.1.1 + github.com/json-iterator/go v1.1.6 // indirect + github.com/kelseyhightower/envconfig v1.3.0 + github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe + github.com/mattn/go-isatty v0.0.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/sirupsen/logrus v1.4.2 + github.com/ugorji/go/codec v0.0.0-20181022190402-e5e69e061d4f // indirect + golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 // indirect + golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect + gopkg.in/go-playground/assert.v1 v1.2.1 // indirect + gopkg.in/go-playground/validator.v8 v8.18.2 + gopkg.in/yaml.v2 v2.2.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..dc66b19 --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gin-contrib/cors v0.0.0-20170318125340-cf4846e6a636 h1:oGgJA7DJphAc81EMHZ+2G7Ai2xyg5eoq7bbqzCsiWFc= +github.com/gin-contrib/cors v0.0.0-20170318125340-cf4846e6a636/go.mod h1:cw+u9IsAkC16e42NtYYVCLsHYXE98nB3M7Dr9mLSeH4= +github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g= +github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= +github.com/gin-gonic/gin v1.3.0 h1:kCmZyPklC0gVdL728E6Aj20uYBJV93nj/TkwBTKhFbs= +github.com/gin-gonic/gin v1.3.0/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6mUr7Y= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM= +github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe h1:W/GaMY0y69G4cFlmsC6B9sbuo2fP8OFP1ABjt4kPz+w= +github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/ugorji/go/codec v0.0.0-20181022190402-e5e69e061d4f h1:y3Vj7GoDdcBkxFa2RUUFKM25TrBbWVDnjRDI0u975zQ= +github.com/ugorji/go/codec v0.0.0-20181022190402-e5e69e061d4f/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= +golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= +gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ= +gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 2a42464..7d6627e 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,13 @@ - package main import ( "os" - "log" "os/signal" + envconfig "github.com/kelseyhightower/envconfig" + log "github.com/sirupsen/logrus" ) - - - /* ENVIRONMENT VARIABLES: @@ -22,42 +19,38 @@ import ( ERRANDS_STORAGE="/errands/" - Will change the DB location to /errands/ - */ +*/ var cfg Config + type Config struct { - Storage string `split_words:"true" default:"./errands"` - Port string `split_words:"true" default:":5555"` + Storage string `split_words:"true" default:"./errands.db"` + Port string `split_words:"true" default:":5555"` } - var server *ErrandsServer -func main(){ + +func main() { // Parse Env Vars: - err := envconfig.Process( "ERRANDS", &cfg ); if err != nil { - log.Fatal( err ) + err := envconfig.Process("ERRANDS", &cfg) + if err != nil { + log.Fatal(err) } // trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) - server = NewErrandsServer( &cfg ) - log.Println("listening for signals") + server = NewErrandsServer(&cfg) + log.Info("listening for signals") for { select { - case <-signals: - // Logger.Info("main: done. exiting") - log.Println("Exiting") - server.kill() - return + case <-signals: + // Logger.Info("main: done. exiting") + log.Info("Exiting") + server.kill() + return } } } - - - - - - diff --git a/memorydb/memorydb.go b/memorydb/memorydb.go new file mode 100644 index 0000000..9baac31 --- /dev/null +++ b/memorydb/memorydb.go @@ -0,0 +1,37 @@ +package memorydb + +import ( + cache "github.com/patrickmn/go-cache" + "github.com/sirupsen/logrus" +) + +/* + +In memory Key/Value store +- Load from flat file +- Store to flat file when exiting +- Store to flat file periodically as backup + +Methods: +- Create +- Read/Get +- Update +- Delete + +*/ + +type MemoryStore struct { + *cache.Cache +} + +func New() *MemoryStore { + obj := &MemoryStore{ + cache.New(cache.NoExpiration, 0), + } + return obj +} + +func LoadDBFrom(dbLocation string) error { + logrus.Println("Loaded memory DB from file: ", dbLocation) + return nil +} diff --git a/schemas/schemas.go b/schemas/schemas.go index 15e627a..e2d5d05 100644 --- a/schemas/schemas.go +++ b/schemas/schemas.go @@ -1,4 +1,3 @@ - package schemas import ( @@ -8,81 +7,71 @@ import ( utils "github.com/polygon-io/errands-server/utils" ) - - - - var ErrandStatuses []string = []string{"inactive", "active", "failed", "completed"} + //easyjson:json type Errand struct { // General Attributes: - ID string `json:"id"` - Name string `json:"name" binding:"required"` - Type string `json:"type" binding:"required"` - Options struct { - TTL int `json:"ttl,omitempty"` - Retries int `json:"retries,omitempty"` - Priority int `json:"priority,omitempty"` - DeleteOnCompleted bool `json:"deleteOnCompleted,omitempty"` - } `json:"options"` - Data map[string]interface{} `json:"data,omitempty"` - Created int64 `json:"created"` - Status string `json:"status,omitempty"` - Results map[string]interface{} `json:"results,omitempty"` + ID string `json:"id"` + Name string `json:"name" binding:"required"` + Type string `json:"type" binding:"required"` + Options struct { + TTL int `json:"ttl,omitempty"` + Retries int `json:"retries,omitempty"` + Priority int `json:"priority,omitempty"` + DeleteOnCompleted bool `json:"deleteOnCompleted,omitempty"` + } `json:"options"` + Data map[string]interface{} `json:"data,omitempty"` + Created int64 `json:"created"` + Status string `json:"status,omitempty"` + Results map[string]interface{} `json:"results,omitempty"` // Internal attributes: - Progress float64 `json:"progress"` - Attempts int `json:"attempts"` - Started int64 `json:"started,omitempty"` // Timestamp of last Start - Failed int64 `json:"failed,omitempty"` // Timestamp of last Fail - Completed int64 `json:"compelted,omitempty"` // Timestamp of last Fail - Logs []Log `json:"logs,omitempty"` + Progress float64 `json:"progress"` + Attempts int `json:"attempts"` + Started int64 `json:"started,omitempty"` // Timestamp of last Start + Failed int64 `json:"failed,omitempty"` // Timestamp of last Fail + Completed int64 `json:"compelted,omitempty"` // Timestamp of last Fail + Logs []Log `json:"logs,omitempty"` } +var LogSeverities []string = []string{"INFO", "WARNING", "ERROR"} -var LogSeverities []string = []string{ "INFO", "WARNING", "ERROR" } //easyjson:json type Log struct { - Severity string `json:"severity" binding:"required"` - Message string `json:"message" binding:"required"` - Timestamp int64 `json:"timestamp"` + Severity string `json:"severity" binding:"required"` + Message string `json:"message" binding:"required"` + Timestamp int64 `json:"timestamp"` } - - - func NewErrand() *Errand { obj := &Errand{} obj.SetDefaults() return obj } - - -func ( e *Errand ) SetDefaults(){ +func (e *Errand) SetDefaults() { uid := uuid.New() - uidText, err := uid.MarshalText(); if err != nil { - panic( err ) + uidText, err := uid.MarshalText() + if err != nil { + panic(err) } - e.ID = string( uidText ) + e.ID = string(uidText) e.Status = "inactive" e.Created = utils.GetTimestamp() - e.Logs = make( []Log, 0 ) + e.Logs = make([]Log, 0) } - -func ( e *Errand ) AddToLogs( severity, message string ) error { - if !utils.Contains( LogSeverities, severity ) { +func (e *Errand) AddToLogs(severity, message string) error { + if !utils.Contains(LogSeverities, severity) { return errors.New("Invalid log severity") } obj := Log{ - Severity: severity, - Message: message, + Severity: severity, + Message: message, Timestamp: utils.GetTimestamp(), } - e.Logs = append( e.Logs, obj ) + e.Logs = append(e.Logs, obj) return nil } - - diff --git a/utils/utils.go b/utils/utils.go index f802154..a7d538d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,22 +1,18 @@ - package utils - import ( "time" ) - func GetTimestamp() int64 { - return ( time.Now().UnixNano() / 1000000 ) + return (time.Now().UnixNano() / 1000000) } - func Contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false + for _, a := range s { + if a == e { + return true + } + } + return false }