Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(catalog): implement conversation and message api #77

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler) http.Handl
}

func main() {
// gorm's autoUpdate will use local timezone by default, so we need to set it to UTC
time.Local = time.UTC

// Initialize config
if err := config.Init(); err != nil {
log.Fatal(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ database:
host: pg-sql
port: 5432
name: artifact
version: 14
version: 15
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c
github.com/knadh/koanf v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7 h1:2uF3AxjNM8EgRIVViitAoX6qr/aTaO2wrr8pk9CCp+I=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240808093014-75008c807ea7/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242 h1:4mP3CToV4oO5MkzAVVbcGqoMNS49AiaT0biNNv805Vs=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240816101745-44cbb332d242/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw=
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c h1:a2RVkpIV2QcrGnSHAou+t/L+vBsaIfFvk5inVg5Uh4s=
Expand Down
2 changes: 2 additions & 0 deletions pkg/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Relation struct {
Relation string
}

const CatalogObject = "knowledgebase"

func NewACLClient(wc openfga.OpenFGAServiceClient, rc openfga.OpenFGAServiceClient, redisClient *redis.Client) ACLClient {
if rc == nil {
rc = wc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
BEGIN;

-- Drop the message table
DROP TABLE IF EXISTS message;

-- Drop the conversation table
DROP TABLE IF EXISTS conversation;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
BEGIN;
-- Create the conversation table
CREATE TABLE conversation (
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace_uid UUID NOT NULL,
catalog_uid UUID NOT NULL,
id VARCHAR(255) NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMP
);
-- Create unique index on namespace_uid, catalog_uid, and id
CREATE UNIQUE INDEX idx_unique_namespace_catalog_id ON conversation (namespace_uid, catalog_uid, id)
WHERE delete_time IS NULL;
-- Create the message table
CREATE TABLE message (
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace_uid UUID NOT NULL,
catalog_uid UUID NOT NULL,
conversation_uid UUID NOT NULL,
content TEXT,
role VARCHAR(50) NOT NULL,
type VARCHAR(50) NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMP
);
-- Add foreign key constraint with CASCADE DELETE
ALTER TABLE message
ADD CONSTRAINT fk_message_conversation FOREIGN KEY (conversation_uid) REFERENCES conversation(uid) ON DELETE CASCADE;
-- Add index for efficient message retrieval
CREATE INDEX idx_message_catalog_conversation ON message (namespace_uid, catalog_uid, conversation_uid);
-- Add comments
COMMENT ON TABLE conversation IS 'Table to store conversations';
COMMENT ON COLUMN conversation.uid IS 'Unique identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.namespace_uid IS 'Namespace identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.catalog_uid IS 'Catalog identifier(uuid) for the conversation';
COMMENT ON COLUMN conversation.id IS 'User-defined identifier for the conversation';
COMMENT ON COLUMN conversation.create_time IS 'Timestamp when the conversation was created';
COMMENT ON COLUMN conversation.update_time IS 'Timestamp when the conversation was last updated';
COMMENT ON COLUMN conversation.delete_time IS 'Timestamp when the conversation was deleted (soft delete)';
COMMENT ON TABLE message IS 'Table to store messages within conversations';
COMMENT ON COLUMN message.uid IS 'Unique identifier(uuid) for the message';
COMMENT ON COLUMN message.namespace_uid IS 'Namespace identifier(uuid) for the message';
COMMENT ON COLUMN message.catalog_uid IS 'Catalog identifier(uuid) for the message';
COMMENT ON COLUMN message.conversation_uid IS 'Reference to the conversation this message belongs to';
COMMENT ON COLUMN message.content IS 'Content of the message';
COMMENT ON COLUMN message.role IS 'Role of the message sender (e.g., user, assistant)';
COMMENT ON COLUMN message.type IS 'Type of the message';
COMMENT ON COLUMN message.create_time IS 'Timestamp when the message was created';
COMMENT ON COLUMN message.update_time IS 'Timestamp when the message was last updated';
COMMENT ON COLUMN message.delete_time IS 'Timestamp when the message was deleted (soft delete)';
COMMIT;
2 changes: 1 addition & 1 deletion pkg/handler/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ph *PublicHandler) GetFileCatalog(ctx context.Context, req *artifactpb.Get
log.Error("failed to get namespace by ns id", zap.Error(err))
return nil, fmt.Errorf("failed to get namespace by ns id. err: %w", err)
}
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get knowledge base by owner and kb id", zap.Error(err))
return nil, fmt.Errorf("failed to get catalog by namepsace and catalog id. err: %w", err)
Expand Down
188 changes: 188 additions & 0 deletions pkg/handler/conversation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package handler

import (
"context"
"fmt"

"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/repository"
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
)

// CreateConversation creates a new conversation
func (ph *PublicHandler) CreateConversation(ctx context.Context, req *artifactpb.CreateConversationRequest) (*artifactpb.CreateConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

nameOk := isValidName(req.GetConversationId())
if !nameOk {
msg := "the conversation id should be lowercase without any space or special character besides the hyphen, " +
"it can not start with number or hyphen, and should be less than 32 characters. name: %v. err: %w"
return nil, fmt.Errorf(msg, req.GetConversationId(), customerror.ErrInvalidArgument)
}

// ACL - check user's permission to create conversation in the namespace
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Create conversation
conversation, err := ph.service.Repository.CreateConversation(ctx, repository.Conversation{
NamespaceUID: ns.NsUID,
CatalogUID: catalog.UID,
ID: req.GetConversationId(),
})
if err != nil {
log.Error("failed to create conversation", zap.Error(err))
return nil, fmt.Errorf("failed to create conversation: %w", err)
}

return &artifactpb.CreateConversationResponse{
Conversation: convertToProtoConversation(conversation, req.GetNamespaceId(), req.GetCatalogId()),
}, nil
}

// ListConversations lists conversations for a given catalog
func (ph *PublicHandler) ListConversations(ctx context.Context, req *artifactpb.ListConversationsRequest) (*artifactpb.ListConversationsResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to list conversations in the catalog
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Get conversations
conversations, totalCount, nextPageToken, err := ph.service.Repository.ListConversations(ctx, ns.NsUID, catalog.UID, req.GetPageSize(), req.GetPageToken())
if err != nil {
log.Error("failed to list conversations", zap.Error(err))
return nil, fmt.Errorf("failed to list conversations: %w", err)
}

// Convert to proto conversations
protoConversations := make([]*artifactpb.Conversation, len(conversations))
for i, conv := range conversations {
protoConversations[i] = convertToProtoConversation(conv, req.GetNamespaceId(), req.GetCatalogId())
}

return &artifactpb.ListConversationsResponse{
Conversations: protoConversations,
NextPageToken: nextPageToken,
TotalSize: int32(totalCount),
}, nil
}

// UpdateConversation updates an existing conversation
func (ph *PublicHandler) UpdateConversation(ctx context.Context, req *artifactpb.UpdateConversationRequest) (*artifactpb.UpdateConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to update conversation in the catalog
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}

// Get the existing conversation
existingConv, err := ph.service.Repository.GetConversationByID(ctx, ns.NsUID, catalog.UID, req.GetConversationId())
if err != nil {
log.Error("failed to get existing conversation", zap.Error(err))
return nil, fmt.Errorf("failed to get existing conversation: %w", err)
}

// Update conversation
updatedConv, err := ph.service.Repository.UpdateConversationByUpdateMap(ctx, existingConv.UID, map[string]interface{}{
repository.ConversationColumn.ID: req.GetNewConversationId(),
})
if err != nil {
log.Error("failed to update conversation", zap.Error(err))
return nil, fmt.Errorf("failed to update conversation: %w", err)
}

return &artifactpb.UpdateConversationResponse{
Conversation: convertToProtoConversation(updatedConv, req.GetNamespaceId(), req.GetCatalogId()),
}, nil
}

// DeleteConversation deletes an existing conversation
func (ph *PublicHandler) DeleteConversation(ctx context.Context, req *artifactpb.DeleteConversationRequest) (*artifactpb.DeleteConversationResponse, error) {
log, _ := logger.GetZapLogger(ctx)

// Get user ID from context
authUID, err := getUserUIDFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get user id from header: %v", err)
}

// ACL - check user's permission to delete conversation in the namespace
ns, catalog, err := ph.service.CheckCatalogUserPermission(ctx, req.GetNamespaceId(), req.GetCatalogId(), authUID)
if err != nil {
log.Error(
"failed to check user permission",
zap.Error(err),
zap.String("namespace_id", req.GetNamespaceId()),
zap.String("auth_uid", authUID),
)
return nil, fmt.Errorf("failed to check user permission: %w", err)
}
// Delete conversation
err = ph.service.Repository.SoftDeleteConversation(ctx, ns.NsUID, catalog.UID, req.GetConversationId())
if err != nil {
log.Error("failed to delete conversation", zap.Error(err))
return nil, fmt.Errorf("failed to delete conversation: %w", err)
}

return &artifactpb.DeleteConversationResponse{}, nil
}

// Helper function to convert repository.Conversation to artifactpb.Conversation
func convertToProtoConversation(conv *repository.Conversation, nsID, catalogID string) *artifactpb.Conversation {

return &artifactpb.Conversation{
Uid: conv.UID.String(),
NamespaceId: nsID,
CatalogId: catalogID,
Id: conv.ID,
CreateTime: timestamppb.New(conv.CreateTime),
UpdateTime: timestamppb.New(conv.UpdateTime),
}
}
14 changes: 2 additions & 12 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,6 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea
return nil, fmt.Errorf(msg, req.Name, customerror.ErrInvalidArgument)
}

// // get the owner uid from the mgmt service
// var ownerUUID string
// {
// // get the owner uid from the mgmt service
// ownerUUID, err = ph.getOwnerUID(ctx, req.OwnerId)
// if err != nil {
// log.Error("failed to get owner uid", zap.Error(err))
// return nil, err
// }
// }

creatorUUID, err := uuid.FromString(authUID)
if err != nil {
Expand Down Expand Up @@ -262,7 +252,7 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to update catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
Expand Down Expand Up @@ -343,7 +333,7 @@ func (ph *PublicHandler) DeleteCatalog(ctx context.Context, req *artifactpb.Dele
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
Expand Down Expand Up @@ -228,7 +228,7 @@ func (ph *PublicHandler) ListCatalogFiles(ctx context.Context, req *artifactpb.L
return nil, fmt.Errorf("failed to get namespace. err: %w", err)
}
// ACL - check user's permission to write catalog
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID.String(), req.CatalogId)
kb, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, ns.NsUID, req.CatalogId)
if err != nil {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
Expand Down
Loading
Loading