Skip to content

Commit

Permalink
#7 Start using a queue to process index update. TODO: Propagate usage…
Browse files Browse the repository at this point in the history
… of a queue instead of direct updating in the indexation code.
  • Loading branch information
Estelle Maudet committed Oct 29, 2018
1 parent e4acc0b commit bb80aa5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 10 deletions.
38 changes: 38 additions & 0 deletions pkg/fulltext/indexation/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexation

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -66,6 +67,8 @@ var prefixPath string

var ft_language *FastText

var updateQueue chan string

func StartIndex(instance *instance.Instance) error {

inst = instance
Expand Down Expand Up @@ -178,6 +181,18 @@ func AllIndexesUpdate() error {
return nil
}

func IndexUpdateDoctype(docType string) error {

for _, docIndexes := range indexes {
if docIndexes.docType == docType {
return IndexUpdate(docIndexes)
}
}
err := errors.New("doctype " + docType + " index not found.")
fmt.Printf("%s\n", err)
return err
}

func IndexUpdate(docIndexes documentIndexes) error {

docIndexes.updateMu.Lock()
Expand Down Expand Up @@ -409,3 +424,26 @@ func GetStoreSeq(index *bleve.Index) (string, error) {
res, err := (*index).GetInternal([]byte("seq"))
return string(res), err
}

func StartWorker() {

updateQueue = make(chan string, 10)

go func(updateQueue <-chan string) {
for docType := range updateQueue {
IndexUpdateDoctype(docType) // TODO: deal with errors
}
}(updateQueue)
}

func AddUpdateIndexJobs(doctypeUpdateList []string) error {
for _, docType := range doctypeUpdateList {
select {
case updateQueue <- docType:
continue
default:
return errors.New("Update Queue is full, can't add new doctype to the update queue for now (docTypes before " + docType + " were correctly added to update queue).")
}
}
return nil
}
10 changes: 1 addition & 9 deletions pkg/instance/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,14 @@ import (

// Triggers returns the list of the triggers to add when an instance is created
func Triggers(db prefixer.Prefixer) []jobs.TriggerInfos {
// Create/update/remove thumbnails when an image is created/updated/removed
return []jobs.TriggerInfos{
// Create/update/remove thumbnails when an image is created/updated/removed
{
Domain: db.DomainName(),
Prefix: db.DBPrefix(),
Type: "@event",
WorkerType: "thumbnail",
Arguments: "io.cozy.files:CREATED,UPDATED,DELETED:image:class",
},
// Index all changes since last couchdb sequence every 2 min
{
Domain: db.DomainName(),
Prefix: db.DBPrefix(),
Type: "@every",
WorkerType: "indexupdate",
Arguments: "2m",
},
}
}
2 changes: 2 additions & 0 deletions pkg/stack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,7 @@ security features. Please do not use this binary as your production server.
results, _, _ := search.QueryIndex(request)
fmt.Println(results)

indexation.StartWorker()

return
}
30 changes: 29 additions & 1 deletion web/fulltext/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func Routes(router *echo.Group) {
router.POST("/_search", SearchQuery)
router.POST("/_search_prefix", SearchQueryPrefix)
router.POST("/_reindex", Reindex)
router.POST("/_all_indexes_update", AllIndexesUpdate)
router.POST("/_index_update", IndexUpdate)
router.POST("/_update_index_alias/:doctype/:lang", ReplicateIndex)
}
Expand Down Expand Up @@ -93,7 +94,7 @@ func Reindex(c echo.Context) error {
return c.JSON(http.StatusOK, nil)
}

func IndexUpdate(c echo.Context) error {
func AllIndexesUpdate(c echo.Context) error {

err := indexation.AllIndexesUpdate()
if err != nil {
Expand All @@ -106,6 +107,33 @@ func IndexUpdate(c echo.Context) error {

}

func IndexUpdate(c echo.Context) error {

var body map[string]interface{}

if err := json.NewDecoder(c.Request().Body).Decode(&body); err != nil {
fmt.Printf("Error on decoding request: %s\n", err)
return c.JSON(http.StatusInternalServerError, echo.Map{
"error": errors.New("Could not decode the request"),
})
}

if doctypeUpdate, ok := body["docTypes"].([]interface{}); ok {
doctypeUpdateList := make([]string, len(doctypeUpdate))
for i, d := range doctypeUpdate {
doctypeUpdateList[i] = d.(string)
}
err := indexation.AddUpdateIndexJobs(doctypeUpdateList)
if err != nil {
return c.JSON(http.StatusInternalServerError, echo.Map{
"error": err.Error(),
})
}
} // Else...

return c.JSON(http.StatusOK, nil)
}

func ReplicateIndex(c echo.Context) error {

// TODO : see how to deal with permissions
Expand Down

0 comments on commit bb80aa5

Please sign in to comment.