From 70d5a7fca9f9ee903610669eb42ea01518bdb31d Mon Sep 17 00:00:00 2001 From: curi-adi Date: Tue, 19 Mar 2024 21:53:12 +0530 Subject: [PATCH 1/2] Add Anthropic integration for chat streaming#172 --- pkg/providers/anthropic/chat_stream.go | 93 +++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/pkg/providers/anthropic/chat_stream.go b/pkg/providers/anthropic/chat_stream.go index 7ca861c1..12a9a29f 100644 --- a/pkg/providers/anthropic/chat_stream.go +++ b/pkg/providers/anthropic/chat_stream.go @@ -2,15 +2,102 @@ package anthropic import ( "context" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" "glide/pkg/api/schemas" "glide/pkg/providers/clients" ) +type Client struct { + APIKey string +} + func (c *Client) SupportChatStream() bool { - return false + return true +} + +func (c *Client) ChatStream(ctx context.Context, chatReq *schemas.ChatRequest) (clients.ChatStream, error) { + apiURL := "https://api.anthropic.com/v1/complete" + requestBody := map[string]interface{}{ + "model": "claude-2", + "prompt": chatReq.Message, // Assuming chatReq.Message contains the user's message + "max_tokens_to_sample": 256, + "stream": true, + } + requestJSON, err := json.Marshal(requestBody) + if err != nil { + return nil, err + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(requestJSON)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("anthropic-version", "2023-06-01") + httpReq.Header.Set("x-api-key", c.APIKey) + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + chatStream := &AnthropicChatStream{ + responseBody: resp.Body, + } + + return chatStream, nil +} + +type AnthropicChatStream struct { + responseBody io.ReadCloser +} + +func (s *AnthropicChatStream) Receive() (string, error) { + decoder := json.NewDecoder(s.responseBody) + for { + var event map[string]interface{} + if err := decoder.Decode(&event); err != nil { + if err == io.EOF { + return "", nil // No more events, return nil error + } + return "", err + } + + eventType, ok := event["type"].(string) + if !ok { + return "", fmt.Errorf("missing event type") + } + + switch eventType { + case "completion": + completionData, ok := event["completion"].(string) + if !ok { + return "", fmt.Errorf("missing completion data") + } + return completionData, nil + case "error": + errorData, ok := event["error"].(map[string]interface{}) + if !ok { + return "", fmt.Errorf("missing error data") + } + errorType, _ := errorData["type"].(string) + errorMessage, _ := errorData["message"].(string) + return "", fmt.Errorf("error from Anthropic API: %s - %s", errorType, errorMessage) + } + } } -func (c *Client) ChatStream(_ context.Context, _ *schemas.ChatRequest) (clients.ChatStream, error) { - return nil, clients.ErrChatStreamNotImplemented +func (s *AnthropicChatStream) Close() error { + return s.responseBody.Close() } From 07d942798aedbd66b428e09ea2679a98b9e3597a Mon Sep 17 00:00:00 2001 From: curi-adi Date: Mon, 25 Mar 2024 15:57:17 +0530 Subject: [PATCH 2/2] updating the changes to Anthropic Provider as suggested by Roman --- pkg/providers/anthropic/chat_stream.go | 146 +++++++++++-------------- 1 file changed, 64 insertions(+), 82 deletions(-) diff --git a/pkg/providers/anthropic/chat_stream.go b/pkg/providers/anthropic/chat_stream.go index 12a9a29f..3c9ef104 100644 --- a/pkg/providers/anthropic/chat_stream.go +++ b/pkg/providers/anthropic/chat_stream.go @@ -1,103 +1,85 @@ package anthropic import ( - "context" - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" + "context" + "encoding/json" + "fmt" + "io" + "net/http" - "glide/pkg/api/schemas" - "glide/pkg/providers/clients" + "glide/pkg/api/schemas" + "glide/pkg/providers/clients" + "glide/pkg/telemetry" + "go.uber.org/zap" ) -type Client struct { - APIKey string -} - func (c *Client) SupportChatStream() bool { return true } -func (c *Client) ChatStream(ctx context.Context, chatReq *schemas.ChatRequest) (clients.ChatStream, error) { - apiURL := "https://api.anthropic.com/v1/complete" - requestBody := map[string]interface{}{ - "model": "claude-2", - "prompt": chatReq.Message, // Assuming chatReq.Message contains the user's message - "max_tokens_to_sample": 256, - "stream": true, - } - requestJSON, err := json.Marshal(requestBody) - if err != nil { - return nil, err - } - - httpReq, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(requestJSON)) - if err != nil { - return nil, err - } - httpReq.Header.Set("Content-Type", "application/json") - httpReq.Header.Set("anthropic-version", "2023-06-01") - httpReq.Header.Set("x-api-key", c.APIKey) - - resp, err := http.DefaultClient.Do(httpReq) - if err != nil { - return nil, err - } - defer resp.Body.Close() +type AnthropicChatStream struct { + tel *telemetry.Telemetry + client *http.client + request *http.Request + response *http.Response + errMapper *ErrorMapper +} - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } +func NewAnthropicChatStream(tel *telemetry.Telemetry, *http.client, request *http.Request, errMapper *ErrorMapper) *AnthropicChatStream { + return &AnthropicChatStream{ + tel: tel, + client: client, + request: request, + errMapper: errMapper, + } +} - chatStream := &AnthropicChatStream{ - responseBody: resp.Body, - } +// Open makes the HTTP request using the provided http.Client to initiate the chat stream. +func (s *AnthropicChatStream) Open(ctx context.Context) error { + resp, err := s.client.Do(s.request) + if err != nil { + s.tel.L().Error("Failed to open chat stream", zap.Error(err)) + // Map and return the error using errMapper, if errMapper is defined. + return s.errMapper.Map(err) + } - return chatStream, nil -} + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + s.tel.L().Warn("Unexpected status code", zap.Int("status", resp.StatusCode)) + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } -type AnthropicChatStream struct { - responseBody io.ReadCloser + s.response = resp + s.tel.L().Info("Chat stream opened successfully") + return nil } -func (s *AnthropicChatStream) Receive() (string, error) { - decoder := json.NewDecoder(s.responseBody) - for { - var event map[string]interface{} - if err := decoder.Decode(&event); err != nil { - if err == io.EOF { - return "", nil // No more events, return nil error - } - return "", err - } +// Recv listens for and decodes incoming messages from the chat stream into ChatStreamChunk objects. +func (s *AnthropicChatStream) Recv() (*schemas.ChatStreamChunk, error) { + if s.response == nil { + s.tel.L().Error("Attempted to receive from an unopened stream") + return nil, fmt.Errorf("stream not opened") + } - eventType, ok := event["type"].(string) - if !ok { - return "", fmt.Errorf("missing event type") - } + decoder := json.NewDecoder(s.response.Body) + var chunk schemas.ChatStreamChunk + if err := decoder.Decode(&chunk); err != nil { + if err == io.EOF { + s.tel.L().Info("Chat stream ended") + return nil, nil // Stream ended normally. + } + s.tel.L().Error("Error during stream processing", zap.Error(err)) + return nil, err // An error occurred during stream processing. + } - switch eventType { - case "completion": - completionData, ok := event["completion"].(string) - if !ok { - return "", fmt.Errorf("missing completion data") - } - return completionData, nil - case "error": - errorData, ok := event["error"].(map[string]interface{}) - if !ok { - return "", fmt.Errorf("missing error data") - } - errorType, _ := errorData["type"].(string) - errorMessage, _ := errorData["message"].(string) - return "", fmt.Errorf("error from Anthropic API: %s - %s", errorType, errorMessage) - } - } + return &chunk, nil } +// Close ensures the chat stream is properly terminated by closing the response body. func (s *AnthropicChatStream) Close() error { - return s.responseBody.Close() -} + if s.response != nil { + s.tel.L().Info("Closing chat stream") + return s.response.Body.Close() + } + return nil +} \ No newline at end of file