Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(artifact): add retry in minIO and milvus #109

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (c *ACLClient) ListPermissions(ctx context.Context, objectType string, role
Relation: role,
Type: objectType,
})
// TODO: handle error when no model is created
// TODO: handle error when no auth model is created
if err != nil {
if statusErr, ok := status.FromError(err); ok {
if statusErr.Code() == codes.Code(openfga.ErrorCode_type_not_found) {
Expand Down
16 changes: 11 additions & 5 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
"strings"

"github.com/gofrs/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"

"github.com/instill-ai/artifact-backend/pkg/constant"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger" // Add this import
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/resource"
"github.com/instill-ai/artifact-backend/pkg/utils"

artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"
)

func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.UploadCatalogFileRequest) (*artifactpb.UploadCatalogFileResponse, error) {
Expand All @@ -29,7 +31,11 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
if err != nil {
return nil, err
}

// check file name length; note that len() counts bytes, not characters (utf8.RuneCountInString would give the character count)
if len(req.File.Name) > 255 {
return nil, fmt.Errorf("file name is too long. max length is 255. name: %s err: %w",
req.File.Name, customerror.ErrInvalidArgument)
}
// determine the file type by its extension
req.File.Type = DetermineFileType(req.File.Name)
if req.File.Type == artifactpb.FileType_FILE_TYPE_UNSPECIFIED {
Expand Down
20 changes: 20 additions & 0 deletions pkg/handler/knowledgebasefiles_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package handler

import (
"fmt"
"testing"
"unicode/utf8"
)

// TestUploadCatalogFile pins down the difference between the two ways of
// measuring a file name's length: utf8.RuneCountInString counts characters
// (runes) while the built-in len counts bytes. The two agree for ASCII names
// and diverge as soon as a name contains multi-byte UTF-8 characters, which
// matters for any length limit applied to user-supplied file names.
func TestUploadCatalogFile(t *testing.T) {
	cases := []struct {
		input     string
		wantRunes int // expected utf8.RuneCountInString(input)
		wantBytes int // expected len(input)
	}{
		{input: "", wantRunes: 0, wantBytes: 0},
		{input: "-", wantRunes: 1, wantBytes: 1},
		{input: "report.pdf", wantRunes: 10, wantBytes: 10},
		// U+00E9 (e with acute accent) is encoded as 2 bytes in UTF-8.
		{input: "\u00e9", wantRunes: 1, wantBytes: 2},
		// Two CJK characters (3 bytes each) followed by ".pdf".
		{input: "\u6587\u4ef6.pdf", wantRunes: 6, wantBytes: 10},
		// U+1F600 (an emoji) is encoded as 4 bytes in UTF-8.
		{input: "\U0001F600", wantRunes: 1, wantBytes: 4},
	}

	for _, tc := range cases {
		tc := tc // own copy per iteration for toolchains older than Go 1.22
		t.Run(fmt.Sprintf("%q", tc.input), func(t *testing.T) {
			if got := utf8.RuneCountInString(tc.input); got != tc.wantRunes {
				t.Errorf("rune count of %q = %d, want %d", tc.input, got, tc.wantRunes)
			}
			if got := len(tc.input); got != tc.wantBytes {
				t.Errorf("byte length of %q = %d, want %d", tc.input, got, tc.wantBytes)
			}
		})
	}
}
29 changes: 25 additions & 4 deletions pkg/milvus/milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,17 @@ func (m *MilvusClient) CreateKnowledgeBaseCollection(ctx context.Context, kbUID

// InsertVectorsToKnowledgeBaseCollection
func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Context, kbUID string, embeddings []Embedding) error {
logger, _ := logger.GetZapLogger(ctx)
collectionName := m.GetKnowledgeBaseCollectionName(kbUID)

// Check if the collection exists
has, err := m.c.HasCollection(ctx, collectionName)
if err != nil {
logger.Error("Failed to check collection existence", zap.Error(err))
return fmt.Errorf("failed to check collection existence: %w", err)
}
if !has {
logger.Error("Collection does not exist", zap.String("collection", collectionName))
return fmt.Errorf("collection %s does not exist", collectionName)
}

Expand Down Expand Up @@ -180,18 +183,36 @@ func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Contex
entity.NewColumnFloatVector(KbCollectionFiledEmbedding, VectorDim, vectors),
}

// Insert the data
_, err = m.c.Upsert(ctx, collectionName, "", columns...)
// Insert the data with retry
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
_, err = m.c.Upsert(ctx, collectionName, "", columns...)
if err == nil {
break
}
logger.Warn("Failed to insert vectors, retrying", zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Second * time.Duration(attempt))
}
if err != nil {
logger.Error("Failed to insert vectors after retries", zap.Error(err))
return fmt.Errorf("failed to insert vectors: %w", err)
}

// Optionally, you can flush the collection to ensure the data is persisted
err = m.c.Flush(ctx, collectionName, false)
// Flush the collection with retry
for attempt := 1; attempt <= maxRetries; attempt++ {
err = m.c.Flush(ctx, collectionName, false)
if err == nil {
break
}
logger.Warn("Failed to flush collection, retrying", zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Second * time.Duration(attempt))
}
if err != nil {
logger.Error("Failed to flush collection after retries", zap.Error(err))
return fmt.Errorf("failed to flush collection after insertion: %w", err)
}

logger.Info("Successfully inserted and flushed vectors", zap.String("collection", collectionName))
return nil
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/minio/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"sync"

"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/utils"
"go.uber.org/zap"
)

// KnowledgeBaseI is the interface for knowledge base related operations.
Expand Down Expand Up @@ -56,8 +58,13 @@ type ChunkContentType []byte

// SaveTextChunks saves batch of chunks(text files) to MinIO.
func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[ChunkUIDType]ChunkContentType) error {
logger, _ := logger.GetZapLogger(ctx)
var wg sync.WaitGroup
errorUIDChan := make(chan string, len(chunks))
type ChunkError struct {
ChunkUID string
ErrorMessage string
}
errorUIDChan := make(chan ChunkError, len(chunks))
for chunkUID, chunkContent := range chunks {
wg.Add(1)
go utils.GoRecover(func() {
Expand All @@ -67,19 +74,21 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu

err := m.UploadBase64File(ctx, filePathName, base64.StdEncoding.EncodeToString(chunkContent), "text/plain")
if err != nil {
errorUIDChan <- string(chunkUID)
logger.Error("Failed to upload chunk after retries", zap.String("chunkUID", string(chunkUID)), zap.Error(err))
errorUIDChan <- ChunkError{ChunkUID: string(chunkUID), ErrorMessage: err.Error()}
return
}
}(chunkUID, chunkContent)
}, fmt.Sprintf("SaveTextChunks %s", chunkUID))
}
wg.Wait()
close(errorUIDChan)
var errStr []string
var errStr []ChunkError
for err := range errorUIDChan {
errStr = append(errStr, err)
}
if len(errStr) > 0 {
logger.Error("Failed to upload chunks", zap.Any("ChunkError", errStr))
return fmt.Errorf("failed to upload chunks: %v", errStr)
}
return nil
Expand Down
95 changes: 73 additions & 22 deletions pkg/minio/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/instill-ai/artifact-backend/config"
log "github.com/instill-ai/artifact-backend/pkg/logger"
Expand Down Expand Up @@ -92,9 +93,16 @@ func (m *Minio) UploadBase64File(ctx context.Context, filePathName string, base6
// Upload the content to MinIO
size := int64(len(decodedContent))
// Create the file path with folder structure
_, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType})
for i := 0; i < 3; i++ {
_, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType})
if err == nil {
break
}
log.Error("Failed to upload file to MinIO, retrying...", zap.String("attempt", fmt.Sprintf("%d", i+1)), zap.Error(err))
time.Sleep(1 * time.Second)
}
if err != nil {
log.Error("Failed to upload file to MinIO", zap.Error(err))
log.Error("Failed to upload file to MinIO after retries", zap.Error(err))
return err
}
return nil
Expand All @@ -107,7 +115,14 @@ func (m *Minio) DeleteFile(ctx context.Context, filePathName string) (err error)
return err
}
// Delete the file from MinIO
err = m.client.RemoveObject(m.bucket, filePathName)
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, filePathName)
if err == nil {
break
}
log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete file from MinIO", zap.Error(err))
return err
Expand All @@ -132,7 +147,15 @@ func (m *Minio) DeleteFiles(ctx context.Context, filePathNames []string) chan er
func() {
func(filePathName string, errCh chan error) {
defer wg.Done()
err := m.client.RemoveObject(m.bucket, filePathName)
var err error
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, filePathName)
if err == nil {
break
}
log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete file from MinIO", zap.Error(err))
errCh <- err
Expand All @@ -151,10 +174,18 @@ func (m *Minio) GetFile(ctx context.Context, filePathName string) ([]byte, error
return nil, err
}

// Get the object using the client
object, err := m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{})
// Get the object using the client with three attempts and proper time delay
var object *minio.Object
for attempt := 1; attempt <= 3; attempt++ {
object, err = m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{})
if err == nil {
break
}
log.Error("Failed to get file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to get file from MinIO", zap.Error(err))
log.Error("Failed to get file from MinIO after 3 attempts", zap.String("filePathName", filePathName), zap.Error(err))
return nil, err
}
defer object.Close()
Expand Down Expand Up @@ -184,21 +215,27 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
}

