From 8851c7d0cd6fa2ff57c3ef40f21f0ffeadc04be6 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Wed, 28 Aug 2024 18:17:35 +0200 Subject: [PATCH 01/10] Add external indexers for RAG --- cozy.example.yaml | 11 ++ docs/README.md | 1 + docs/ai.md | 46 +++++++ docs/diagrams/librechat.d2 | 24 ++++ docs/diagrams/librechat.svg | 113 +++++++++++++++++ docs/toc.yml | 1 + docs/workers.md | 6 + model/instance/instance.go | 12 ++ pkg/config/config/config.go | 26 ++++ web/jobs/jobs.go | 1 + worker/index/index.go | 242 ++++++++++++++++++++++++++++++++++++ 11 files changed, 483 insertions(+) create mode 100644 docs/ai.md create mode 100644 docs/diagrams/librechat.d2 create mode 100644 docs/diagrams/librechat.svg create mode 100644 worker/index/index.go diff --git a/cozy.example.yaml b/cozy.example.yaml index fbd56d8602e..7b7ce9c6c7a 100644 --- a/cozy.example.yaml +++ b/cozy.example.yaml @@ -140,6 +140,7 @@ jobs: # List of available workers: # # - "clean-clients": delete unused OAuth clients + # - "index": send data to a RAG for being indexed # - "export": exporting data from a cozy instance # - "import": importing data into a cozy instance # - "konnector": launching konnectors @@ -200,6 +201,16 @@ konnectors: # cmd: ./scripts/konnector-rkt-run.sh # run connectors with rkt # cmd: ./scripts/konnector-nsjail-node8-run.sh # run connectors with nsjail +# external indexers are used for indexing files and doctypes in vector +# databases. +external_indexers: + # A cozy will use the indexers for its context, or if the context is not + # declared, for default. + default: + - http://localhost:8000 + beta: + - http://localhost:8001 + # mail service parameters for sending email via SMTP mail: # mail noreply address - flags: --mail-noreply-address diff --git a/docs/README.md b/docs/README.md index 502c16ae827..35ac5e15e47 100644 --- a/docs/README.md +++ b/docs/README.md @@ -72,6 +72,7 @@ designing new services. ### List of services +- `/ai` - [AI](ai.md) - `/auth` - [Authentication & OAuth](auth.md) - [Delegated authentication](delegated-auth.md) - `/apps` - [Applications Management](apps.md) diff --git a/docs/ai.md b/docs/ai.md new file mode 100644 index 00000000000..abdd20ce9bf --- /dev/null +++ b/docs/ai.md @@ -0,0 +1,46 @@ +[Table of contents](README.md#table-of-contents) + +# AI for personal data + +## Introduction + +AI can be used for interacting with the personal data of a Cozy. This is +currently an experimental feature. Retrieval-Augmented Generation (RAG) is +a classical pattern in the AI world. Here, it is specific to each Cozy. + +[LibreChat](https://www.librechat.ai/) has been integrated this way: + +![Architecture with LibreChat](diagrams/librechat.svg) + +## Indexation + +First of all, LibreChat and the RAG must be installed with their dependencies. +It is not mandatory to install them on the same servers as the cozy-stack. And +the URL of RAG must be filled in cozy-stack configuration file (in +`external_indexers`). + +For the moment, the feature is experimental, and a trigger must be created +manually on the Cozy: + +```sh +$ curl http://cozy.localhost:8080/jobs/triggers -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "index", "message": {"doctype": "io.cozy.files"} } } }' +``` + +It can also be a good idea to start a first indexation with: + +```sh +$ cozy-stack triggers launch --domain cozy.localhost:8080 $TRIGGER_ID +``` + +In practice, when files are uploaded/modified/deleted, the trigger will create +a job for the index worker (with debounce). The index worker will look at the +changed feed, and will call the RAG for each entry in the changes feed. + + +## Chat + +When a user starts a chat in LibreChat, their prompts are sent to the RAG that +can use the vector database to find relevant documents (technically, only some +parts of the documents called chunks). Those documents are sent back to +LibreChat that can be added to the prompt, so that the LLM can use them as a +context when answering. diff --git a/docs/diagrams/librechat.d2 b/docs/diagrams/librechat.d2 new file mode 100644 index 00000000000..67ab6cd4da9 --- /dev/null +++ b/docs/diagrams/librechat.d2 @@ -0,0 +1,24 @@ +# https://d2lang.com/ + +stack: {label: "Cozy-Stack"} +librechat: {label: "Librechat"} +rag: {label: "RAG"} +llm: {label: "LLM"; shape: diamond} +embed: {label: "Embeddings model"; shape: diamond} +vector: {label: "Vector DB"; shape: cylinder} +couchdb: {label: "CouchDB"; shape: cylinder} +swift: {label: "Swift"; shape: cylinder} +mongodb: {label: "MongoDB"; shape: cylinder} + +stack -> rag: ask to generate/delete embeddings +stack <-> librechat: Shared authentication +librechat -> rag: ask context for a query + +stack -> couchdb +stack -> swift + +librechat -> llm +librechat -> mongodb + +rag -> embed +rag -> vector diff --git a/docs/diagrams/librechat.svg b/docs/diagrams/librechat.svg new file mode 100644 index 00000000000..6bb327ebc29 --- /dev/null +++ b/docs/diagrams/librechat.svg @@ -0,0 +1,113 @@ +Cozy-StackLibrechatRAGLLMEmbeddings modelVector DBCouchDBSwiftMongoDB ask to generate/delete embeddings Shared authenticationask context for a query + + + + + + + + + + + + + + diff --git a/docs/toc.yml b/docs/toc.yml index 3eeab163039..bc7b56acdf1 100644 --- a/docs/toc.yml +++ b/docs/toc.yml @@ -23,6 +23,7 @@ - "Sharing design": ./sharing-design.md - "Workflow of the konnectors": ./konnectors-workflow.md - List of services: + - "/ai - AI": ./ai.md - "/auth - Authentication & OAuth": ./auth.md - " /oidc - Delegated authentication": ./delegated-auth.md - "/apps - Applications Management": ./apps.md diff --git a/docs/workers.md b/docs/workers.md index 85be2df102a..48c2224b739 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -412,3 +412,9 @@ It can be launched from command-line with: ```sh $ cozy-stack jobs run migrations --domain example.mycozy.cloud --json '{"type": "to-swift-v3"}' ``` + +## index + +This worker is used for sending data to a RAG. It looks at the changes feed for +the given doctype, send the changes to an external indexer that will generate +embeddings for the data and put them in a vector database. diff --git a/model/instance/instance.go b/model/instance/instance.go index 16ea2aeddf2..26d451ae595 100644 --- a/model/instance/instance.go +++ b/model/instance/instance.go @@ -439,6 +439,18 @@ func (i *Instance) Registries() []*url.URL { return context } +// ExternalIndexers returns the URLs of the external indexers (RAG) where +// documents of this instance can be indexed. +func (i *Instance) ExternalIndexers() []string { + contexts := config.GetConfig().ExternalIndexers + if i.ContextName != "" { + if indexers, ok := contexts[i.ContextName]; ok { + return indexers + } + } + return contexts[config.DefaultInstanceContext] +} + // HasForcedOIDC returns true only if the instance is in a context where the // config says that the stack shouldn't allow to authenticate with the // password. diff --git a/pkg/config/config/config.go b/pkg/config/config/config.go index a8cff1760a6..9f4cde65798 100644 --- a/pkg/config/config/config.go +++ b/pkg/config/config/config.go @@ -123,6 +123,7 @@ type Config struct { CouchDB CouchDB Jobs Jobs Konnectors Konnectors + ExternalIndexers map[string][]string Mail *gomail.DialerOptions MailPerContext map[string]interface{} CampaignMail *gomail.DialerOptions @@ -557,6 +558,11 @@ func UseViper(v *viper.Viper) error { return err } + indexers, err := makeExternalIndexers(v) + if err != nil { + return err + } + regs, err := makeRegistries(v) if err != nil { return err @@ -848,6 +854,7 @@ func UseViper(v *viper.Viper) error { Konnectors: Konnectors{ Cmd: v.GetString("konnectors.cmd"), }, + ExternalIndexers: indexers, Move: Move{ URL: v.GetString("move.url"), }, @@ -1004,6 +1011,25 @@ func makeCouch(v *viper.Viper) (CouchDB, error) { return couch, nil } +func makeExternalIndexers(v *viper.Viper) (map[string][]string, error) { + indexers := make(map[string][]string) + for k, v := range v.GetStringMap("external_indexers") { + list, ok := v.([]interface{}) + if !ok { + return nil, fmt.Errorf( + "Bad format in the external_indexers section of the configuration file: "+ + "should be a list of strings, got %#v", v) + } + strings := []string{} + for _, item := range list { + str, _ := item.(string) + strings = append(strings, str) + } + indexers[k] = strings + } + return indexers, nil +} + func makeRegistries(v *viper.Viper) (map[string][]*url.URL, error) { regs := make(map[string][]*url.URL) diff --git a/web/jobs/jobs.go b/web/jobs/jobs.go index c0e88ee4549..f9dc4850307 100644 --- a/web/jobs/jobs.go +++ b/web/jobs/jobs.go @@ -31,6 +31,7 @@ import ( // import workers _ "github.com/cozy/cozy-stack/worker/archive" "github.com/cozy/cozy-stack/worker/exec" + _ "github.com/cozy/cozy-stack/worker/index" _ "github.com/cozy/cozy-stack/worker/log" _ "github.com/cozy/cozy-stack/worker/mails" _ "github.com/cozy/cozy-stack/worker/migrations" diff --git a/worker/index/index.go b/worker/index/index.go new file mode 100644 index 00000000000..b6ad8672613 --- /dev/null +++ b/worker/index/index.go @@ -0,0 +1,242 @@ +package index + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "runtime" + "time" + + "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/job" + "github.com/cozy/cozy-stack/model/vfs" + "github.com/cozy/cozy-stack/pkg/config/config" + "github.com/cozy/cozy-stack/pkg/consts" + "github.com/cozy/cozy-stack/pkg/couchdb" + "github.com/cozy/cozy-stack/pkg/couchdb/revision" +) + +func init() { + job.AddWorker(&job.WorkerConfig{ + WorkerType: "index", + Concurrency: runtime.NumCPU(), + MaxExecCount: 1, + Reserved: true, + Timeout: 5 * time.Minute, + WorkerFunc: Worker, + }) +} + +// BatchSize is the maximal number of documents manipulated at once by the +// worker. +const BatchSize = 100 + +type IndexMessage struct { + Doctype string `json:"doctype"` +} + +func Worker(ctx *job.TaskContext) error { + logger := ctx.Logger() + inst := ctx.Instance + var msg IndexMessage + if err := ctx.UnmarshalMessage(&msg); err != nil { + return err + } + logger.Debugf("Index %s", msg.Doctype) + if msg.Doctype != consts.Files { + return errors.New("Only file can be indexed for the moment") + } + + mu := config.Lock().ReadWrite(inst, "index/"+msg.Doctype) + if err := mu.Lock(); err != nil { + return err + } + defer mu.Unlock() + + lastSeq, err := getLastSeqNumber(inst, msg.Doctype) + if err != nil { + return err + } + feed, err := callChangesFeed(inst, msg.Doctype, lastSeq) + if err != nil { + return err + } + if feed.LastSeq == lastSeq { + return nil + } + + for _, change := range feed.Results { + if err := callExternalIndexers(inst, msg.Doctype, change); err != nil { + logger.Warnf("Index error: %s", err) + return err + } + } + _ = updateLastSequenceNumber(inst, msg.Doctype, feed.LastSeq) + + if feed.Pending > 0 { + _ = pushJob(inst, msg.Doctype) + } + + return nil +} + +func callExternalIndexers(inst *instance.Instance, doctype string, change couchdb.Change) error { + if change.Doc.Get("type") == consts.DirType { + return nil + } + + indexers := inst.ExternalIndexers() + for _, indexer := range indexers { + u, err := url.Parse(indexer) + if err != nil { + return err + } + u.Path = fmt.Sprintf("/docs/%s/%s/%s", inst.Domain, doctype, change.DocID) + if change.Deleted { + req, err := http.NewRequest(http.MethodDelete, u.String(), nil) + if err != nil { + return err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode >= 500 { + return fmt.Errorf("DELETE status code: %d", res.StatusCode) + } + } else { + md5sum := fmt.Sprintf("%x", change.Doc.Get("md5sum")) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + // When the content has not changed, there is no need to regenerate + // an embedding. + needIndexation := false + switch res.StatusCode { + case 200: + var metadata map[string]interface{} + if err = json.NewDecoder(res.Body).Decode(&metadata); err != nil { + return err + } + needIndexation = metadata["md5sum"] != md5sum + case 404: + needIndexation = true + default: + return fmt.Errorf("GET status code: %d", res.StatusCode) + } + if !needIndexation { + // TODO we should patch the metadata in the vector db when a + // file has been moved/renamed. + continue + } + + dirID, _ := change.Doc.Get("dir_id").(string) + name, _ := change.Doc.Get("name").(string) + mime, _ := change.Doc.Get("mime").(string) + internalID, _ := change.Doc.Get("internal_vfs_id").(string) + u.RawQuery = url.Values{ + "dir_id": []string{dirID}, + "name": []string{name}, + "md5sum": []string{md5sum}, + }.Encode() + fs := inst.VFS() + fileDoc := &vfs.FileDoc{ + Type: consts.FileType, + DocID: change.DocID, + DirID: dirID, + DocName: name, + InternalID: internalID, + } + content, err := fs.OpenFile(fileDoc) + if err != nil { + return err + } + defer content.Close() + req, err = http.NewRequest(http.MethodPut, u.String(), content) + if err != nil { + return err + } + req.Header.Add("Content-Type", mime) + res, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode >= 500 { + return fmt.Errorf("PUT status code: %d", res.StatusCode) + } + } + } + return nil +} + +// getLastSeqNumber returns the last sequence number of the previous +// indexation for this doctype. +func getLastSeqNumber(inst *instance.Instance, doctype string) (string, error) { + result, err := couchdb.GetLocal(inst, doctype, "index") + if couchdb.IsNotFoundError(err) { + return "", nil + } + if err != nil { + return "", err + } + seq, _ := result["last_seq"].(string) + return seq, nil +} + +// updateLastSequenceNumber updates the last sequence number for this +// indexation if it's superior to the number in CouchDB. +func updateLastSequenceNumber(inst *instance.Instance, doctype, seq string) error { + result, err := couchdb.GetLocal(inst, doctype, "index") + if err != nil { + if !couchdb.IsNotFoundError(err) { + return err + } + result = make(map[string]interface{}) + } else { + if prev, ok := result["last_seq"].(string); ok { + if revision.Generation(seq) <= revision.Generation(prev) { + return nil + } + } + } + result["last_seq"] = seq + return couchdb.PutLocal(inst, doctype, "index", result) +} + +// callChangesFeed fetches the last changes from the changes feed +// http://docs.couchdb.org/en/stable/api/database/changes.html +func callChangesFeed(inst *instance.Instance, doctype, since string) (*couchdb.ChangesResponse, error) { + return couchdb.GetChanges(inst, &couchdb.ChangesRequest{ + DocType: doctype, + IncludeDocs: true, + Since: since, + Limit: BatchSize, + }) +} + +// pushJob adds a new job to continue on the pending documents in the changes +// feed. +func pushJob(inst *instance.Instance, doctype string) error { + msg, err := job.NewMessage(&IndexMessage{ + Doctype: doctype, + }) + if err != nil { + return err + } + _, err = job.System().PushJob(inst, &job.JobRequest{ + WorkerType: "index", + Message: msg, + }) + return err +} From 95b677db190d2927b1c6978dcfa1e7d31122055e Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Mon, 2 Sep 2024 15:21:08 +0200 Subject: [PATCH 02/10] Skip design_docs in index worker --- docs/ai.md | 6 ++++-- worker/index/index.go | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/ai.md b/docs/ai.md index abdd20ce9bf..f0526b51da3 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -23,13 +23,15 @@ For the moment, the feature is experimental, and a trigger must be created manually on the Cozy: ```sh -$ curl http://cozy.localhost:8080/jobs/triggers -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "index", "message": {"doctype": "io.cozy.files"} } } }' +$ COZY=cozy.localhost:8080 +$ TOKEN=$(cozy-stack instances token-cli $COZY io.cozy.triggers) +$ curl "http://${COZY}/jobs/triggers" -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "index", "message": {"doctype": "io.cozy.files"} } } }' ``` It can also be a good idea to start a first indexation with: ```sh -$ cozy-stack triggers launch --domain cozy.localhost:8080 $TRIGGER_ID +$ cozy-stack triggers launch --domain $COZY $TRIGGER_ID ``` In practice, when files are uploaded/modified/deleted, the trigger will create diff --git a/worker/index/index.go b/worker/index/index.go index b6ad8672613..607a2515b8f 100644 --- a/worker/index/index.go +++ b/worker/index/index.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "runtime" + "strings" "time" "github.com/cozy/cozy-stack/model/instance" @@ -83,6 +84,9 @@ func Worker(ctx *job.TaskContext) error { } func callExternalIndexers(inst *instance.Instance, doctype string, change couchdb.Change) error { + if strings.HasPrefix(change.DocID, "_design/") { + return nil + } if change.Doc.Get("type") == consts.DirType { return nil } @@ -157,6 +161,7 @@ func callExternalIndexers(inst *instance.Instance, doctype string, change couchd DocName: name, InternalID: internalID, } + // TODO notes with images content, err := fs.OpenFile(fileDoc) if err != nil { return err From 8660cc60a6ed8817e61779e6a9f6d44b95449c12 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Wed, 18 Sep 2024 18:07:46 +0200 Subject: [PATCH 03/10] Remove LibreChat and use a standalone RAG server --- cozy.example.yaml | 14 +- docs/ai.md | 23 +-- docs/diagrams/{librechat.d2 => ai.d2} | 11 +- docs/diagrams/ai.svg | 101 +++++++++++ docs/diagrams/librechat.svg | 113 ------------ model/instance/instance.go | 11 +- model/rag/index.go | 229 ++++++++++++++++++++++++ pkg/config/config/config.go | 33 ++-- web/jobs/jobs.go | 2 +- worker/index/index.go | 247 -------------------------- worker/rag/rag.go | 30 ++++ 11 files changed, 402 insertions(+), 412 deletions(-) rename docs/diagrams/{librechat.d2 => ai.d2} (58%) create mode 100644 docs/diagrams/ai.svg delete mode 100644 docs/diagrams/librechat.svg create mode 100644 model/rag/index.go delete mode 100644 worker/index/index.go create mode 100644 worker/rag/rag.go diff --git a/cozy.example.yaml b/cozy.example.yaml index 7b7ce9c6c7a..cfd821a5261 100644 --- a/cozy.example.yaml +++ b/cozy.example.yaml @@ -140,13 +140,14 @@ jobs: # List of available workers: # # - "clean-clients": delete unused OAuth clients - # - "index": send data to a RAG for being indexed # - "export": exporting data from a cozy instance # - "import": importing data into a cozy instance # - "konnector": launching konnectors # - "service": launching services # - "migrations": transforming a VFS with Swift to layout v3 # - "notes-save": saving notes to the VFS + # - "rag-index": send data to the RAG server for being indexed + # - "rag-query": send a query to the RAG server # - "push": sending push notifications # - "sms": sending SMS notifications # - "sendmail": sending mails @@ -201,15 +202,14 @@ konnectors: # cmd: ./scripts/konnector-rkt-run.sh # run connectors with rkt # cmd: ./scripts/konnector-nsjail-node8-run.sh # run connectors with nsjail -# external indexers are used for indexing files and doctypes in vector -# databases. -external_indexers: - # A cozy will use the indexers for its context, or if the context is not +# rag are the URL of the RAG server(s) for AI. +rag: + # A cozy will use the rag server for its context, or if the context is not # declared, for default. default: - - http://localhost:8000 + url: http://localhost:8000 beta: - - http://localhost:8001 + url: http://localhost:8001 # mail service parameters for sending email via SMTP mail: diff --git a/docs/ai.md b/docs/ai.md index f0526b51da3..da05b2a6d5a 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -8,16 +8,13 @@ AI can be used for interacting with the personal data of a Cozy. This is currently an experimental feature. Retrieval-Augmented Generation (RAG) is a classical pattern in the AI world. Here, it is specific to each Cozy. -[LibreChat](https://www.librechat.ai/) has been integrated this way: - -![Architecture with LibreChat](diagrams/librechat.svg) +![Architecture with a RAG server](diagrams/ai.svg) ## Indexation -First of all, LibreChat and the RAG must be installed with their dependencies. -It is not mandatory to install them on the same servers as the cozy-stack. And -the URL of RAG must be filled in cozy-stack configuration file (in -`external_indexers`). +First of all, the RAG server must be installed with its dependencies. It is +not mandatory to install them on the same servers as the cozy-stack. And the +URL of RAG must be filled in cozy-stack configuration file (in `rag`). For the moment, the feature is experimental, and a trigger must be created manually on the Cozy: @@ -25,7 +22,7 @@ manually on the Cozy: ```sh $ COZY=cozy.localhost:8080 $ TOKEN=$(cozy-stack instances token-cli $COZY io.cozy.triggers) -$ curl "http://${COZY}/jobs/triggers" -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "index", "message": {"doctype": "io.cozy.files"} } } }' +$ curl "http://${COZY}/jobs/triggers" -H "Authorization: Bearer $TOKEN" -d '{ "data": { "attributes": { "type": "@event", "arguments": "io.cozy.files", "debounce": "1m", "worker": "rag-index", "message": {"doctype": "io.cozy.files"} } } }' ``` It can also be a good idea to start a first indexation with: @@ -38,11 +35,9 @@ In practice, when files are uploaded/modified/deleted, the trigger will create a job for the index worker (with debounce). The index worker will look at the changed feed, and will call the RAG for each entry in the changes feed. - ## Chat -When a user starts a chat in LibreChat, their prompts are sent to the RAG that -can use the vector database to find relevant documents (technically, only some -parts of the documents called chunks). Those documents are sent back to -LibreChat that can be added to the prompt, so that the LLM can use them as a -context when answering. +When a user starts a chat, their prompts are sent to the RAG that can use the +vector database to find relevant documents (technically, only some parts of +the documents called chunks). Those documents are added to the prompt, so +that the LLM can use them as a context when answering. diff --git a/docs/diagrams/librechat.d2 b/docs/diagrams/ai.d2 similarity index 58% rename from docs/diagrams/librechat.d2 rename to docs/diagrams/ai.d2 index 67ab6cd4da9..fcc2df1056d 100644 --- a/docs/diagrams/librechat.d2 +++ b/docs/diagrams/ai.d2 @@ -1,24 +1,17 @@ # https://d2lang.com/ stack: {label: "Cozy-Stack"} -librechat: {label: "Librechat"} rag: {label: "RAG"} llm: {label: "LLM"; shape: diamond} embed: {label: "Embeddings model"; shape: diamond} vector: {label: "Vector DB"; shape: cylinder} couchdb: {label: "CouchDB"; shape: cylinder} swift: {label: "Swift"; shape: cylinder} -mongodb: {label: "MongoDB"; shape: cylinder} - -stack -> rag: ask to generate/delete embeddings -stack <-> librechat: Shared authentication -librechat -> rag: ask context for a query +stack -> rag stack -> couchdb stack -> swift -librechat -> llm -librechat -> mongodb - rag -> embed rag -> vector +rag -> llm diff --git a/docs/diagrams/ai.svg b/docs/diagrams/ai.svg new file mode 100644 index 00000000000..f41af658283 --- /dev/null +++ b/docs/diagrams/ai.svg @@ -0,0 +1,101 @@ +Cozy-StackRAGLLMEmbeddings modelVector DBCouchDBSwift + + + + + + + + + diff --git a/docs/diagrams/librechat.svg b/docs/diagrams/librechat.svg deleted file mode 100644 index 6bb327ebc29..00000000000 --- a/docs/diagrams/librechat.svg +++ /dev/null @@ -1,113 +0,0 @@ -Cozy-StackLibrechatRAGLLMEmbeddings modelVector DBCouchDBSwiftMongoDB ask to generate/delete embeddings Shared authenticationask context for a query - - - - - - - - - - - - - - diff --git a/model/instance/instance.go b/model/instance/instance.go index 26d451ae595..6339f5c24b2 100644 --- a/model/instance/instance.go +++ b/model/instance/instance.go @@ -439,13 +439,12 @@ func (i *Instance) Registries() []*url.URL { return context } -// ExternalIndexers returns the URLs of the external indexers (RAG) where -// documents of this instance can be indexed. -func (i *Instance) ExternalIndexers() []string { - contexts := config.GetConfig().ExternalIndexers +// RAGServer returns the RAG server for the instance (AI features). +func (i *Instance) RAGServer() config.RAGServer { + contexts := config.GetConfig().RAGServers if i.ContextName != "" { - if indexers, ok := contexts[i.ContextName]; ok { - return indexers + if server, ok := contexts[i.ContextName]; ok { + return server } } return contexts[config.DefaultInstanceContext] diff --git a/model/rag/index.go b/model/rag/index.go new file mode 100644 index 00000000000..16ff664a94f --- /dev/null +++ b/model/rag/index.go @@ -0,0 +1,229 @@ +package rag + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/job" + "github.com/cozy/cozy-stack/model/vfs" + "github.com/cozy/cozy-stack/pkg/config/config" + "github.com/cozy/cozy-stack/pkg/consts" + "github.com/cozy/cozy-stack/pkg/couchdb" + "github.com/cozy/cozy-stack/pkg/couchdb/revision" + "github.com/cozy/cozy-stack/pkg/logger" +) + +// BatchSize is the maximal number of documents manipulated at once by the +// worker. +const BatchSize = 100 + +type IndexMessage struct { + Doctype string `json:"doctype"` +} + +func Index(inst *instance.Instance, logger logger.Logger, msg IndexMessage) error { + if msg.Doctype != consts.Files { + return errors.New("Only file can be indexed for the moment") + } + + mu := config.Lock().ReadWrite(inst, "index/"+msg.Doctype) + if err := mu.Lock(); err != nil { + return err + } + defer mu.Unlock() + + lastSeq, err := getLastSeqNumber(inst, msg.Doctype) + if err != nil { + return err + } + feed, err := callChangesFeed(inst, msg.Doctype, lastSeq) + if err != nil { + return err + } + if feed.LastSeq == lastSeq { + return nil + } + + for _, change := range feed.Results { + if err := callRAGIndexer(inst, msg.Doctype, change); err != nil { + logger.Warnf("Index error: %s", err) + return err + } + } + _ = updateLastSequenceNumber(inst, msg.Doctype, feed.LastSeq) + + if feed.Pending > 0 { + _ = pushJob(inst, msg.Doctype) + } + + return nil +} + +func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Change) error { + if strings.HasPrefix(change.DocID, "_design/") { + return nil + } + if change.Doc.Get("type") == consts.DirType { + return nil + } + + ragServer := inst.RAGServer() + if ragServer.URL == "" { + return errors.New("no RAG server configured") + } + u, err := url.Parse(ragServer.URL) + if err != nil { + return err + } + u.Path = fmt.Sprintf("/docs/%s/%s/%s", inst.Domain, doctype, change.DocID) + if change.Deleted { + req, err := http.NewRequest(http.MethodDelete, u.String(), nil) + if err != nil { + return err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode >= 500 { + return fmt.Errorf("DELETE status code: %d", res.StatusCode) + } + } else { + md5sum := fmt.Sprintf("%x", change.Doc.Get("md5sum")) + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + // When the content has not changed, there is no need to regenerate + // an embedding. + needIndexation := false + switch res.StatusCode { + case 200: + var metadata map[string]interface{} + if err = json.NewDecoder(res.Body).Decode(&metadata); err != nil { + return err + } + needIndexation = metadata["md5sum"] != md5sum + case 404: + needIndexation = true + default: + return fmt.Errorf("GET status code: %d", res.StatusCode) + } + if !needIndexation { + // TODO we should patch the metadata in the vector db when a + // file has been moved/renamed. + return nil + } + + dirID, _ := change.Doc.Get("dir_id").(string) + name, _ := change.Doc.Get("name").(string) + mime, _ := change.Doc.Get("mime").(string) + internalID, _ := change.Doc.Get("internal_vfs_id").(string) + u.RawQuery = url.Values{ + "dir_id": []string{dirID}, + "name": []string{name}, + "md5sum": []string{md5sum}, + }.Encode() + fs := inst.VFS() + fileDoc := &vfs.FileDoc{ + Type: consts.FileType, + DocID: change.DocID, + DirID: dirID, + DocName: name, + InternalID: internalID, + } + // TODO notes with images + content, err := fs.OpenFile(fileDoc) + if err != nil { + return err + } + defer content.Close() + req, err = http.NewRequest(http.MethodPut, u.String(), content) + if err != nil { + return err + } + req.Header.Add("Content-Type", mime) + res, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode >= 500 { + return fmt.Errorf("PUT status code: %d", res.StatusCode) + } + } + return nil +} + +// getLastSeqNumber returns the last sequence number of the previous +// indexation for this doctype. +func getLastSeqNumber(inst *instance.Instance, doctype string) (string, error) { + result, err := couchdb.GetLocal(inst, doctype, "rag-index") + if couchdb.IsNotFoundError(err) { + return "", nil + } + if err != nil { + return "", err + } + seq, _ := result["last_seq"].(string) + return seq, nil +} + +// updateLastSequenceNumber updates the last sequence number for this +// indexation if it's superior to the number in CouchDB. +func updateLastSequenceNumber(inst *instance.Instance, doctype, seq string) error { + result, err := couchdb.GetLocal(inst, doctype, "rag-index") + if err != nil { + if !couchdb.IsNotFoundError(err) { + return err + } + result = make(map[string]interface{}) + } else { + if prev, ok := result["last_seq"].(string); ok { + if revision.Generation(seq) <= revision.Generation(prev) { + return nil + } + } + } + result["last_seq"] = seq + return couchdb.PutLocal(inst, doctype, "rag-index", result) +} + +// callChangesFeed fetches the last changes from the changes feed +// http://docs.couchdb.org/en/stable/api/database/changes.html +func callChangesFeed(inst *instance.Instance, doctype, since string) (*couchdb.ChangesResponse, error) { + return couchdb.GetChanges(inst, &couchdb.ChangesRequest{ + DocType: doctype, + IncludeDocs: true, + Since: since, + Limit: BatchSize, + }) +} + +// pushJob adds a new job to continue on the pending documents in the changes +// feed. +func pushJob(inst *instance.Instance, doctype string) error { + msg, err := job.NewMessage(&IndexMessage{ + Doctype: doctype, + }) + if err != nil { + return err + } + _, err = job.System().PushJob(inst, &job.JobRequest{ + WorkerType: "rag-index", + Message: msg, + }) + return err +} diff --git a/pkg/config/config/config.go b/pkg/config/config/config.go index 9f4cde65798..ed3a9665374 100644 --- a/pkg/config/config/config.go +++ b/pkg/config/config/config.go @@ -123,7 +123,6 @@ type Config struct { CouchDB CouchDB Jobs Jobs Konnectors Konnectors - ExternalIndexers map[string][]string Mail *gomail.DialerOptions MailPerContext map[string]interface{} CampaignMail *gomail.DialerOptions @@ -143,6 +142,7 @@ type Config struct { Contexts map[string]interface{} Authentication map[string]interface{} + RAGServers map[string]RAGServer Office map[string]Office Registries map[string][]*url.URL Clouderies map[string]ClouderyConfig @@ -230,6 +230,11 @@ type Office struct { OutboxSecret string } +// RAGServer contains the configuration for a RAG server (AI features). +type RAGServer struct { + URL string +} + // Notifications contains the configuration for the mobile push-notification // center, for Android and iOS type Notifications struct { @@ -558,7 +563,7 @@ func UseViper(v *viper.Viper) error { return err } - indexers, err := makeExternalIndexers(v) + rag, err := makeRAGServers(v) if err != nil { return err } @@ -854,7 +859,7 @@ func UseViper(v *viper.Viper) error { Konnectors: Konnectors{ Cmd: v.GetString("konnectors.cmd"), }, - ExternalIndexers: indexers, + RAGServers: rag, Move: Move{ URL: v.GetString("move.url"), }, @@ -1011,23 +1016,21 @@ func makeCouch(v *viper.Viper) (CouchDB, error) { return couch, nil } -func makeExternalIndexers(v *viper.Viper) (map[string][]string, error) { - indexers := make(map[string][]string) - for k, v := range v.GetStringMap("external_indexers") { - list, ok := v.([]interface{}) +func makeRAGServers(v *viper.Viper) (map[string]RAGServer, error) { + servers := make(map[string]RAGServer) + for k, v := range v.GetStringMap("rag") { + m, ok := v.(map[string]interface{}) if !ok { return nil, fmt.Errorf( - "Bad format in the external_indexers section of the configuration file: "+ - "should be a list of strings, got %#v", v) + "Bad format in the rag section of the configuration file: "+ + "should be a map, got %#v", v) } - strings := []string{} - for _, item := range list { - str, _ := item.(string) - strings = append(strings, str) + url, _ := m["url"].(string) + servers[k] = RAGServer{ + URL: url, } - indexers[k] = strings } - return indexers, nil + return servers, nil } func makeRegistries(v *viper.Viper) (map[string][]*url.URL, error) { diff --git a/web/jobs/jobs.go b/web/jobs/jobs.go index f9dc4850307..454ddc5dcf9 100644 --- a/web/jobs/jobs.go +++ b/web/jobs/jobs.go @@ -31,7 +31,6 @@ import ( // import workers _ "github.com/cozy/cozy-stack/worker/archive" "github.com/cozy/cozy-stack/worker/exec" - _ "github.com/cozy/cozy-stack/worker/index" _ "github.com/cozy/cozy-stack/worker/log" _ "github.com/cozy/cozy-stack/worker/mails" _ "github.com/cozy/cozy-stack/worker/migrations" @@ -39,6 +38,7 @@ import ( _ "github.com/cozy/cozy-stack/worker/notes" _ "github.com/cozy/cozy-stack/worker/oauth" _ "github.com/cozy/cozy-stack/worker/push" + _ "github.com/cozy/cozy-stack/worker/rag" _ "github.com/cozy/cozy-stack/worker/share" _ "github.com/cozy/cozy-stack/worker/sms" _ "github.com/cozy/cozy-stack/worker/thumbnail" diff --git a/worker/index/index.go b/worker/index/index.go deleted file mode 100644 index 607a2515b8f..00000000000 --- a/worker/index/index.go +++ /dev/null @@ -1,247 +0,0 @@ -package index - -import ( - "encoding/json" - "errors" - "fmt" - "net/http" - "net/url" - "runtime" - "strings" - "time" - - "github.com/cozy/cozy-stack/model/instance" - "github.com/cozy/cozy-stack/model/job" - "github.com/cozy/cozy-stack/model/vfs" - "github.com/cozy/cozy-stack/pkg/config/config" - "github.com/cozy/cozy-stack/pkg/consts" - "github.com/cozy/cozy-stack/pkg/couchdb" - "github.com/cozy/cozy-stack/pkg/couchdb/revision" -) - -func init() { - job.AddWorker(&job.WorkerConfig{ - WorkerType: "index", - Concurrency: runtime.NumCPU(), - MaxExecCount: 1, - Reserved: true, - Timeout: 5 * time.Minute, - WorkerFunc: Worker, - }) -} - -// BatchSize is the maximal number of documents manipulated at once by the -// worker. -const BatchSize = 100 - -type IndexMessage struct { - Doctype string `json:"doctype"` -} - -func Worker(ctx *job.TaskContext) error { - logger := ctx.Logger() - inst := ctx.Instance - var msg IndexMessage - if err := ctx.UnmarshalMessage(&msg); err != nil { - return err - } - logger.Debugf("Index %s", msg.Doctype) - if msg.Doctype != consts.Files { - return errors.New("Only file can be indexed for the moment") - } - - mu := config.Lock().ReadWrite(inst, "index/"+msg.Doctype) - if err := mu.Lock(); err != nil { - return err - } - defer mu.Unlock() - - lastSeq, err := getLastSeqNumber(inst, msg.Doctype) - if err != nil { - return err - } - feed, err := callChangesFeed(inst, msg.Doctype, lastSeq) - if err != nil { - return err - } - if feed.LastSeq == lastSeq { - return nil - } - - for _, change := range feed.Results { - if err := callExternalIndexers(inst, msg.Doctype, change); err != nil { - logger.Warnf("Index error: %s", err) - return err - } - } - _ = updateLastSequenceNumber(inst, msg.Doctype, feed.LastSeq) - - if feed.Pending > 0 { - _ = pushJob(inst, msg.Doctype) - } - - return nil -} - -func callExternalIndexers(inst *instance.Instance, doctype string, change couchdb.Change) error { - if strings.HasPrefix(change.DocID, "_design/") { - return nil - } - if change.Doc.Get("type") == consts.DirType { - return nil - } - - indexers := inst.ExternalIndexers() - for _, indexer := range indexers { - u, err := url.Parse(indexer) - if err != nil { - return err - } - u.Path = fmt.Sprintf("/docs/%s/%s/%s", inst.Domain, doctype, change.DocID) - if change.Deleted { - req, err := http.NewRequest(http.MethodDelete, u.String(), nil) - if err != nil { - return err - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - if res.StatusCode >= 500 { - return fmt.Errorf("DELETE status code: %d", res.StatusCode) - } - } else { - md5sum := fmt.Sprintf("%x", change.Doc.Get("md5sum")) - req, err := http.NewRequest(http.MethodGet, u.String(), nil) - if err != nil { - return err - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - // When the content has not changed, there is no need to regenerate - // an embedding. - needIndexation := false - switch res.StatusCode { - case 200: - var metadata map[string]interface{} - if err = json.NewDecoder(res.Body).Decode(&metadata); err != nil { - return err - } - needIndexation = metadata["md5sum"] != md5sum - case 404: - needIndexation = true - default: - return fmt.Errorf("GET status code: %d", res.StatusCode) - } - if !needIndexation { - // TODO we should patch the metadata in the vector db when a - // file has been moved/renamed. - continue - } - - dirID, _ := change.Doc.Get("dir_id").(string) - name, _ := change.Doc.Get("name").(string) - mime, _ := change.Doc.Get("mime").(string) - internalID, _ := change.Doc.Get("internal_vfs_id").(string) - u.RawQuery = url.Values{ - "dir_id": []string{dirID}, - "name": []string{name}, - "md5sum": []string{md5sum}, - }.Encode() - fs := inst.VFS() - fileDoc := &vfs.FileDoc{ - Type: consts.FileType, - DocID: change.DocID, - DirID: dirID, - DocName: name, - InternalID: internalID, - } - // TODO notes with images - content, err := fs.OpenFile(fileDoc) - if err != nil { - return err - } - defer content.Close() - req, err = http.NewRequest(http.MethodPut, u.String(), content) - if err != nil { - return err - } - req.Header.Add("Content-Type", mime) - res, err = http.DefaultClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - if res.StatusCode >= 500 { - return fmt.Errorf("PUT status code: %d", res.StatusCode) - } - } - } - return nil -} - -// getLastSeqNumber returns the last sequence number of the previous -// indexation for this doctype. -func getLastSeqNumber(inst *instance.Instance, doctype string) (string, error) { - result, err := couchdb.GetLocal(inst, doctype, "index") - if couchdb.IsNotFoundError(err) { - return "", nil - } - if err != nil { - return "", err - } - seq, _ := result["last_seq"].(string) - return seq, nil -} - -// updateLastSequenceNumber updates the last sequence number for this -// indexation if it's superior to the number in CouchDB. -func updateLastSequenceNumber(inst *instance.Instance, doctype, seq string) error { - result, err := couchdb.GetLocal(inst, doctype, "index") - if err != nil { - if !couchdb.IsNotFoundError(err) { - return err - } - result = make(map[string]interface{}) - } else { - if prev, ok := result["last_seq"].(string); ok { - if revision.Generation(seq) <= revision.Generation(prev) { - return nil - } - } - } - result["last_seq"] = seq - return couchdb.PutLocal(inst, doctype, "index", result) -} - -// callChangesFeed fetches the last changes from the changes feed -// http://docs.couchdb.org/en/stable/api/database/changes.html -func callChangesFeed(inst *instance.Instance, doctype, since string) (*couchdb.ChangesResponse, error) { - return couchdb.GetChanges(inst, &couchdb.ChangesRequest{ - DocType: doctype, - IncludeDocs: true, - Since: since, - Limit: BatchSize, - }) -} - -// pushJob adds a new job to continue on the pending documents in the changes -// feed. -func pushJob(inst *instance.Instance, doctype string) error { - msg, err := job.NewMessage(&IndexMessage{ - Doctype: doctype, - }) - if err != nil { - return err - } - _, err = job.System().PushJob(inst, &job.JobRequest{ - WorkerType: "index", - Message: msg, - }) - return err -} diff --git a/worker/rag/rag.go b/worker/rag/rag.go new file mode 100644 index 00000000000..1ceac8fe691 --- /dev/null +++ b/worker/rag/rag.go @@ -0,0 +1,30 @@ +package rag + +import ( + "runtime" + "time" + + "github.com/cozy/cozy-stack/model/job" + "github.com/cozy/cozy-stack/model/rag" +) + +func init() { + job.AddWorker(&job.WorkerConfig{ + WorkerType: "rag-index", + Concurrency: runtime.NumCPU(), + MaxExecCount: 1, + Reserved: true, + Timeout: 15 * time.Minute, + WorkerFunc: WorkerIndex, + }) +} + +func WorkerIndex(ctx *job.TaskContext) error { + logger := ctx.Logger() + var msg rag.IndexMessage + if err := ctx.UnmarshalMessage(&msg); err != nil { + return err + } + logger.Debugf("RAG: index %s", msg.Doctype) + return rag.Index(ctx.Instance, logger, msg) +} From 392e491d14be5b2f5908bf5fba35ea8704d04416 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Thu, 19 Sep 2024 17:28:02 +0200 Subject: [PATCH 04/10] Add a route to ask chat completion to the RAG --- docs/ai.md | 48 +++++++++++++ model/rag/chat.go | 152 ++++++++++++++++++++++++++++++++++++++++++ pkg/consts/doctype.go | 2 + web/ai/ai.go | 35 ++++++++++ web/routing.go | 2 + worker/rag/rag.go | 19 ++++++ 6 files changed, 258 insertions(+) create mode 100644 model/rag/chat.go create mode 100644 web/ai/ai.go diff --git a/docs/ai.md b/docs/ai.md index da05b2a6d5a..5df4137c54a 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -41,3 +41,51 @@ When a user starts a chat, their prompts are sent to the RAG that can use the vector database to find relevant documents (technically, only some parts of the documents called chunks). Those documents are added to the prompt, so that the LLM can use them as a context when answering. + +### POST /ai/chat/completions/:id + +This route can be used to ask AI for a chat completion. The id in the path +must be the identifier of a chat session. The client can generate a random +identifier for a new chat session. + +The stack will respond after pushing a job for this task, but without the +response. The client must use the real-time websocket and subscribe to +`io.cozy.ai.chat.completions`. + +#### Request + +```http +POST /ai/chat/completions/e21dce8058b9013d800a18c04daba326 HTTP/1.1 +Content-Type: application/json +``` + +```json +{ + "q": "Why the sky is blue?" +} +``` + +#### Response + +```http +HTTP/1.1 202 Accepted +Content-Type: application/vnd.api+json +``` + +```json +{ + "data": { + "type": "io.cozy.ai.chat.completions" + "id": "e21dce8058b9013d800a18c04daba326", + "rev": "1-23456", + "attributes": { + "messages": [ + { + "role": "user", + "content": "Why the sky is blue?" + } + ] + } + } +} +``` diff --git a/model/rag/chat.go b/model/rag/chat.go new file mode 100644 index 00000000000..997a65e0b2c --- /dev/null +++ b/model/rag/chat.go @@ -0,0 +1,152 @@ +package rag + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/job" + "github.com/cozy/cozy-stack/pkg/consts" + "github.com/cozy/cozy-stack/pkg/couchdb" + "github.com/cozy/cozy-stack/pkg/jsonapi" + "github.com/cozy/cozy-stack/pkg/logger" + "github.com/labstack/echo/v4" +) + +type ChatPayload struct { + ChatCompletionID string + Query string `json:"q"` +} + +type ChatCompletion struct { + DocID string `json:"_id"` + DocRev string `json:"_rev,omitempty"` + Messages []ChatMessage `json:"messages"` +} + +type ChatMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} + +const ( + HumanRole = "human" + AIRole = "ai" +) + +func (c *ChatCompletion) ID() string { return c.DocID } +func (c *ChatCompletion) Rev() string { return c.DocRev } +func (c *ChatCompletion) DocType() string { return consts.ChatCompletions } +func (c *ChatCompletion) SetID(id string) { c.DocID = id } +func (c *ChatCompletion) SetRev(rev string) { c.DocRev = rev } +func (c *ChatCompletion) Clone() couchdb.Doc { + cloned := *c + cloned.Messages = make([]ChatMessage, len(c.Messages)) + copy(cloned.Messages, c.Messages) + return &cloned +} +func (c *ChatCompletion) Included() []jsonapi.Object { return nil } +func (c *ChatCompletion) Relationships() jsonapi.RelationshipMap { return nil } +func (c *ChatCompletion) Links() *jsonapi.LinksList { return nil } + +var _ jsonapi.Object = (*ChatCompletion)(nil) + +type QueryMessage struct { + Task string `json:"task"` + DocID string `json:"doc_id"` +} + +func Chat(inst *instance.Instance, payload ChatPayload) (*ChatCompletion, error) { + var chat ChatCompletion + err := couchdb.GetDoc(inst, consts.ChatCompletions, payload.ChatCompletionID, &chat) + if couchdb.IsNotFoundError(err) { + chat.DocID = payload.ChatCompletionID + } else if err != nil { + return nil, err + } + msg := ChatMessage{Role: HumanRole, Content: payload.Query} + chat.Messages = append(chat.Messages, msg) + if chat.DocRev == "" { + err = couchdb.CreateNamedDocWithDB(inst, &chat) + } else { + err = couchdb.UpdateDoc(inst, &chat) + } + if err != nil { + return nil, err + } + query, err := job.NewMessage(&QueryMessage{ + Task: "chat-completion", + DocID: chat.DocID, + }) + if err != nil { + return nil, err + } + _, err = job.System().PushJob(inst, &job.JobRequest{ + WorkerType: "rag-query", + Message: query, + }) + if err != nil { + return nil, err + } + return &chat, nil +} + +func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) error { + var chat ChatCompletion + err := couchdb.GetDoc(inst, consts.ChatCompletions, query.DocID, &chat) + if err != nil { + return err + } + msg := chat.Messages[len(chat.Messages)-1] + payload := map[string]interface{}{ + "q": msg.Content, + } + + res, err := callRAGQuery(inst, payload) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != 200 { + return fmt.Errorf("POST status code: %d", res.StatusCode) + } + + // TODO streaming + completion, err := io.ReadAll(res.Body) + if err != nil { + return err + } + answer := ChatMessage{ + Role: AIRole, + Content: string(completion), + } + chat.Messages = append(chat.Messages, answer) + return couchdb.UpdateDoc(inst, &chat) +} + +func callRAGQuery(inst *instance.Instance, payload map[string]interface{}) (*http.Response, error) { + ragServer := inst.RAGServer() + if ragServer.URL == "" { + return nil, errors.New("no RAG server configured") + } + u, err := url.Parse(ragServer.URL) + if err != nil { + return nil, err + } + u.Path = fmt.Sprintf("/query/%s", inst.Domain) + body, err := json.Marshal(payload) + if err != nil { + return nil, err + } + req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Add("Content-Type", echo.MIMEApplicationJSON) + return http.DefaultClient.Do(req) +} diff --git a/pkg/consts/doctype.go b/pkg/consts/doctype.go index 2675e72716f..ee9b8243353 100644 --- a/pkg/consts/doctype.go +++ b/pkg/consts/doctype.go @@ -142,4 +142,6 @@ const ( // NextCloudFiles doc type is used when listing files from a NextCloud via // WebDAV. NextCloudFiles = "io.cozy.remote.nextcloud.files" + // ChatCompletions doc type is used for a chat between the user and a chatbot. + ChatCompletions = "io.cozy.ai.chat.completions" ) diff --git a/web/ai/ai.go b/web/ai/ai.go new file mode 100644 index 00000000000..98b597be02c --- /dev/null +++ b/web/ai/ai.go @@ -0,0 +1,35 @@ +package ai + +import ( + "net/http" + + "github.com/cozy/cozy-stack/model/permission" + "github.com/cozy/cozy-stack/model/rag" + "github.com/cozy/cozy-stack/pkg/consts" + "github.com/cozy/cozy-stack/pkg/jsonapi" + "github.com/cozy/cozy-stack/web/middlewares" + "github.com/labstack/echo/v4" +) + +// Chat is the route for asking a chat completion to AI. +func Chat(c echo.Context) error { + if err := middlewares.AllowWholeType(c, permission.POST, consts.ChatCompletions); err != nil { + return middlewares.ErrForbidden + } + var payload rag.ChatPayload + if err := c.Bind(&payload); err != nil { + return err + } + payload.ChatCompletionID = c.Param("id") + inst := middlewares.GetInstance(c) + chat, err := rag.Chat(inst, payload) + if err != nil { + return jsonapi.InternalServerError(err) + } + return jsonapi.Data(c, http.StatusAccepted, chat, nil) +} + +// Routes sets the routing for the AI tasks. +func Routes(router *echo.Group) { + router.POST("/chat/completions/:id", Chat) +} diff --git a/web/routing.go b/web/routing.go index 61df17656e4..84e84e356df 100644 --- a/web/routing.go +++ b/web/routing.go @@ -14,6 +14,7 @@ import ( "github.com/cozy/cozy-stack/pkg/jsonapi" "github.com/cozy/cozy-stack/pkg/metrics" "github.com/cozy/cozy-stack/web/accounts" + "github.com/cozy/cozy-stack/web/ai" "github.com/cozy/cozy-stack/web/apps" "github.com/cozy/cozy-stack/web/auth" "github.com/cozy/cozy-stack/web/bitwarden" @@ -235,6 +236,7 @@ func SetupRoutes(router *echo.Echo, services *stack.Services) error { sharings.Routes(router.Group("/sharings", mws...)) bitwarden.Routes(router.Group("/bitwarden", mws...)) shortcuts.Routes(router.Group("/shortcuts", mws...)) + ai.Routes(router.Group("/ai", mws...)) // The settings routes needs not to be blocked apps.WebappsRoutes(router.Group("/apps", mwsNotBlocked...)) diff --git a/worker/rag/rag.go b/worker/rag/rag.go index 1ceac8fe691..3b755b231ed 100644 --- a/worker/rag/rag.go +++ b/worker/rag/rag.go @@ -17,6 +17,15 @@ func init() { Timeout: 15 * time.Minute, WorkerFunc: WorkerIndex, }) + + job.AddWorker(&job.WorkerConfig{ + WorkerType: "rag-query", + Concurrency: runtime.NumCPU(), + MaxExecCount: 1, + Reserved: true, + Timeout: 15 * time.Minute, + WorkerFunc: WorkerQuery, + }) } func WorkerIndex(ctx *job.TaskContext) error { @@ -28,3 +37,13 @@ func WorkerIndex(ctx *job.TaskContext) error { logger.Debugf("RAG: index %s", msg.Doctype) return rag.Index(ctx.Instance, logger, msg) } + +func WorkerQuery(ctx *job.TaskContext) error { + logger := ctx.Logger() + var msg rag.QueryMessage + if err := ctx.UnmarshalMessage(&msg); err != nil { + return err + } + logger.Debugf("RAG: query %v", msg) + return rag.Query(ctx.Instance, logger, msg) +} From 036ca1c74bf7cad9fca0bd4d346a283ce91b5cf5 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Thu, 19 Sep 2024 18:57:10 +0200 Subject: [PATCH 05/10] Add streaming and realtime events for AI chats --- docs/ai.md | 23 +++++++++++++++++- model/rag/chat.go | 56 +++++++++++++++++++++++++++++++++++++++---- pkg/consts/doctype.go | 2 ++ 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/docs/ai.md b/docs/ai.md index 5df4137c54a..9fdfcfb6136 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -50,7 +50,7 @@ identifier for a new chat session. The stack will respond after pushing a job for this task, but without the response. The client must use the real-time websocket and subscribe to -`io.cozy.ai.chat.completions`. +`io.cozy.ai.chat.events`. #### Request @@ -89,3 +89,24 @@ Content-Type: application/vnd.api+json } } ``` + +### Real-time via websockets + +``` +client > {"method": "AUTH", "payload": "token"} +client > {"method": "SUBSCRIBE", + "payload": {"type": "io.cozy.ai.chat.events"}} +server > {"event": "CREATED", + "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "type": "io.cozy.ai.chat.events", + "doc": {"object": "delta", "content": "The "}}} +server > {"event": "CREATED", + "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "type": "io.cozy.ai.chat.events", + "doc": {"object": "delta", "content": "sky "}}} +[...] +server > {"event": "CREATED", + "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "type": "io.cozy.ai.chat.events", + "doc": {"object": "done"}}} +``` diff --git a/model/rag/chat.go b/model/rag/chat.go index 997a65e0b2c..20a8087ef94 100644 --- a/model/rag/chat.go +++ b/model/rag/chat.go @@ -1,6 +1,7 @@ package rag import ( + "bufio" "bytes" "encoding/json" "errors" @@ -15,6 +16,7 @@ import ( "github.com/cozy/cozy-stack/pkg/couchdb" "github.com/cozy/cozy-stack/pkg/jsonapi" "github.com/cozy/cozy-stack/pkg/logger" + "github.com/cozy/cozy-stack/pkg/realtime" "github.com/labstack/echo/v4" ) @@ -104,7 +106,8 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er } msg := chat.Messages[len(chat.Messages)-1] payload := map[string]interface{}{ - "q": msg.Content, + "q": msg.Content, + "stream": true, } res, err := callRAGQuery(inst, payload) @@ -116,14 +119,28 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er return fmt.Errorf("POST status code: %d", res.StatusCode) } - // TODO streaming - completion, err := io.ReadAll(res.Body) + var completion string + err = foreachSSE(res.Body, func(event map[string]interface{}) { + switch event["object"] { + case "delta", "done": + content, _ := event["content"].(string) + completion += content + delta := couchdb.JSONDoc{ + Type: consts.ChatEvents, + M: event, + } + go realtime.GetHub().Publish(inst, realtime.EventCreate, &delta, nil) + default: + // We can ignore done events + } + }) if err != nil { return err } + answer := ChatMessage{ Role: AIRole, - Content: string(completion), + Content: completion, } chat.Messages = append(chat.Messages, answer) return couchdb.UpdateDoc(inst, &chat) @@ -150,3 +167,34 @@ func callRAGQuery(inst *instance.Instance, payload map[string]interface{}) (*htt req.Header.Add("Content-Type", echo.MIMEApplicationJSON) return http.DefaultClient.Do(req) } + +func foreachSSE(r io.Reader, fn func(event map[string]interface{})) error { + rb := bufio.NewReader(r) + for { + bs, err := rb.ReadBytes('\n') + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if bytes.Equal(bs, []byte("\n")) { + continue + } + if bytes.HasPrefix(bs, []byte(":")) { + continue + } + parts := bytes.SplitN(bs, []byte(": "), 2) + if len(parts) != 2 { + return errors.New("invalid SSE response") + } + if string(parts[0]) != "data" { + return errors.New("invalid SSE response") + } + var event map[string]interface{} + if err := json.Unmarshal(bytes.TrimSpace(parts[1]), &event); err != nil { + return err + } + fn(event) + } +} diff --git a/pkg/consts/doctype.go b/pkg/consts/doctype.go index ee9b8243353..17f9d16894e 100644 --- a/pkg/consts/doctype.go +++ b/pkg/consts/doctype.go @@ -144,4 +144,6 @@ const ( NextCloudFiles = "io.cozy.remote.nextcloud.files" // ChatCompletions doc type is used for a chat between the user and a chatbot. ChatCompletions = "io.cozy.ai.chat.completions" + // ChatEvents doc type is used for RAG events about a chat session. + ChatEvents = "io.cozy.ai.chat.events" ) From 3f0beddd25cc8e0b7c929214311813cd69a23f7b Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Mon, 23 Sep 2024 18:14:10 +0200 Subject: [PATCH 06/10] Improve a few things --- docs/ai.md | 17 ++++---- model/rag/chat.go | 92 +++++++++++++++++++++++++++---------------- pkg/consts/doctype.go | 6 +-- web/ai/ai.go | 6 +-- 4 files changed, 74 insertions(+), 47 deletions(-) diff --git a/docs/ai.md b/docs/ai.md index 9fdfcfb6136..f323dcedcdb 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -42,11 +42,11 @@ vector database to find relevant documents (technically, only some parts of the documents called chunks). Those documents are added to the prompt, so that the LLM can use them as a context when answering. -### POST /ai/chat/completions/:id +### POST /ai/chat/conversations/:id This route can be used to ask AI for a chat completion. The id in the path -must be the identifier of a chat session. The client can generate a random -identifier for a new chat session. +must be the identifier of a chat conversation. The client can generate a random +identifier for a new chat conversation. The stack will respond after pushing a job for this task, but without the response. The client must use the real-time websocket and subscribe to @@ -55,7 +55,7 @@ response. The client must use the real-time websocket and subscribe to #### Request ```http -POST /ai/chat/completions/e21dce8058b9013d800a18c04daba326 HTTP/1.1 +POST /ai/chat/conversations/e21dce8058b9013d800a18c04daba326 HTTP/1.1 Content-Type: application/json ``` @@ -75,12 +75,13 @@ Content-Type: application/vnd.api+json ```json { "data": { - "type": "io.cozy.ai.chat.completions" + "type": "io.cozy.ai.chat.conversations" "id": "e21dce8058b9013d800a18c04daba326", "rev": "1-23456", "attributes": { "messages": [ { + "id": "eb17c3205bf1013ddea018c04daba326", "role": "user", "content": "Why the sky is blue?" } @@ -97,16 +98,16 @@ client > {"method": "AUTH", "payload": "token"} client > {"method": "SUBSCRIBE", "payload": {"type": "io.cozy.ai.chat.events"}} server > {"event": "CREATED", - "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "payload": {"id": "eb17c3205bf1013ddea018c04daba326", "type": "io.cozy.ai.chat.events", "doc": {"object": "delta", "content": "The "}}} server > {"event": "CREATED", - "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "payload": {"id": "eb17c3205bf1013ddea018c04daba326", "type": "io.cozy.ai.chat.events", "doc": {"object": "delta", "content": "sky "}}} [...] server > {"event": "CREATED", - "payload": {"id": "e21dce8058b9013d800a18c04daba326", + "payload": {"id": "eb17c3205bf1013ddea018c04daba326", "type": "io.cozy.ai.chat.events", "doc": {"object": "done"}}} ``` diff --git a/model/rag/chat.go b/model/rag/chat.go index 20a8087ef94..3a053627ac0 100644 --- a/model/rag/chat.go +++ b/model/rag/chat.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "net/url" + "time" "github.com/cozy/cozy-stack/model/instance" "github.com/cozy/cozy-stack/model/job" @@ -16,62 +17,83 @@ import ( "github.com/cozy/cozy-stack/pkg/couchdb" "github.com/cozy/cozy-stack/pkg/jsonapi" "github.com/cozy/cozy-stack/pkg/logger" + "github.com/cozy/cozy-stack/pkg/metadata" "github.com/cozy/cozy-stack/pkg/realtime" + "github.com/gofrs/uuid/v5" "github.com/labstack/echo/v4" ) type ChatPayload struct { - ChatCompletionID string - Query string `json:"q"` + ChatConversationID string + Query string `json:"q"` } -type ChatCompletion struct { - DocID string `json:"_id"` - DocRev string `json:"_rev,omitempty"` - Messages []ChatMessage `json:"messages"` +type ChatConversation struct { + DocID string `json:"_id"` + DocRev string `json:"_rev,omitempty"` + Messages []ChatMessage `json:"messages"` + Metadata *metadata.CozyMetadata `json:"cozyMetadata"` } type ChatMessage struct { - Role string `json:"role"` - Content string `json:"content"` + ID string `json:"id"` + Role string `json:"role"` + Content string `json:"content"` + CreatedAt time.Time } const ( - HumanRole = "human" - AIRole = "ai" + UserRole = "user" + AssistantRole = "assistant" ) -func (c *ChatCompletion) ID() string { return c.DocID } -func (c *ChatCompletion) Rev() string { return c.DocRev } -func (c *ChatCompletion) DocType() string { return consts.ChatCompletions } -func (c *ChatCompletion) SetID(id string) { c.DocID = id } -func (c *ChatCompletion) SetRev(rev string) { c.DocRev = rev } -func (c *ChatCompletion) Clone() couchdb.Doc { +// DocTypeVersion represents the doctype version. Each time this document +// structure is modified, update this value +const DocTypeVersion = "1" + +func (c *ChatConversation) ID() string { return c.DocID } +func (c *ChatConversation) Rev() string { return c.DocRev } +func (c *ChatConversation) DocType() string { return consts.ChatConversations } +func (c *ChatConversation) SetID(id string) { c.DocID = id } +func (c *ChatConversation) SetRev(rev string) { c.DocRev = rev } +func (c *ChatConversation) Clone() couchdb.Doc { cloned := *c cloned.Messages = make([]ChatMessage, len(c.Messages)) copy(cloned.Messages, c.Messages) return &cloned } -func (c *ChatCompletion) Included() []jsonapi.Object { return nil } -func (c *ChatCompletion) Relationships() jsonapi.RelationshipMap { return nil } -func (c *ChatCompletion) Links() *jsonapi.LinksList { return nil } +func (c *ChatConversation) Included() []jsonapi.Object { return nil } +func (c *ChatConversation) Relationships() jsonapi.RelationshipMap { return nil } +func (c *ChatConversation) Links() *jsonapi.LinksList { return nil } -var _ jsonapi.Object = (*ChatCompletion)(nil) +var _ jsonapi.Object = (*ChatConversation)(nil) type QueryMessage struct { Task string `json:"task"` DocID string `json:"doc_id"` } -func Chat(inst *instance.Instance, payload ChatPayload) (*ChatCompletion, error) { - var chat ChatCompletion - err := couchdb.GetDoc(inst, consts.ChatCompletions, payload.ChatCompletionID, &chat) +func Chat(inst *instance.Instance, payload ChatPayload) (*ChatConversation, error) { + var chat ChatConversation + err := couchdb.GetDoc(inst, consts.ChatConversations, payload.ChatConversationID, &chat) if couchdb.IsNotFoundError(err) { - chat.DocID = payload.ChatCompletionID + chat.DocID = payload.ChatConversationID + md := metadata.New() + md.DocTypeVersion = DocTypeVersion + md.UpdatedAt = md.CreatedAt + chat.Metadata = md } else if err != nil { return nil, err + } else { + chat.Metadata.UpdatedAt = time.Now().UTC() + } + uuidv7, _ := uuid.NewV7() + msg := ChatMessage{ + ID: uuidv7.String(), + Role: UserRole, + Content: payload.Query, + CreatedAt: time.Now().UTC(), } - msg := ChatMessage{Role: HumanRole, Content: payload.Query} chat.Messages = append(chat.Messages, msg) if chat.DocRev == "" { err = couchdb.CreateNamedDocWithDB(inst, &chat) @@ -99,15 +121,14 @@ func Chat(inst *instance.Instance, payload ChatPayload) (*ChatCompletion, error) } func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) error { - var chat ChatCompletion - err := couchdb.GetDoc(inst, consts.ChatCompletions, query.DocID, &chat) + var chat ChatConversation + err := couchdb.GetDoc(inst, consts.ChatConversations, query.DocID, &chat) if err != nil { return err } - msg := chat.Messages[len(chat.Messages)-1] payload := map[string]interface{}{ - "q": msg.Content, - "stream": true, + "messages": chat.Messages, + "stream": true, } res, err := callRAGQuery(inst, payload) @@ -119,6 +140,7 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er return fmt.Errorf("POST status code: %d", res.StatusCode) } + msg := chat.Messages[len(chat.Messages)-1] var completion string err = foreachSSE(res.Body, func(event map[string]interface{}) { switch event["object"] { @@ -129,6 +151,7 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er Type: consts.ChatEvents, M: event, } + delta.SetID(msg.ID) go realtime.GetHub().Publish(inst, realtime.EventCreate, &delta, nil) default: // We can ignore done events @@ -138,9 +161,12 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er return err } + uuidv7, _ := uuid.NewV7() answer := ChatMessage{ - Role: AIRole, - Content: completion, + ID: uuidv7.String(), + Role: AssistantRole, + Content: completion, + CreatedAt: time.Now().UTC(), } chat.Messages = append(chat.Messages, answer) return couchdb.UpdateDoc(inst, &chat) @@ -155,7 +181,7 @@ func callRAGQuery(inst *instance.Instance, payload map[string]interface{}) (*htt if err != nil { return nil, err } - u.Path = fmt.Sprintf("/query/%s", inst.Domain) + u.Path = fmt.Sprintf("/chat/%s", inst.Domain) body, err := json.Marshal(payload) if err != nil { return nil, err diff --git a/pkg/consts/doctype.go b/pkg/consts/doctype.go index 17f9d16894e..49f4d4620a8 100644 --- a/pkg/consts/doctype.go +++ b/pkg/consts/doctype.go @@ -142,8 +142,8 @@ const ( // NextCloudFiles doc type is used when listing files from a NextCloud via // WebDAV. NextCloudFiles = "io.cozy.remote.nextcloud.files" - // ChatCompletions doc type is used for a chat between the user and a chatbot. - ChatCompletions = "io.cozy.ai.chat.completions" - // ChatEvents doc type is used for RAG events about a chat session. + // ChatConversations doc type is used for a chat between the user and a chatbot. + ChatConversations = "io.cozy.ai.chat.conversations" + // ChatEvents doc type is used for RAG events about a chat conversation. ChatEvents = "io.cozy.ai.chat.events" ) diff --git a/web/ai/ai.go b/web/ai/ai.go index 98b597be02c..6c43d93ae19 100644 --- a/web/ai/ai.go +++ b/web/ai/ai.go @@ -13,14 +13,14 @@ import ( // Chat is the route for asking a chat completion to AI. func Chat(c echo.Context) error { - if err := middlewares.AllowWholeType(c, permission.POST, consts.ChatCompletions); err != nil { + if err := middlewares.AllowWholeType(c, permission.POST, consts.ChatConversations); err != nil { return middlewares.ErrForbidden } var payload rag.ChatPayload if err := c.Bind(&payload); err != nil { return err } - payload.ChatCompletionID = c.Param("id") + payload.ChatConversationID = c.Param("id") inst := middlewares.GetInstance(c) chat, err := rag.Chat(inst, payload) if err != nil { @@ -31,5 +31,5 @@ func Chat(c echo.Context) error { // Routes sets the routing for the AI tasks. func Routes(router *echo.Group) { - router.POST("/chat/completions/:id", Chat) + router.POST("/chat/conversations/:id", Chat) } From 97919af8b451b76d2881a4562433c4afd533e6da Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Tue, 24 Sep 2024 15:13:16 +0200 Subject: [PATCH 07/10] Update doc --- docs/ai.md | 12 ++++++++++-- model/rag/chat.go | 8 ++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/docs/ai.md b/docs/ai.md index f323dcedcdb..d3a13c6fe40 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -75,7 +75,7 @@ Content-Type: application/vnd.api+json ```json { "data": { - "type": "io.cozy.ai.chat.conversations" + "type": "io.cozy.ai.chat.conversations", "id": "e21dce8058b9013d800a18c04daba326", "rev": "1-23456", "attributes": { @@ -83,10 +83,18 @@ Content-Type: application/vnd.api+json { "id": "eb17c3205bf1013ddea018c04daba326", "role": "user", - "content": "Why the sky is blue?" + "content": "Why the sky is blue?", + "createdAt": "2024-09-24T13:24:07.576Z" } ] } + }, + "cozyMetadata": { + "createdAt": "2024-09-24T13:24:07.576Z", + "createdOn": "http://cozy.localhost:8080/", + "doctypeVersion": "1", + "metadataVersion": 1, + "updatedAt": "2024-09-24T13:24:07.576Z" } } ``` diff --git a/model/rag/chat.go b/model/rag/chat.go index 3a053627ac0..ff4ff39b7f5 100644 --- a/model/rag/chat.go +++ b/model/rag/chat.go @@ -36,10 +36,10 @@ type ChatConversation struct { } type ChatMessage struct { - ID string `json:"id"` - Role string `json:"role"` - Content string `json:"content"` - CreatedAt time.Time + ID string `json:"id"` + Role string `json:"role"` + Content string `json:"content"` + CreatedAt time.Time `json:"createdAt"` } const ( From 6ebb03b490c2b2f1888ce2af2485178ffd23bec1 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Wed, 25 Sep 2024 09:47:16 +0200 Subject: [PATCH 08/10] Add auth for RAG server --- cozy.example.yaml | 2 ++ model/rag/chat.go | 1 + model/rag/index.go | 4 ++++ pkg/config/config/config.go | 7 +++++-- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cozy.example.yaml b/cozy.example.yaml index cfd821a5261..d73aae151c8 100644 --- a/cozy.example.yaml +++ b/cozy.example.yaml @@ -208,8 +208,10 @@ rag: # declared, for default. default: url: http://localhost:8000 + api_key: $3cr3t beta: url: http://localhost:8001 + api_key: $3cr3t # mail service parameters for sending email via SMTP mail: diff --git a/model/rag/chat.go b/model/rag/chat.go index ff4ff39b7f5..eee05487088 100644 --- a/model/rag/chat.go +++ b/model/rag/chat.go @@ -190,6 +190,7 @@ func callRAGQuery(inst *instance.Instance, payload map[string]interface{}) (*htt if err != nil { return nil, err } + req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey) req.Header.Add("Content-Type", echo.MIMEApplicationJSON) return http.DefaultClient.Do(req) } diff --git a/model/rag/index.go b/model/rag/index.go index 16ff664a94f..cc6451fd559 100644 --- a/model/rag/index.go +++ b/model/rag/index.go @@ -16,6 +16,7 @@ import ( "github.com/cozy/cozy-stack/pkg/couchdb" "github.com/cozy/cozy-stack/pkg/couchdb/revision" "github.com/cozy/cozy-stack/pkg/logger" + "github.com/labstack/echo/v4" ) // BatchSize is the maximal number of documents manipulated at once by the @@ -86,6 +87,7 @@ func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Chan if err != nil { return err } + req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey) res, err := http.DefaultClient.Do(req) if err != nil { return err @@ -100,6 +102,7 @@ func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Chan if err != nil { return err } + req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey) res, err := http.DefaultClient.Do(req) if err != nil { return err @@ -154,6 +157,7 @@ func callRAGIndexer(inst *instance.Instance, doctype string, change couchdb.Chan if err != nil { return err } + req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey) req.Header.Add("Content-Type", mime) res, err = http.DefaultClient.Do(req) if err != nil { diff --git a/pkg/config/config/config.go b/pkg/config/config/config.go index ed3a9665374..8199fb1c122 100644 --- a/pkg/config/config/config.go +++ b/pkg/config/config/config.go @@ -232,7 +232,8 @@ type Office struct { // RAGServer contains the configuration for a RAG server (AI features). type RAGServer struct { - URL string + URL string + APIKey string } // Notifications contains the configuration for the mobile push-notification @@ -1026,8 +1027,10 @@ func makeRAGServers(v *viper.Viper) (map[string]RAGServer, error) { "should be a map, got %#v", v) } url, _ := m["url"].(string) + key, _ := m["api_key"].(string) servers[k] = RAGServer{ - URL: url, + URL: url, + APIKey: key, } } return servers, nil From 902130a6c3b0715322bab01928cd78d956acc21b Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Tue, 1 Oct 2024 14:37:46 +0200 Subject: [PATCH 09/10] Add position to io.cozy.ai.chat.events --- docs/ai.md | 4 ++-- model/rag/chat.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/ai.md b/docs/ai.md index d3a13c6fe40..beaf403e48a 100644 --- a/docs/ai.md +++ b/docs/ai.md @@ -108,11 +108,11 @@ client > {"method": "SUBSCRIBE", server > {"event": "CREATED", "payload": {"id": "eb17c3205bf1013ddea018c04daba326", "type": "io.cozy.ai.chat.events", - "doc": {"object": "delta", "content": "The "}}} + "doc": {"object": "delta", "content": "The ", "position": 0}}} server > {"event": "CREATED", "payload": {"id": "eb17c3205bf1013ddea018c04daba326", "type": "io.cozy.ai.chat.events", - "doc": {"object": "delta", "content": "sky "}}} + "doc": {"object": "delta", "content": "sky ", "position": 1}}} [...] server > {"event": "CREATED", "payload": {"id": "eb17c3205bf1013ddea018c04daba326", diff --git a/model/rag/chat.go b/model/rag/chat.go index eee05487088..2475f2071f0 100644 --- a/model/rag/chat.go +++ b/model/rag/chat.go @@ -141,10 +141,13 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er } msg := chat.Messages[len(chat.Messages)-1] + position := 0 var completion string err = foreachSSE(res.Body, func(event map[string]interface{}) { switch event["object"] { case "delta", "done": + event["position"] = position + position++ content, _ := event["content"].(string) completion += content delta := couchdb.JSONDoc{ From 1be146980cf941128210b89f77ace15a1d2e6344 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Thu, 3 Oct 2024 09:21:08 +0200 Subject: [PATCH 10/10] Add an endpoint to fill the description of a file with AI --- docs/files.md | 77 ++++++++++++++++++++++++++++++++++++++++++++-- model/rag/file.go | 72 +++++++++++++++++++++++++++++++++++++++++++ web/files/files.go | 20 ++++++++++++ 3 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 model/rag/file.go diff --git a/docs/files.md b/docs/files.md index 6b4c579eaf0..ca045f4953e 100644 --- a/docs/files.md +++ b/docs/files.md @@ -958,8 +958,8 @@ Get an image that shows the first page of a PDF (at most 1080x1920). Get a thumbnail of a file (for an image & pdf only). `:format` can be `tiny` (96x96) `small` (640x480), `medium` (1280x720), or `large` (1920x1080). -This API does not require authentication because the secret acts as a token. -This secret is valid for 10 minutes, after which the link will return an error. +This API does not require authentication because the secret acts as a token. +This secret is valid for 10 minutes, after which the link will return an error. To retrieve a new functional link, you must query the files API again to obtain a new secret. @@ -1400,6 +1400,79 @@ Content-Type: application/vnd.api+json The same status codes can be encountered as the `PATCH /files/:file-id` route. +### POST /files/:id/description + +This endpoint fills the `metadata.description` field of a file with a +description generated by the IA from the content of this file. + +#### Request + +```http +POST /files/9152d568-7e7c-11e6-a377-37cbfb190b4b/description HTTP/1.1 +``` + +#### Response + +```http +HTTP/1.1 200 OK +Content-Type: application/json.vnd+api +``` + +```json +{ + "data": { + "type": "io.cozy.files", + "id": "9152d568-7e7c-11e6-a377-37cbfb190b4b", + "meta": { + "rev": "2-20900ae0" + }, + "attributes": { + "type": "file", + "name": "hi.txt", + "trashed": false, + "md5sum": "ODZmYjI2OWQxOTBkMmM4NQo=", + "created_at": "2016-09-19T12:38:04Z", + "updated_at": "2016-09-19T12:38:04Z", + "tags": ["poem"], + "size": 12, + "executable": false, + "class": "document", + "mime": "text/plain", + "metadata": { + "description": "Explores love's complexities through vivid imagery and heartfelt emotions" + }, + "cozyMetadata": { + "doctypeVersion": "1", + "metadataVersion": 1, + "createdAt": "2016-09-20T18:32:49Z", + "createdByApp": "drive", + "createdOn": "https://cozy.example.com/", + "updatedAt": "2016-09-22T13:32:51Z", + "uploadedAt": "2016-09-21T04:27:50Z", + "uploadedOn": "https://cozy.example.com/", + "uploadedBy": { + "slug": "drive" + } + } + }, + "relationships": { + "parent": { + "links": { + "related": "/files/f2f36fec-8018-11e6-abd8-8b3814d9a465" + }, + "data": { + "type": "io.cozy.files", + "id": "f2f36fec-8018-11e6-abd8-8b3814d9a465" + } + } + }, + "links": { + "self": "/files/9152d568-7e7c-11e6-a377-37cbfb190b4b" + } + } +} +``` + ### POST /files/archive Create an archive. The body of the request lists the files and directories that diff --git a/model/rag/file.go b/model/rag/file.go new file mode 100644 index 00000000000..11cf81ae134 --- /dev/null +++ b/model/rag/file.go @@ -0,0 +1,72 @@ +package rag + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/vfs" + "github.com/cozy/cozy-stack/pkg/couchdb" + "github.com/labstack/echo/v4" +) + +func AddDescriptionToFile(inst *instance.Instance, file *vfs.FileDoc) (*vfs.FileDoc, error) { + content, err := inst.VFS().OpenFile(file) + if err != nil { + return nil, err + } + defer content.Close() + description, err := callRAGDescription(inst, content, file.Mime) + if err != nil { + return nil, err + } + newfile := file.Clone().(*vfs.FileDoc) + if newfile.Metadata == nil { + newfile.Metadata = make(map[string]interface{}) + } + newfile.Metadata["description"] = description + if err := couchdb.UpdateDocWithOld(inst, newfile, file); err != nil { + return nil, err + } + return newfile, nil +} + +func callRAGDescription(inst *instance.Instance, content io.Reader, mime string) (string, error) { + ragServer := inst.RAGServer() + if ragServer.URL == "" { + return "", errors.New("no RAG server configured") + } + u, err := url.Parse(ragServer.URL) + if err != nil { + return "", err + } + u.Path = "/description" + req, err := http.NewRequest(http.MethodPost, u.String(), content) + if err != nil { + return "", err + } + req.Header.Add(echo.HeaderAuthorization, "Bearer "+ragServer.APIKey) + req.Header.Add("Content-Type", mime) + res, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + if res.StatusCode >= 300 { + return "", fmt.Errorf("POST status code: %d", res.StatusCode) + } + var data struct { + Description string `json:"description"` + } + if err := json.NewDecoder(res.Body).Decode(&data); err != nil { + return "", err + } + if data.Description == "" { + return "", errors.New("no description") + } + return data.Description, nil +} diff --git a/web/files/files.go b/web/files/files.go index 71047ba90ba..7743c23a36a 100644 --- a/web/files/files.go +++ b/web/files/files.go @@ -25,6 +25,7 @@ import ( "github.com/cozy/cozy-stack/model/note" "github.com/cozy/cozy-stack/model/oauth" "github.com/cozy/cozy-stack/model/permission" + "github.com/cozy/cozy-stack/model/rag" "github.com/cozy/cozy-stack/model/sharing" "github.com/cozy/cozy-stack/model/vfs" "github.com/cozy/cozy-stack/pkg/assets/statik" @@ -385,6 +386,24 @@ func FileCopyHandler(c echo.Context) error { return FileData(c, http.StatusCreated, newdoc, false, nil) } +func AddDescription(c echo.Context) error { + inst := middlewares.GetInstance(c) + fileID := c.Param("file-id") + doc, err := inst.VFS().FileByID(fileID) + if err != nil { + return WrapVfsError(err) + } + err = checkPerm(c, permission.POST, nil, doc) + if err != nil { + return err + } + newdoc, err := rag.AddDescriptionToFile(inst, doc) + if err != nil { + return WrapVfsError(err) + } + return FileData(c, http.StatusOK, newdoc, false, nil) +} + // ModifyMetadataByIDHandler handles PATCH requests on /files/:file-id // // It can be used to modify the file or directory metadata, as well as @@ -1932,6 +1951,7 @@ func Routes(router *echo.Group) { router.PATCH("/metadata", ModifyMetadataByPathHandler) router.PATCH("/:file-id", ModifyMetadataByIDHandler) router.PATCH("/", ModifyMetadataByIDInBatchHandler) + router.POST("/:file-id/description", AddDescription) router.POST("/shared-drives", SharedDrivesCreationHandler) router.POST("/", CreationHandler)