Skip to content

Commit

Permalink
feat(artifact): support csv file type upload
Browse files Browse the repository at this point in the history
  • Loading branch information
Yougigun committed Sep 20, 2024
1 parent 0eef0cc commit 00d3f65
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240826094216-ad773d684498
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240920071518-c77545942bf5
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c
github.com/knadh/koanf v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240826094216-ad773d684498 h1:uk5MtLSOAif+Y4rcY+f47LtxX2nPcIXScZaKzzBzKEE=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240826094216-ad773d684498/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240920071518-c77545942bf5 h1:PQK+m0eT7HEMbBJT5PDXj2jyaYrLd2qR5V7D5SNOU8o=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240920071518-c77545942bf5/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw=
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c h1:a2RVkpIV2QcrGnSHAou+t/L+vBsaIfFvk5inVg5Uh4s=
Expand Down
4 changes: 4 additions & 0 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ func fileTypeConvertToMime(t artifactpb.FileType) string {
return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
case artifactpb.FileType_FILE_TYPE_XLS:
return "application/vnd.ms-excel"
case artifactpb.FileType_FILE_TYPE_CSV:
return "text/csv"
default:
return "application/octet-stream"
}
Expand Down Expand Up @@ -567,6 +569,8 @@ func DetermineFileType(fileName string) artifactpb.FileType {
return artifactpb.FileType_FILE_TYPE_XLSX
} else if strings.HasSuffix(fileName, ".xls") {
return artifactpb.FileType_FILE_TYPE_XLS
} else if strings.HasSuffix(fileName, ".csv") {
return artifactpb.FileType_FILE_TYPE_CSV
}
return artifactpb.FileType_FILE_TYPE_UNSPECIFIED
}
6 changes: 4 additions & 2 deletions pkg/repository/knowledgebasefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ func (r *Repository) GetSourceTableAndUIDByFileUIDs(ctx context.Context, files [
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String():
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
convertedFile, err := r.GetConvertedFileByFileUID(ctx, file.UID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -542,7 +543,8 @@ func (r *Repository) GetTruthSourceByFileUID(ctx context.Context, fileUID uuid.U
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String():
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
convertedFile, err := r.GetConvertedFileByFileUID(ctx, fileUID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/service/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (s *Service) GetChunksByFile(ctx context.Context, file *repository.Knowledg
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String():
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
// set the sourceTable and sourceUID
convertedFile, err := s.Repository.GetConvertedFileByFileUID(ctx, file.UID)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, caller uuid.UUID, request
prefix = "data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_XLS {
prefix = "data:application/vnd.ms-excel;base64,"
} else if fileType == artifactPb.FileType_FILE_TYPE_CSV {
prefix = "data:text/csv;base64,"
}

req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{
Expand Down
66 changes: 35 additions & 31 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,9 @@ func (wp *fileToEmbWorkerPool) processWaitingFile(ctx context.Context, file repo
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_HTML.String(),
artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String():

artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
// update the file status to converting status in database
updateMap := map[string]interface{}{
repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CONVERTING)],
}
Expand Down Expand Up @@ -452,6 +453,16 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
// encode data to base64
base64Data := base64.StdEncoding.EncodeToString(data)

// save the converting pipeline metadata into database
convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID + "@" + service.PDFToMDVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", convertingPipelineMetadata, "", "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save converting pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save converting pipeline metadata: %w", err)
}

// convert the pdf file to md
requesterUID := file.RequesterUID
convertedMD, err := wp.svc.ConvertToMDPipe(ctx, file.CreatorUID, requesterUID, base64Data, artifactpb.FileType(artifactpb.FileType_value[file.Type]))
Expand All @@ -466,15 +477,6 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
logger.Error("Failed to save converted data.", zap.String("File path", fileInMinIOPath))
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}
// save the converting pipeline metadata into database
convertingPipelineMetadata := service.NamespaceID + "/" + service.ConvertPDFToMDPipelineID + "@" + service.PDFToMDVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", convertingPipelineMetadata, "", "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save converting pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save converting pipeline metadata: %w", err)
}
// update the file status to chunking status in database
updateMap := map[string]interface{}{
repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING)],
Expand Down Expand Up @@ -510,7 +512,8 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_HTML.String(),
artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String():
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
// get the converted file metadata from database
convertedFile, err := wp.svc.Repository.GetConvertedFileByFileUID(ctx, file.UID)
if err != nil {
Expand Down Expand Up @@ -606,6 +609,16 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// save chunking pipeline metadata into file's extra metadata
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save chunking pipeline metadata: %w", err)
}

// Call the text chunking pipeline
requesterUID := file.RequesterUID
chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(originalFile))
Expand All @@ -621,16 +634,6 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// save chunking pipeline metadata into file's extra metadata
chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdSplitPipelineID + "@" + service.MdSplitVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save chunking pipeline metadata: %w", err)
}

// update the file status to embedding status in database
updateMap := map[string]interface{}{
repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING)],
Expand Down Expand Up @@ -671,6 +674,16 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re
}
}

// save embedding pipeline metadata into file's extra metadata
embeddingPipelineMetadata := service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", embeddingPipelineMetadata, nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save embedding pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save embedding pipeline metadata: %w", err)
}

// call the embedding pipeline
requesterUID := file.RequesterUID
vectors, err := wp.svc.EmbeddingTextPipe(ctx, file.CreatorUID, requesterUID, texts)
Expand All @@ -697,15 +710,6 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// save embedding pipeline metadata into file's extra metadata
embeddingPipelineMetadata := service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion
err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", embeddingPipelineMetadata, nil, nil, nil, nil)
if err != nil {
logger.Error("Failed to save embedding pipeline metadata.", zap.String("File uid:", file.UID.String()))
return nil,
artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED,
fmt.Errorf("failed to save embedding pipeline metadata: %w", err)
}

// update the file status to complete status in database
updateMap := map[string]interface{}{
Expand Down

0 comments on commit 00d3f65

Please sign in to comment.