Skip to content

Commit

Permalink
Remove LibreChat and use a standalone RAG server
Browse files Browse the repository at this point in the history
  • Loading branch information
nono committed Sep 18, 2024
1 parent 95b677d commit 8660cc6
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 412 deletions.
14 changes: 7 additions & 7 deletions cozy.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,14 @@ jobs:
# List of available workers:
#
# - "clean-clients": delete unused OAuth clients
# - "index": send data to a RAG for being indexed
# - "export": exporting data from a cozy instance
# - "import": importing data into a cozy instance
# - "konnector": launching konnectors
# - "service": launching services
# - "migrations": transforming a VFS with Swift to layout v3
# - "notes-save": saving notes to the VFS
# - "rag-index": send data to the RAG server for being indexed
# - "rag-query": send a query to the RAG server
# - "push": sending push notifications
# - "sms": sending SMS notifications
# - "sendmail": sending mails
Expand Down Expand Up @@ -201,15 +202,14 @@ konnectors:
# cmd: ./scripts/konnector-rkt-run.sh # run connectors with rkt
# cmd: ./scripts/konnector-nsjail-node8-run.sh # run connectors with nsjail

# external indexers are used for indexing files and doctypes in vector
# databases.
external_indexers:
# A cozy will use the indexers for its context, or if the context is not
# rag is the URL of the RAG server(s) used for the AI features.
rag:
# A cozy will use the rag server for its context, or if the context is not
# declared, for default.
default:
- http://localhost:8000
url: http://localhost:8000
beta:
- http://localhost:8001
url: http://localhost:8001

# mail service parameters for sending email via SMTP
mail:
Expand Down
23 changes: 9 additions & 14 deletions docs/ai.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,21 @@ AI can be used for interacting with the personal data of a Cozy. This is
currently an experimental feature. Retrieval-Augmented Generation (RAG) is
a classical pattern in the AI world. Here, it is specific to each Cozy.

