Skip to content

Commit

Permalink
Add streaming and realtime events for AI chats
Browse files Browse the repository at this point in the history
  • Loading branch information
nono committed Sep 19, 2024
1 parent 392e491 commit 036ca1c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
23 changes: 22 additions & 1 deletion docs/ai.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ identifier for a new chat session.

The stack responds as soon as it has pushed a job for this task, so the HTTP
response does not contain the answer. The client must use the real-time websocket and subscribe to
`io.cozy.ai.chat.completions`.
`io.cozy.ai.chat.events`.

#### Request

Expand Down Expand Up @@ -89,3 +89,24 @@ Content-Type: application/vnd.api+json
}
}
```

### Real-time via websockets

```
client > {"method": "AUTH", "payload": "token"}
client > {"method": "SUBSCRIBE",
"payload": {"type": "io.cozy.ai.chat.events"}}
server > {"event": "CREATED",
"payload": {"id": "e21dce8058b9013d800a18c04daba326",
"type": "io.cozy.ai.chat.events",
"doc": {"object": "delta", "content": "The "}}}
server > {"event": "CREATED",
"payload": {"id": "e21dce8058b9013d800a18c04daba326",
"type": "io.cozy.ai.chat.events",
"doc": {"object": "delta", "content": "sky "}}}
[...]
server > {"event": "CREATED",
"payload": {"id": "e21dce8058b9013d800a18c04daba326",
"type": "io.cozy.ai.chat.events",
"doc": {"object": "done"}}}
```
56 changes: 52 additions & 4 deletions model/rag/chat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rag

import (
"bufio"
"bytes"
"encoding/json"
"errors"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/cozy/cozy-stack/pkg/couchdb"
"github.com/cozy/cozy-stack/pkg/jsonapi"
"github.com/cozy/cozy-stack/pkg/logger"
"github.com/cozy/cozy-stack/pkg/realtime"
"github.com/labstack/echo/v4"
)

Expand Down Expand Up @@ -104,7 +106,8 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er
}
msg := chat.Messages[len(chat.Messages)-1]
payload := map[string]interface{}{
"q": msg.Content,
"q": msg.Content,
"stream": true,
}

res, err := callRAGQuery(inst, payload)
Expand All @@ -116,14 +119,28 @@ func Query(inst *instance.Instance, logger logger.Logger, query QueryMessage) er
return fmt.Errorf("POST status code: %d", res.StatusCode)
}

// TODO streaming
completion, err := io.ReadAll(res.Body)
var completion string
err = foreachSSE(res.Body, func(event map[string]interface{}) {
switch event["object"] {
case "delta", "done":
content, _ := event["content"].(string)
completion += content
delta := couchdb.JSONDoc{
Type: consts.ChatEvents,
M: event,
}
go realtime.GetHub().Publish(inst, realtime.EventCreate, &delta, nil)
default:
// Events with any other "object" value are ignored
}
})
if err != nil {
return err
}

answer := ChatMessage{
Role: AIRole,
Content: string(completion),
Content: completion,
}
chat.Messages = append(chat.Messages, answer)
return couchdb.UpdateDoc(inst, &chat)
Expand All @@ -150,3 +167,34 @@ func callRAGQuery(inst *instance.Instance, payload map[string]interface{}) (*htt
req.Header.Add("Content-Type", echo.MIMEApplicationJSON)
return http.DefaultClient.Do(req)
}

// foreachSSE reads a Server-Sent Events stream from r and calls fn once for
// each "data" line, passing the payload of that line decoded as a JSON object.
//
// Blank lines (event separators) and comment lines (starting with ":") are
// skipped. Lines may be terminated by "\n" or "\r\n", the space after "data:"
// is optional, and a final line that is not newline-terminated is still
// processed. A line without a colon, or with a field name other than "data",
// is reported as an invalid SSE response.
//
// It returns nil once the stream is exhausted, or the first read error, format
// error or JSON decoding error encountered. fn is never called after an error.
func foreachSSE(r io.Reader, fn func(event map[string]interface{})) error {
	rb := bufio.NewReader(r)
	for {
		bs, err := rb.ReadBytes('\n')
		if err != nil && !errors.Is(err, io.EOF) {
			return err
		}
		// ReadBytes can return data together with io.EOF when the stream does
		// not end with a newline: that last line must not be dropped.
		eof := err != nil

		// Strip the line terminator, accepting both LF and CRLF.
		line := bytes.TrimRight(bs, "\r\n")

		// Empty lines separate events and lines starting with a colon are
		// comments (often used as keep-alives): neither carries data.
		if len(line) > 0 && line[0] != ':' {
			idx := bytes.IndexByte(line, ':')
			if idx < 0 {
				return errors.New("invalid SSE response")
			}
			if string(line[:idx]) != "data" {
				return errors.New("invalid SSE response")
			}
			// TrimSpace removes the optional space that follows the colon.
			var event map[string]interface{}
			if err := json.Unmarshal(bytes.TrimSpace(line[idx+1:]), &event); err != nil {
				return err
			}
			fn(event)
		}

		if eof {
			return nil
		}
	}
}
2 changes: 2 additions & 0 deletions pkg/consts/doctype.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,6 @@ const (
NextCloudFiles = "io.cozy.remote.nextcloud.files"
// ChatCompletions doc type is used for a chat between the user and a chatbot.
ChatCompletions = "io.cozy.ai.chat.completions"
// ChatEvents doc type is used for RAG events about a chat session.
ChatEvents = "io.cozy.ai.chat.events"
)

0 comments on commit 036ca1c

Please sign in to comment.