Skip to content

Commit

Permalink
feat(catalog): implement conversation and message api (#77)
Browse files Browse the repository at this point in the history
Because

catalog needs to supoort conversation and message data 

This commit

implemented the related APIs
Yougigun authored Aug 19, 2024
1 parent 2c5fdf9 commit e02b1f1
Showing 21 changed files with 13,372 additions and 7,957 deletions.
4 changes: 4 additions & 0 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
@@ -83,6 +83,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler) http.Handl
}

func main() {
// gorm's autoUpdate will use local timezone by default, so we need to set it to UTC
time.Local = time.UTC

// Initialize config
if err := config.Init(); err != nil {
log.Fatal(err.Error())
}
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ database:
host: pg-sql
port: 5432
name: artifact
version: 14
version: 15
timezone: Etc/UTC
pool:
idleconnections: 5
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c
github.com/knadh/koanf v1.5.0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -342,8 +342,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7 h1:2uF3AxjNM8EgRIVViitAoX6qr/aTaO2wrr8pk9CCp+I=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242 h1:4mP3CToV4oO5MkzAVVbcGqoMNS49AiaT0biNNv805Vs=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw=
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c h1:a2RVkpIV2QcrGnSHAou+t/L+vBsaIfFvk5inVg5Uh4s=
2 changes: 2 additions & 0 deletions pkg/acl/acl.go
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ type Relation struct {
Relation string
}

const CatalogObject = "knowledgebase"

func NewACLClient(wc openfga.OpenFGAServiceClient, rc openfga.OpenFGAServiceClient, redisClient *redis.Client) ACLClient {
if rc == nil {
rc = wc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
BEGIN;

-- Drop the message table
DROP TABLE IF EXISTS message;

-- Drop the conversation table
DROP TABLE IF EXISTS conversation;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
BEGIN;
-- Create the conversation table
CREATE TABLE conversation (
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace_uid UUID NOT NULL,
catalog_uid UUID NOT NULL,
id VARCHAR(255) NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMP
);
-- Create unique index on namespace_uid, catalog_uid, and id
CREATE UNIQUE INDEX idx_unique_namespace_catalog_id ON conversation (namespace_uid, catalog_uid, id)
WHERE delete_time IS NULL;
-- Create the message table
CREATE TABLE message (
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace_uid UUID NOT NULL,
catalog_uid UUID NOT NULL,
conversation_uid UUID NOT NULL,
content TEXT,
role VARCHAR(50) NOT NULL,
type VARCHAR(50) NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMP
);
-- Add foreign key constraint with CASCADE DELETE
ALTER TABLE message
ADD CONSTRAINT fk_message_conversation FOREIGN KEY (conversation_uid) REFERENCES conversation(uid) ON DELETE CASCADE;
-- Add index for efficient message retrieval
CREATE INDEX idx_message_catalog_conversation ON message (namespace_uid, catalog_uid, conversation_uid);
-- Add comments
COMMENT ON TABLE conversation IS 'Table to store conversations';
COMMENT ON COLUMN conversation.uid IS 'Unique identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.namespace_uid IS 'Namespace identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.catalog_uid IS 'Catalog identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.id IS 'User-defined identifier for the conversation';
COMMENT ON COLUMN conversation.create_time IS 'Timestamp when the conversation was created';
COMMENT ON COLUMN conversation.update_time IS 'Timestamp when the conversation was last updated';
COMMENT ON COLUMN conversation.delete_time IS 'Timestamp when the conversation was deleted (soft delete)';
COMMENT ON TABLE message IS 'Table to store messages within conversations';
COMMENT ON COLUMN message.uid IS 'Unique identifier(uuid) for the message';
COMMENT ON COLUMN message.namespace_uid IS 'Namespace identifier(uuid) for the message';
COMMENT ON COLUMN message.catalog_uid IS 'Catalog identifier(uuid) for the message';
COMMENT ON COLUMN message.conversation_uid IS 'Reference to the conversation this message belongs to';
COMMENT ON COLUMN message.content IS 'Content of the message';
COMMENT ON COLUMN message.role IS 'Role of the message sender (e.g., user, assistant)';
COMMENT ON COLUMN message.type IS 'Type of the message';
COMMENT ON COLUMN message.create_time IS 'Timestamp when the message was created';
COMMENT ON COLUMN message.update_time IS 'Timestamp when the message was last updated';
COMMENT ON COLUMN message.delete_time IS 'Timestamp when the message was deleted (soft delete)';
COMMIT;
2 changes: 1 addition & 1 deletion pkg/handler/catalog.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ func (ph *PublicHandler) GetFileCatalog(ctx context.Context, req *artifactpb.Get
log.Error("failed to get namespace by ns id", zap.Error(err))
return nil, fmt.Errorf("failed to get namespace by ns id. err: %w", err)
}
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get knowledge base by owner and kb id", zap.Error(err))
return nil, fmt.Errorf("failed to get catalog by namepsace and catalog id. err: %w", err)
188 changes: 188 additions & 0 deletions pkg/handler/conversation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package handler

import (
"context"
"fmt"

"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/repository"
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
)

// CreateConversation creates a new conversation
func (ph *PublicHandler) CreateConversation(ctx context.Context, req *artifactpb.CreateConversationRequest) (*artifactpb.CreateConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

nameOk := isValidName(req.GetConversationId())
if !nameOk {
msg := "the conversation id should be lowercase without any space or special character besides the hyphen, " +
"it can not start with number or hyphen, and should be less than 32 characters. name: %v. err: %w"
return nil, fmt.Errorf(msg, req.GetConversationId(), customerror.ErrInvalidArgument)
}

// ACL - check user's permission to create conversation in the namespace
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Create conversation
conversation, err := ph.service.Repository.CreateConversation(ctx, repository.Conversation{
NamespaceUID: ns.NsUID,
CatalogUID: catalog.UID,
ID: req.GetConversationId(),
})
if err != nil {
log.Error("failed to create conversation", zap.Error(err))
return nil, fmt.Errorf("failed to create conversation: %w", err)
}

return &artifactpb.CreateConversationResponse{
Conversation: convertToProtoConversation(conversation, req.GetNamespaceId(), req.GetCatalogId()),
}, nil
}

// ListConversations lists conversations for a given catalog
func (ph *PublicHandler) ListConversations(ctx context.Context, req *artifactpb.ListConversationsRequest) (*artifactpb.ListConversationsResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to list conversations in the catalog
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Get conversations
conversations, totalCount, nextPageToken, err := ph.service.Repository.ListConversations(ctx, ns.NsUID, catalog.UID, req.GetPageSize(), req.GetPageToken())
if err != nil {
log.Error("failed to list conversations", zap.Error(err))
return nil, fmt.Errorf("failed to list conversations: %w", err)
}

// Convert to proto conversations
protoConversations := make([]*artifactpb.Conversation, len(conversations))
for i, conv := range conversations {
protoConversations[i] = convertToProtoConversation(conv, req.GetNamespaceId(), req.GetCatalogId())
}

return &artifactpb.ListConversationsResponse{
Conversations: protoConversations,
NextPageToken: nextPageToken,
TotalSize: int32(totalCount),
}, nil
}

// UpdateConversation updates an existing conversation
func (ph *PublicHandler) UpdateConversation(ctx context.Context, req *artifactpb.UpdateConversationRequest) (*artifactpb.UpdateConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to update conversation in the catalog
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Get the existing conversation
existingConv, err := ph.service.Repository.GetConversationByID(ctx, ns.NsUID, catalog.UID, req.GetConversationId())
if err != nil {
log.Error("failed to get existing conversation", zap.Error(err))
return nil, fmt.Errorf("failed to get existing conversation: %w", err)
}

// Update conversation
updatedConv, err := ph.service.Repository.UpdateConversationByUpdateMap(ctx, existingConv.UID, map[string]interface{}{
repository.ConversationColumn.ID: req.GetNewConversationId(),
})
if err != nil {
log.Error("failed to update conversation", zap.Error(err))
return nil, fmt.Errorf("failed to update conversation: %w", err)
}

return &artifactpb.UpdateConversationResponse{
Conversation: convertToProtoConversation(updatedConv, req.GetNamespaceId(), req.GetCatalogId()),
}, nil
}

// DeleteConversation deletes an existing conversation
func (ph *PublicHandler) DeleteConversation(ctx context.Context, req *artifactpb.DeleteConversationRequest) (*artifactpb.DeleteConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to delete conversation in the namespace
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}
// Delete conversation
err = ph.service.Repository.SoftDeleteConversation(ctx, ns.NsUID, catalog.UID, req.GetConversationId())
if err != nil {
log.Error("failed to delete conversation", zap.Error(err))
return nil, fmt.Errorf("failed to delete conversation: %w", err)
}

return &artifactpb.DeleteConversationResponse{}, nil
}

// Helper function to convert repository.Conversation to artifactpb.Conversation
func convertToProtoConversation(conv *repository.Conversation, nsID, catalogID string) *artifactpb.Conversation {

return &artifactpb.Conversation{
Uid: conv.UID.String(),
NamespaceId: nsID,
CatalogId: catalogID,
Id: conv.ID,
CreateTime: timestamppb.New(conv.CreateTime),
UpdateTime: timestamppb.New(conv.UpdateTime),
}
}
14 changes: 2 additions & 12 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
@@ -87,16 +87,6 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea
return nil, fmt.Errorf(msg, req.Name, customerror.ErrInvalidArgument)
}

// // get the owner uid from the mgmt service
// var ownerUUID string
// {
// // get the owner uid from the mgmt service
// ownerUUID, err = ph.getOwnerUID(ctx, req.OwnerId)
// if err != nil {
// log.Error("failed to get owner uid", zap.Error(err))
// return nil, err
// }
// }

creatorUUID, err := uuid.FromString(authUID)
if err != nil {
@@ -262,7 +252,7 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to update catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
@@ -343,7 +333,7 @@ func (ph *PublicHandler) DeleteCatalog(ctx context.Context, req *artifactpb.Dele
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
4 changes: 2 additions & 2 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
@@ -228,7 +228,7 @@ func (ph *PublicHandler) ListCatalogFiles(ctx context.Context, req *artifactpb.L
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
Loading

0 comments on commit e02b1f1

Please sign in to comment.