[LibreChat](https://www.librechat.ai/) has been integrated this way:

![Architecture with LibreChat](diagrams/librechat.svg)
![Architecture with a RAG server](diagrams/ai.svg)

## Indexation

First of all, LibreChat and the RAG must be installed with their dependencies.
It is not mandatory to install them on the same servers as the cozy-stack. And
the URL of RAG must be filled in cozy-stack configuration file (in
`external_indexers`).
First of all, the RAG server must be installed with its dependencies. It does
not have to be installed on the same servers as the cozy-stack. The URL of the
RAG server must be set in the cozy-stack configuration file (in `rag`).

For the moment, the feature is experimental, and a trigger must be created
manually on the Cozy:

```sh
$ COZY=cozy.localhost:8080
$ TOKEN=$(cozy-stack instances token-cli $COZY io.cozy.triggers)
$ curl "http://${COZY}/jobs/triggers" -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "index", "message": {"doctype": "io.cozy.files"} } } }'
$ curl "http://${COZY}/jobs/triggers" -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "rag-index", "message": {"doctype": "io.cozy.files"} } } }'
```

It can also be a good idea to start a first indexation with:
Expand All @@ -38,11 +35,9 @@ In practice, when files are uploaded/modified/deleted, the trigger will create
a job for the `rag-index` worker (with debounce). This worker will look at the
changes feed, and will call the RAG server for each entry in the changes feed.


## Chat

When a user starts a chat in LibreChat, their prompts are sent to the RAG that
can use the vector database to find relevant documents (technically, only some
parts of the documents called chunks). Those documents are sent back to
LibreChat that can be added to the prompt, so that the LLM can use them as a
context when answering.
When a user starts a chat, their prompts are sent to the RAG that can use the
vector database to find relevant documents (technically, only some parts of
the documents called chunks). Those documents are added to the prompt, so
that the LLM can use them as a context when answering.
11 changes: 2 additions & 9 deletions docs/diagrams/librechat.d2 → docs/diagrams/ai.d2
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
# https://d2lang.com/

stack: {label: "Cozy-Stack"}
librechat: {label: "Librechat"}
rag: {label: "RAG"}
llm: {label: "LLM"; shape: diamond}
embed: {label: "Embeddings model"; shape: diamond}
vector: {label: "Vector DB"; shape: cylinder}
couchdb: {label: "CouchDB"; shape: cylinder}
swift: {label: "Swift"; shape: cylinder}
mongodb: {label: "MongoDB"; shape: cylinder}

stack -> rag: ask to generate/delete embeddings
stack <-> librechat: Shared authentication
librechat -> rag: ask context for a query

stack -> rag
stack -> couchdb
stack -> swift

librechat -> llm
librechat -> mongodb

rag -> embed
rag -> vector
rag -> llm
101 changes: 101 additions & 0 deletions docs/diagrams/ai.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
113 changes: 0 additions & 113 deletions docs/diagrams/librechat.svg

This file was deleted.

11 changes: 5 additions & 6 deletions model/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,12 @@ func (i *Instance) Registries() []*url.URL {
return context
}

// ExternalIndexers returns the URLs of the external indexers (RAG) where
// documents of this instance can be indexed.
func (i *Instance) ExternalIndexers() []string {
contexts := config.GetConfig().ExternalIndexers
// RAGServer returns the RAG server for the instance (AI features).
func (i *Instance) RAGServer() config.RAGServer {
contexts := config.GetConfig().RAGServers
if i.ContextName != "" {
if indexers, ok := contexts[i.ContextName]; ok {
return indexers
if server, ok := contexts[i.ContextName]; ok {
return server
}
}
return contexts[config.DefaultInstanceContext]
Expand Down
229 changes: 229 additions & 0 deletions model/rag/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package rag

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"

"github.com/cozy/cozy-stack/model/instance"
"github.com/cozy/cozy-stack/model/job"
"github.com/cozy/cozy-stack/model/vfs"
"github.com/cozy/cozy-stack/pkg/config/config"
"github.com/cozy/cozy-stack/pkg/consts"
"github.com/cozy/cozy-stack/pkg/couchdb"
"github.com/cozy/cozy-stack/pkg/couchdb/revision"
"github.com/cozy/cozy-stack/pkg/logger"
)

// BatchSize is the maximal number of documents manipulated at once by the
// worker. It is used as the limit when fetching the CouchDB changes feed, so
// a single run of Index handles at most BatchSize changes.
const BatchSize = 100

// IndexMessage is the message given to Index (and serialized in the jobs
// pushed for the "rag-index" worker). It tells which doctype must be sent to
// the RAG server.
type IndexMessage struct {
	// Doctype is the doctype to index. Index only accepts consts.Files for
	// the moment.
	Doctype string `json:"doctype"`
}

// Index sends to the RAG server the documents of the given doctype that have
// changed since the previous indexation.
//
// It reads at most BatchSize entries of the CouchDB changes feed, starting
// from the sequence number saved by the previous run, and calls the RAG
// server for each of them. When every entry has been handled, the new
// sequence number is saved and, if the feed reports pending changes, a new
// job is pushed to continue the work.
//
// An error is returned when the doctype is not supported, when the lock, the
// saved sequence number or the changes feed cannot be obtained, or when the
// RAG server cannot be called for an entry. In the last case, the sequence
// number is not updated, so the same batch is seen again by the next run.
func Index(inst *instance.Instance, logger logger.Logger, msg IndexMessage) error {
	if msg.Doctype != consts.Files {
		return errors.New("Only file can be indexed for the moment")
	}

	// Only one indexation at a time per instance and doctype.
	mu := config.Lock().ReadWrite(inst, "index/"+msg.Doctype)
	if err := mu.Lock(); err != nil {
		return err
	}
	defer mu.Unlock()

	lastSeq, err := getLastSeqNumber(inst, msg.Doctype)
	if err != nil {
		return err
	}
	feed, err := callChangesFeed(inst, msg.Doctype, lastSeq)
	if err != nil {
		return err
	}
	if feed.LastSeq == lastSeq {
		// Nothing has changed since the previous indexation.
		return nil
	}

	for _, change := range feed.Results {
		if err := callRAGIndexer(inst, msg.Doctype, change); err != nil {
			logger.Warnf("Index error: %s", err)
			return err
		}
	}

	// The two steps below are best-effort: the documents of this batch have
	// already been sent to the RAG server, so a failure here must not fail
	// the job. It is logged instead of being silently dropped.
	if err := updateLastSequenceNumber(inst, msg.Doctype, feed.LastSeq); err != nil {
		logger.Warnf("Cannot save the last sequence number: %s", err)
	}

	if feed.Pending > 0 {
		if err := pushJob(inst, msg.Doctype); err != nil {
			logger.Warnf("Cannot push a job for the pending changes: %s", err)
		}
	}

	return nil
}

// callRAGIndexer synchronizes one entry of the changes feed with the RAG
// server configured for the instance.
//
// Design documents and directories are skipped. For a deleted document, a
// DELETE request is sent to the RAG server. Otherwise, the metadata known by
// the RAG server are fetched, and the content of the file is uploaded only if
// the RAG server does not know the document or knows it with another md5sum.
//
// An error is returned when no RAG server is configured, when its URL is
// invalid, when a request cannot be sent, when the file cannot be opened, or
// when the RAG server answers with an unexpected status code.
func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Change) error {
	if strings.HasPrefix(change.DocID, "_design/") {
		return nil
	}
	if change.Doc.Get("type") == consts.DirType {
		return nil
	}

	ragServer := inst.RAGServer()
	if ragServer.URL == "" {
		return errors.New("no RAG server configured")
	}
	u, err := url.Parse(ragServer.URL)
	if err != nil {
		return err
	}
	u.Path = fmt.Sprintf("/docs/%s/%s/%s", inst.Domain, doctype, change.DocID)

	if change.Deleted {
		return deleteRAGDocument(u)
	}

	md5sum := fmt.Sprintf("%x", change.Doc.Get("md5sum"))
	needIndexation, err := ragDocumentNeedsIndexation(u, md5sum)
	if err != nil {
		return err
	}
	if !needIndexation {
		// TODO we should patch the metadata in the vector db when a
		// file has been moved/renamed.
		return nil
	}
	return putRAGDocument(inst, u, change, md5sum)
}

// deleteRAGDocument asks the RAG server to forget the document at the given
// URL. Only a 5xx response is reported as an error.
// NOTE(review): other statuses are ignored, presumably because a document
// that is unknown to the RAG server is not a problem here - confirm.
func deleteRAGDocument(u *url.URL) error {
	req, err := http.NewRequest(http.MethodDelete, u.String(), nil)
	if err != nil {
		return err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode >= 500 {
		return fmt.Errorf("DELETE status code: %d", res.StatusCode)
	}
	return nil
}

// ragDocumentNeedsIndexation fetches the metadata that the RAG server has for
// the document at the given URL, and tells if the content must be sent again.
// When the content has not changed (same md5sum), there is no need to
// regenerate an embedding. The response body is closed when this function
// returns, i.e. before any upload starts.
func ragDocumentNeedsIndexation(u *url.URL, md5sum string) (bool, error) {
	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return false, err
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer res.Body.Close()

	switch res.StatusCode {
	case http.StatusOK:
		var metadata map[string]interface{}
		if err = json.NewDecoder(res.Body).Decode(&metadata); err != nil {
			return false, err
		}
		return metadata["md5sum"] != md5sum, nil
	case http.StatusNotFound:
		// The RAG server does not know this document yet.
		return true, nil
	default:
		return false, fmt.Errorf("GET status code: %d", res.StatusCode)
	}
}

// putRAGDocument uploads the content of the file described by the change to
// the RAG server, with its directory, name and md5sum in the query string.
// Only a 5xx response is reported as an error.
// NOTE(review): other statuses are ignored, presumably so that a file
// rejected by the RAG server does not block the rest of the feed - confirm.
func putRAGDocument(inst *instance.Instance, u *url.URL, change couchdb.Change, md5sum string) error {
	// A missing or non-string field is sent as an empty value.
	dirID, _ := change.Doc.Get("dir_id").(string)
	name, _ := change.Doc.Get("name").(string)
	mime, _ := change.Doc.Get("mime").(string)
	internalID, _ := change.Doc.Get("internal_vfs_id").(string)
	u.RawQuery = url.Values{
		"dir_id": []string{dirID},
		"name":   []string{name},
		"md5sum": []string{md5sum},
	}.Encode()

	fs := inst.VFS()
	fileDoc := &vfs.FileDoc{
		Type:       consts.FileType,
		DocID:      change.DocID,
		DirID:      dirID,
		DocName:    name,
		InternalID: internalID,
	}
	// TODO notes with images
	content, err := fs.OpenFile(fileDoc)
	if err != nil {
		return err
	}
	defer content.Close()

	req, err := http.NewRequest(http.MethodPut, u.String(), content)
	if err != nil {
		return err
	}
	req.Header.Add("Content-Type", mime)
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode >= 500 {
		return fmt.Errorf("PUT status code: %d", res.StatusCode)
	}
	return nil
}

// getLastSeqNumber reads the sequence number saved by the previous
// indexation of this doctype. An empty string is returned when nothing has
// been saved yet.
func getLastSeqNumber(inst *instance.Instance, doctype string) (string, error) {
	local, err := couchdb.GetLocal(inst, doctype, "rag-index")
	switch {
	case couchdb.IsNotFoundError(err):
		return "", nil
	case err != nil:
		return "", err
	}
	lastSeq, _ := local["last_seq"].(string)
	return lastSeq, nil
}

// updateLastSequenceNumber saves seq as the last sequence number of the
// indexation for this doctype, except when the number already saved in
// CouchDB has a generation that is greater or equal.
func updateLastSequenceNumber(inst *instance.Instance, doctype, seq string) error {
	local, err := couchdb.GetLocal(inst, doctype, "rag-index")
	switch {
	case err == nil:
		prev, ok := local["last_seq"].(string)
		if ok && revision.Generation(seq) <= revision.Generation(prev) {
			return nil
		}
	case couchdb.IsNotFoundError(err):
		// First indexation: start from an empty local document.
		local = make(map[string]interface{})
	default:
		return err
	}
	local["last_seq"] = seq
	return couchdb.PutLocal(inst, doctype, "rag-index", local)
}

// callChangesFeed asks CouchDB for the changes of the doctype that happened
// after the given sequence number, with the documents included and at most
// BatchSize entries.
// http://docs.couchdb.org/en/stable/api/database/changes.html
func callChangesFeed(inst *instance.Instance, doctype, since string) (*couchdb.ChangesResponse, error) {
	req := &couchdb.ChangesRequest{
		DocType:     doctype,
		IncludeDocs: true,
		Since:       since,
		Limit:       BatchSize,
	}
	return couchdb.GetChanges(inst, req)
}

// pushJob enqueues a new "rag-index" job for this doctype, so that the
// documents still pending in the changes feed get indexed.
func pushJob(inst *instance.Instance, doctype string) error {
	payload := &IndexMessage{Doctype: doctype}
	msg, err := job.NewMessage(payload)
	if err != nil {
		return err
	}
	req := &job.JobRequest{
		WorkerType: "rag-index",
		Message:    msg,
	}
	if _, err = job.System().PushJob(inst, req); err != nil {
		return err
	}
	return nil
}
Loading

0 comments on commit 8660cc6

Please sign in to comment.