var wg sync.WaitGroup
files := make([]FileContent, len(filePaths))
errors := make([]error, len(filePaths))
var mu sync.Mutex
fileCh := make(chan FileContent, len(filePaths))
errorCh := make(chan error, len(filePaths))

for i, path := range filePaths {
for _, path := range filePaths {
wg.Add(1)
go utils.GoRecover(func() {
func(index int, filePath string) {
func(filePath string) {
defer wg.Done()
obj, err := m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{})
var obj *minio.Object
var err error
for attempt := 1; attempt <= 3; attempt++ {
obj, err = m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{})
if err == nil {
break
}
log.Error("Failed to get object from MinIO, retrying...", zap.String("path", filePath), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to get object from MinIO", zap.String("path", filePath), zap.Error(err))
mu.Lock()
errors[index] = err
mu.Unlock()
errorCh <- err
return
}
defer obj.Close()
Expand All @@ -207,22 +244,28 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
_, err = io.Copy(&buffer, obj)
if err != nil {
log.Error("Failed to read object content", zap.String("path", filePath), zap.Error(err))
errors[index] = err
errorCh <- err
return
}

files[index] = FileContent{
fileCh <- FileContent{
Name: filepath.Base(filePath),
Content: buffer.Bytes(),
}
}(i, path)
}(path)
}, fmt.Sprintf("GetFilesByPaths %s", path))
}

wg.Wait()
close(fileCh)
close(errorCh)

var files []FileContent
for file := range fileCh {
files = append(files, file)
}

// Check if any errors occurred
for _, err := range errors {
for err := range errorCh {
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -257,7 +300,15 @@ func (m *Minio) DeleteFilesWithPrefix(ctx context.Context, prefix string) chan e
go utils.GoRecover(func() {
func(objectName string) {
defer wg.Done()
err := m.client.RemoveObject(m.bucket, objectName)
var err error
for attempt := 1; attempt <= 3; attempt++ {
err = m.client.RemoveObject(m.bucket, objectName)
if err == nil {
break
}
log.Error("Failed to delete object from MinIO, retrying...", zap.String("object", objectName), zap.Int("attempt", attempt), zap.Error(err))
time.Sleep(time.Duration(attempt) * time.Second)
}
if err != nil {
log.Error("Failed to delete object from MinIO", zap.String("object", objectName), zap.Error(err))
errCh <- err
Expand Down
2 changes: 0 additions & 2 deletions pkg/service/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (s *Service) GetNamespaceByNsID(ctx context.Context, nsID string) (*resourc
return &ns, nil
}

// TODO: GetNamespaceTierByNsID: in the future, this logic should be removed in CE. Because CE does not have subscription
// GetNamespaceTierByNsID returns the tier of the namespace given the namespace ID
func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier, error) {
ns, err := s.GetNamespaceByNsID(ctx, nsID)
Expand All @@ -55,7 +54,6 @@ func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier
return s.GetNamespaceTier(ctx, ns)
}

// TODO: GetNamespaceTier: in the future, this logic should be removed in CE. Because CE does not have subscription
func (s *Service) GetNamespaceTier(ctx context.Context, ns *resource.Namespace) (Tier, error) {
log, _ := logger.GetZapLogger(ctx)
switch ns.NsType {
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ func GetVectorsFromResponse(resp *pipelinePb.TriggerNamespacePipelineReleaseResp
for _, output := range resp.Outputs {
embedResult, ok := output.GetFields()["embed_result"]
if !ok {
return nil, fmt.Errorf("embed_result not found in the output fields. resp: %v", resp)
return nil, fmt.Errorf("embed_result not found in the output fields. output: %v", output)
}
listValue := embedResult.GetListValue()
if listValue == nil {
return nil, fmt.Errorf("embed_result is not a list. resp: %v", resp)
return nil, fmt.Errorf("embed_result is not a list. output: %v", output)
}

vector := make([]float32, 0, len(listValue.GetValues()))
Expand Down
4 changes: 2 additions & 2 deletions pkg/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} {

// Roll all artifact resources on a user
// for _, user := range userResp.GetUsers() {
//TODO: implement the logic to retrieve the artifact usage data
//TODO: implement the logic to retrieve the app usage data
// }

if userResp.NextPageToken == "" {
Expand All @@ -115,7 +115,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} {

// Roll all artifact resources on an org
// for _, org := range orgResp.GetOrganizations() {
//TODO: implement the logic to retrieve the artifact usage data
//TODO: implement the logic to retrieve the app usage data
// }

if orgResp.NextPageToken == "" {
Expand Down
Loading
Loading