From 99e3792b0856c8546b5cc4b69a6439d649cc19f0 Mon Sep 17 00:00:00 2001 From: Zhiyuan Wang Date: Sun, 4 Oct 2020 21:23:03 -0400 Subject: [PATCH 1/4] Restructure dependency injection --- daemon/Makefile | 2 +- daemon/api/service/message.go | 19 +- daemon/cmd/main.go | 125 ++++++++++ daemon/cmd/server/main.go | 231 ------------------ daemon/cmd/server/server.go | 145 +++++++++++ daemon/persistence/etag_dao.go | 6 +- daemon/persistence/group_dao.go | 10 +- daemon/persistence/joingroupinvitation_dao.go | 6 +- 8 files changed, 292 insertions(+), 252 deletions(-) create mode 100644 daemon/cmd/main.go delete mode 100644 daemon/cmd/server/main.go create mode 100644 daemon/cmd/server/server.go diff --git a/daemon/Makefile b/daemon/Makefile index a53b4f380..78256cf7a 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -4,7 +4,7 @@ TARGET ?= target .PHONY: daemon-server daemon-server: - GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/server/main.go + GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/main.go chmod a+x ${TARGET}/daemon-server .PHONY: clean diff --git a/daemon/api/service/message.go b/daemon/api/service/message.go index 35e8f66c1..4c9746d69 100644 --- a/daemon/api/service/message.go +++ b/daemon/api/service/message.go @@ -5,7 +5,6 @@ import ( "time" "github.com/mailjet/mailjet-apiv3-go" - "github.com/singerdmx/BulletJournal/daemon/config" "github.com/singerdmx/BulletJournal/daemon/persistence" "github.com/singerdmx/BulletJournal/daemon/utils" ) @@ -19,8 +18,16 @@ const ( var ctx = context.Background() type MessageService struct { - groupDao *persistence.GroupDao - mailClient *persistence.MailjetClient + groupDao *persistence.GroupDao + joinGroupInvitationDao *persistence.JoinGroupInvitationDao + mailClient *mailjet.Client +} + +func NewMessageService( + groupDao *persistence.GroupDao, + joinGroupInvitationDao *persistence.JoinGroupInvitationDao, + mailClient *mailjet.Client) *MessageService { + return &MessageService{groupDao, joinGroupInvitationDao, mailClient} } func GetUrl(uuid string, action string) string { @@ -31,12 +38,10 @@ func GetUrl(uuid string, action string) string { func (m *MessageService) SendJoinGroupEmail(username, email string, groupId, uid uint64) { notificationId := utils.GenerateUID() // Set in redis with key of uid and value of JoinGroupInvitation json string - joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(config.GetConfig()) - joinGroupInvitationDao.SingleCache( + m.joinGroupInvitationDao.SingleCache( &persistence.JoinGroupInvitation{string(uid), username, string(groupId), notificationId}) - groupDao := persistence.NewGroupDao() - group := groupDao.FindGroup(groupId) + group := m.groupDao.FindGroup(groupId) if group == nil { log.Fatalf("cannot find group with group id %v", groupId) return diff --git a/daemon/cmd/main.go b/daemon/cmd/main.go new file mode 100644 index 000000000..1601acc49 --- /dev/null +++ b/daemon/cmd/main.go @@ -0,0 +1,125 @@ +package main + +import ( + "context" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/singerdmx/BulletJournal/daemon/api/middleware" + daemon "github.com/singerdmx/BulletJournal/daemon/api/service" + "github.com/singerdmx/BulletJournal/daemon/cmd/server" + "github.com/singerdmx/BulletJournal/daemon/config" + "github.com/singerdmx/BulletJournal/daemon/logging" + "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" + scheduler "github.com/zywangzy/JobScheduler" + "google.golang.org/grpc" + "upper.io/db.v3/postgresql" +) + +var log logging.Logger + +func main() { + logging.InitLogging(config.GetEnv()) + log = *logging.GetLogger() + + ctx := context.Background() + log.WithContext(ctx) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + shutdown := make(chan os.Signal, 1) + signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT) + + fanInService := daemon.Streaming{ServiceName: server.FanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + cleanerService := daemon.Streaming{ServiceName: server.CleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + reminderService := daemon.Streaming{ServiceName: server.ReminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + daemonRpc := server.NewServer(ctx, []daemon.Streaming{fanInService, cleanerService, reminderService}) + + rpcPort := ":" + daemonRpc.ServiceConfig.RPCPort + lis, err := net.Listen("tcp", rpcPort) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + rpcServer := grpc.NewServer() + services.RegisterDaemonServer(rpcServer, daemonRpc) + + gatewayMux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(middleware.IncomingHeaderMatcher), runtime.WithOutgoingHeaderMatcher(middleware.OutgoingHeaderMatcher)) + endpoint := "127.0.0.1" + rpcPort + err = services.RegisterDaemonHandlerFromEndpoint(ctx, gatewayMux, endpoint, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + log.Fatalf("failed to register rpc server to gateway server: %v", err) + } + + httpAddr := ":" + daemonRpc.ServiceConfig.HttpPort + httpServer := &http.Server{Addr: httpAddr, Handler: gatewayMux} + + //// Serve the swagger-ui and swagger file + //mux := http.NewServeMux() + //mux.Handle("/", rmux) + //mux.HandleFunc("/swagger.json", serveSwagger) + //fs := http.FileServer(http.Dir("www/swagger-ui")) + //mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui", fs)) + + go func() { + log.Infof("rpc server running at port [%v]", daemonRpc.ServiceConfig.RPCPort) + if err := rpcServer.Serve(lis); err != nil { + log.Fatalf("rpc server failed to serve: %v", err) + } + }() + + go func() { + log.Infof("http server running at port [%v]", daemonRpc.ServiceConfig.HttpPort) + // Start HTTP server (and proxy calls to gRPC server endpoint) + if err := httpServer.ListenAndServe(); err != nil { + log.Fatalf("http server failed to serve: %v", err) + } + }() + + jobScheduler := scheduler.NewJobScheduler() + jobScheduler.Start() + cleaner := daemon.Cleaner{ + Service: cleanerService, + Settings: postgresql.ConnectionURL{ + Host: daemonRpc.ServiceConfig.Host + ":" + daemonRpc.ServiceConfig.DBPort, + Database: daemonRpc.ServiceConfig.Database, + User: daemonRpc.ServiceConfig.Username, + Password: daemonRpc.ServiceConfig.Password, + }, + } + + PST, _ := time.LoadLocation("America/Los_Angeles") + log.Infof("PST [%T] [%v]", PST, PST) + year, month, day := time.Now().AddDate(0, 0, daemonRpc.ServiceConfig.IntervalInDays).In(PST).Date() + start := time.Date(year, month, day, 0, 0, 0, 0, PST) + + daemonBackgroundJob := daemon.Job{Cleaner: cleaner, Reminder: daemon.Reminder{}, Investment: daemon.Investment{}} + log.Infof("The next daemon job will start at %v", start.Format(time.RFC3339)) + jobScheduler.AddRecurrentJob( + daemonBackgroundJob.Run, + start, + time.Hour*24*time.Duration(daemonRpc.ServiceConfig.IntervalInDays), + PST, + daemonRpc.ServiceConfig.MaxRetentionTimeInDays, + ) + + <-shutdown + log.Infof("Shutdown signal received") + cleaner.Close() + close(reminderService.ServiceChannel) + close(fanInService.ServiceChannel) + jobScheduler.Stop() + log.Infof("JobScheduler stopped") + rpcServer.GracefulStop() + log.Infof("rpc server stopped") + if httpServer.Shutdown(ctx) != nil { + log.Fatalf("failed to stop http server: %v", err) + } else { + log.Infof("http server stopped") + } +} diff --git a/daemon/cmd/server/main.go b/daemon/cmd/server/main.go deleted file mode 100644 index 226474f72..000000000 --- a/daemon/cmd/server/main.go +++ /dev/null @@ -1,231 +0,0 @@ -package main - -import ( - "context" - "github.com/singerdmx/BulletJournal/daemon/api/middleware" - daemon "github.com/singerdmx/BulletJournal/daemon/api/service" - scheduler "github.com/zywangzy/JobScheduler" - "google.golang.org/grpc/metadata" - "net" - "net/http" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - "github.com/singerdmx/BulletJournal/daemon/api/middleware" - daemon "github.com/singerdmx/BulletJournal/daemon/api/service" - "github.com/singerdmx/BulletJournal/daemon/persistence" - "google.golang.org/grpc/metadata" - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/singerdmx/BulletJournal/daemon/config" - "github.com/singerdmx/BulletJournal/daemon/logging" - uid "github.com/singerdmx/BulletJournal/daemon/utils" - "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" - "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/types" - "google.golang.org/grpc" -) - -const ( - bulletJournalId string = "bulletJournal" - fanInServiceName string = "fanIn" - cleanerServiceName string = "cleaner" - reminderServiceName string = "reminder" -) - -var log logging.Logger - -// server should implement services.UnimplementedDaemonServer's methods -type server struct { - serviceConfig *config.Config - subscriptions map[string][]daemon.Streaming -} - -// HealthCheck implements the rest endpoint healthcheck -> rpc -func (s *server) HealthCheck(ctx context.Context, request *types.HealthCheckRequest) (*types.HealthCheckResponse, error) { - log.Printf("Received health check request: %v", request.String()) - return &types.HealthCheckResponse{}, nil -} - -// Send implements the JoinGroupEvents rpc endpoint of services.DaemonServer -func (s *server) JoinGroupEvents(ctx context.Context, request *types.JoinGroupEvents) (*types.ReplyMessage, error) { - log.Printf("Received rpc request: %v", request.String()) - return &types.ReplyMessage{Message: "Hello daemon"}, nil -} - -// Rest implements the Rest rest->rpc endpoint of services.DaemonServer -func (s *server) HandleJoinGroupResponse(ctx context.Context, request *types.JoinGroupResponse) (*types.ReplyMessage, error) { - if meta, ok := metadata.FromIncomingContext(ctx); ok { - if requestId, ok := meta[strings.ToLower(middleware.RequestIDKey)]; ok { - log.Printf(middleware.RequestIDKey+": %v", requestId[0]) - grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId[0])) - } else { - requestId := uid.GenerateUID() - log.Printf(middleware.RequestIDKey+": %v", requestId) - grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId)) - } - } - log.Printf("Received JoinGroupResponse request: %v", request.String()) - // get username from uid - joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(config.GetConfig()) - invitation := joinGroupInvitationDao.Find(request.Uid) - // then delete etags - etagDao := persistence.InitializeEtagDao(ctx, config.GetConfig()) - etagDao.DeleteEtagByUserName(invitation.Username) - // finally delete edges - // TODO: call usergroup dao - return &types.ReplyMessage{Message: "Hello daemon"}, nil -} - -func (s *server) SubscribeNotification(subscribe *types.SubscribeNotification, stream services.Daemon_SubscribeNotificationServer) error { - log.Printf("Received rpc request for subscription: %s", subscribe.String()) - if _, ok := s.subscriptions[subscribe.Id]; ok { - log.Printf("Subscription: %s's streaming has been idle, start streaming!", subscribe.String()) - daemonServices := s.subscriptions[subscribe.Id] - // Prevent requests with the same subscribe.Id from new subscriptions - delete(s.subscriptions, subscribe.Id) - // Keep the subscription session alive - var fanInChannel chan *daemon.StreamingMessage - for _, service := range daemonServices { - if service.ServiceName == fanInServiceName { - fanInChannel = service.ServiceChannel - } else { - go func(service daemon.Streaming) { - for message := range service.ServiceChannel { - message.ServiceName = service.ServiceName - log.Printf("Preparing streaming: %v to subscription: %s", message, subscribe.String()) - fanInChannel <- message - } - }(service) - } - } - for service := range fanInChannel { - if service == nil { - log.Printf("Service: %s for subscription: %s is closed", service.ServiceName, subscribe.String()) - log.Printf("Closing streaming to subscription: %s", subscribe.String()) - break - } else if service.ServiceName == cleanerServiceName { - projectId := strconv.Itoa(int(service.Message)) - if err := stream.Send(&types.StreamMessage{Id: cleanerServiceName, Message: projectId}); err != nil { - log.Printf("Unexpected error happened to subscription: %s, error: %v", subscribe.String(), err) - // Allow future requests with the same subscribe.Id from new subscriptions - s.subscriptions[subscribe.Id] = daemonServices - log.Printf("Transition streaming to idle for subscription: %s", subscribe.String()) - break - } else { - log.Printf("Streaming projectId: %s to subscription: %s", projectId, subscribe.String()) - } - } else { - log.Printf("Service: %s for subscription: %s is not implemented as for now", service.ServiceName, subscribe.String()) - } - } - } else { - log.Printf("Subscription %s is already streaming!", subscribe.String()) - } - return nil -} - -func main() { - logging.InitLogging(config.GetEnv()) - log = *logging.GetLogger() - - ctx := context.Background() - log.WithContext(ctx) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - shutdown := make(chan os.Signal, 1) - signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT) - - fanInService := daemon.Streaming{ServiceName: fanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - cleanerService := daemon.Streaming{ServiceName: cleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - reminderService := daemon.Streaming{ServiceName: reminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - daemonRpc := &server{ - serviceConfig: config.GetConfig(), - subscriptions: map[string][]daemon.Streaming{bulletJournalId: {fanInService, cleanerService, reminderService}}, - } - - rpcPort := ":" + daemonRpc.serviceConfig.RPCPort - lis, err := net.Listen("tcp", rpcPort) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - rpcServer := grpc.NewServer() - services.RegisterDaemonServer(rpcServer, daemonRpc) - - gatewayMux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(middleware.IncomingHeaderMatcher), runtime.WithOutgoingHeaderMatcher(middleware.OutgoingHeaderMatcher)) - endpoint := "127.0.0.1" + rpcPort - err = services.RegisterDaemonHandlerFromEndpoint(ctx, gatewayMux, endpoint, []grpc.DialOption{grpc.WithInsecure()}) - if err != nil { - log.Fatalf("failed to register rpc server to gateway server: %v", err) - } - - httpAddr := ":" + daemonRpc.serviceConfig.HttpPort - httpServer := &http.Server{Addr: httpAddr, Handler: gatewayMux} - - //// Serve the swagger-ui and swagger file - //mux := http.NewServeMux() - //mux.Handle("/", rmux) - //mux.HandleFunc("/swagger.json", serveSwagger) - //fs := http.FileServer(http.Dir("www/swagger-ui")) - //mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui", fs)) - - go func() { - log.Infof("rpc server running at port [%v]", daemonRpc.serviceConfig.RPCPort) - if err := rpcServer.Serve(lis); err != nil { - log.Fatalf("rpc server failed to serve: %v", err) - } - }() - - go func() { - log.Infof("http server running at port [%v]", daemonRpc.serviceConfig.HttpPort) - // Start HTTP server (and proxy calls to gRPC server endpoint) - if err := httpServer.ListenAndServe(); err != nil { - log.Fatalf("http server failed to serve: %v", err) - } - }() - - jobScheduler := scheduler.NewJobScheduler() - jobScheduler.Start() - cleaner := daemon.Cleaner{ - Service: cleanerService, - Settings: postgresql.ConnectionURL{ - Host: daemonRpc.serviceConfig.Host + ":" + daemonRpc.serviceConfig.DBPort, - Database: daemonRpc.serviceConfig.Database, - User: daemonRpc.serviceConfig.Username, - Password: daemonRpc.serviceConfig.Password, - }, - } - - PST, _ := time.LoadLocation("America/Los_Angeles") - log.Infof("PST [%T] [%v]", PST, PST) - year, month, day := time.Now().AddDate(0, 0, daemonRpc.serviceConfig.IntervalInDays).In(PST).Date() - start := time.Date(year, month, day, 0, 0, 0, 0, PST) - - daemonBackgroundJob := daemon.Job{Cleaner: cleaner, Reminder: daemon.Reminder{}, Investment: daemon.Investment{}} - log.Infof("The next daemon job will start at %v", start.Format(time.RFC3339)) - jobScheduler.AddRecurrentJob( - daemonBackgroundJob.Run, - start, - time.Hour*24*time.Duration(daemonRpc.serviceConfig.IntervalInDays), - PST, - daemonRpc.serviceConfig.MaxRetentionTimeInDays, - ) - - <-shutdown - log.Infof("Shutdown signal received") - cleaner.Close() - close(reminderService.ServiceChannel) - close(fanInService.ServiceChannel) - jobScheduler.Stop() - log.Infof("JobScheduler stopped") - rpcServer.GracefulStop() - log.Infof("rpc server stopped") - if httpServer.Shutdown(ctx) != nil { - log.Fatalf("failed to stop http server: %v", err) - } else { - log.Infof("http server stopped") - } -} diff --git a/daemon/cmd/server/server.go b/daemon/cmd/server/server.go new file mode 100644 index 000000000..dfee8ae0b --- /dev/null +++ b/daemon/cmd/server/server.go @@ -0,0 +1,145 @@ +package server + +import ( + "context" + "log" + "strconv" + "strings" + + "github.com/singerdmx/BulletJournal/daemon/api/middleware" + "github.com/singerdmx/BulletJournal/daemon/api/service" + daemon "github.com/singerdmx/BulletJournal/daemon/api/service" + "github.com/singerdmx/BulletJournal/daemon/config" + "github.com/singerdmx/BulletJournal/daemon/persistence" + "github.com/singerdmx/BulletJournal/daemon/utils" + "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" + "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/types" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + bulletJournalId string = "bulletJournal" + FanInServiceName string = "fanIn" + CleanerServiceName string = "cleaner" + ReminderServiceName string = "reminder" +) + +// server should implement services.UnimplementedDaemonServer's methods +type Server struct { + ServiceConfig *config.Config + subscriptions map[string][]daemon.Streaming + messageService *service.MessageService + etagDao *persistence.EtagDao + joinGroupInvitationDao *persistence.JoinGroupInvitationDao +} + +func NewServer(ctx context.Context, services []daemon.Streaming) *Server { + // Get config + serviceConfig := config.GetConfig() + + // Get clients + redisClient := persistence.GetRedisClient(serviceConfig) + mailClient, err := persistence.GetMailClient() + if err != nil { + log.Printf("Failed to initialize mail client: %v", err) + } + + // Get DAOs + etagDao := persistence.InitializeEtagDao(ctx, redisClient) + groupDao := persistence.NewGroupDao(ctx) + joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(ctx, redisClient) + + // Get services + messageService := daemon.NewMessageService(groupDao, joinGroupInvitationDao, mailClient) + + return &Server{ + ServiceConfig: serviceConfig, + subscriptions: map[string][]daemon.Streaming{bulletJournalId: services}, + messageService: messageService, + etagDao: etagDao, + joinGroupInvitationDao: joinGroupInvitationDao, + } +} + +// HealthCheck implements the rest endpoint healthcheck -> rpc +func (s *Server) HealthCheck(ctx context.Context, request *types.HealthCheckRequest) (*types.HealthCheckResponse, error) { + log.Printf("Received health check request: %v", request.String()) + return &types.HealthCheckResponse{}, nil +} + +// Send implements the JoinGroupEvents rpc endpoint of services.DaemonServer +func (s *Server) JoinGroupEvents(ctx context.Context, request *types.JoinGroupEvents) (*types.ReplyMessage, error) { + log.Printf("Received rpc request: %v", request.String()) + return &types.ReplyMessage{Message: "Hello daemon"}, nil +} + +// Rest implements the Rest rest->rpc endpoint of services.DaemonServer +func (s *Server) HandleJoinGroupResponse(ctx context.Context, request *types.JoinGroupResponse) (*types.ReplyMessage, error) { + if meta, ok := metadata.FromIncomingContext(ctx); ok { + if requestId, ok := meta[strings.ToLower(middleware.RequestIDKey)]; ok { + log.Printf(middleware.RequestIDKey+": %v", requestId[0]) + grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId[0])) + } else { + requestId := utils.GenerateUID() + log.Printf(middleware.RequestIDKey+": %v", requestId) + grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId)) + } + } + log.Printf("Received JoinGroupResponse request: %v", request.String()) + // get username from uid + invitation := s.joinGroupInvitationDao.Find(request.Uid) + // then delete etags + s.etagDao.DeleteEtagByUserName(invitation.Username) + // finally delete edges + // TODO: call usergroup dao + return &types.ReplyMessage{Message: "Hello daemon"}, nil +} + +func (s *Server) SubscribeNotification(subscribe *types.SubscribeNotification, stream services.Daemon_SubscribeNotificationServer) error { + log.Printf("Received rpc request for subscription: %s", subscribe.String()) + if _, ok := s.subscriptions[subscribe.Id]; ok { + log.Printf("Subscription: %s's streaming has been idle, start streaming!", subscribe.String()) + daemonServices := s.subscriptions[subscribe.Id] + // Prevent requests with the same subscribe.Id from new subscriptions + delete(s.subscriptions, subscribe.Id) + // Keep the subscription session alive + var fanInChannel chan *daemon.StreamingMessage + for _, service := range daemonServices { + if service.ServiceName == FanInServiceName { + fanInChannel = service.ServiceChannel + } else { + go func(service daemon.Streaming) { + for message := range service.ServiceChannel { + message.ServiceName = service.ServiceName + log.Printf("Preparing streaming: %v to subscription: %s", message, subscribe.String()) + fanInChannel <- message + } + }(service) + } + } + for service := range fanInChannel { + if service == nil { + log.Printf("Service: %s for subscription: %s is closed", service.ServiceName, subscribe.String()) + log.Printf("Closing streaming to subscription: %s", subscribe.String()) + break + } else if service.ServiceName == CleanerServiceName { + projectId := strconv.Itoa(int(service.Message)) + if err := stream.Send(&types.StreamMessage{Id: CleanerServiceName, Message: projectId}); err != nil { + log.Printf("Unexpected error happened to subscription: %s, error: %v", subscribe.String(), err) + // Allow future requests with the same subscribe.Id from new subscriptions + s.subscriptions[subscribe.Id] = daemonServices + log.Printf("Transition streaming to idle for subscription: %s", subscribe.String()) + break + } else { + log.Printf("Streaming projectId: %s to subscription: %s", projectId, subscribe.String()) + } + } else { + log.Printf("Service: %s for subscription: %s is not implemented as for now", service.ServiceName, subscribe.String()) + } + } + } else { + log.Printf("Subscription %s is already streaming!", subscribe.String()) + } + return nil +} diff --git a/daemon/persistence/etag_dao.go b/daemon/persistence/etag_dao.go index 7d4c421e4..528b7629f 100644 --- a/daemon/persistence/etag_dao.go +++ b/daemon/persistence/etag_dao.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/singerdmx/BulletJournal/daemon/config" ) const ( @@ -19,9 +18,8 @@ type EtagDao struct { Rdb *redis.Client } -func InitializeEtagDao(ctx context.Context, serviceConfig *config.Config) *EtagDao { - client := GetRedisClient(serviceConfig) - return &EtagDao{Ctx: ctx, Rdb: client} +func InitializeEtagDao(ctx context.Context, redisClient *redis.Client) *EtagDao { + return &EtagDao{Ctx: ctx, Rdb: redisClient} } func (e *EtagDao) SingleCache(etag *Etag) { diff --git a/daemon/persistence/group_dao.go b/daemon/persistence/group_dao.go index 62025eb40..87602e753 100644 --- a/daemon/persistence/group_dao.go +++ b/daemon/persistence/group_dao.go @@ -2,30 +2,30 @@ package persistence import ( "context" + "gorm.io/gorm" ) type GroupDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB } -func NewGroupDao() *GroupDao { +func NewGroupDao(ctx context.Context) *GroupDao { return &GroupDao{ - db: DB, + Ctx: ctx, + db: DB, } } var groupDao *GroupDao - func (g *GroupDao) FindGroup(groupId uint64) *Group { var group *Group g.db.Where("id = ?", groupId).First(&group) return group } - func (g *GroupDao) Find(groupId uint64) *Group { return g.FindGroup(groupId) } diff --git a/daemon/persistence/joingroupinvitation_dao.go b/daemon/persistence/joingroupinvitation_dao.go index a0a5d0f0a..ae25ede64 100644 --- a/daemon/persistence/joingroupinvitation_dao.go +++ b/daemon/persistence/joingroupinvitation_dao.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/singerdmx/BulletJournal/daemon/config" ) const ( @@ -19,9 +18,8 @@ type JoinGroupInvitationDao struct { Rdb *redis.Client } -func InitializeJoinGroupInvitationDao(serviceConfig *config.Config) *JoinGroupInvitationDao { - client := GetRedisClient(serviceConfig) - return &JoinGroupInvitationDao{Rdb: client} +func InitializeJoinGroupInvitationDao(ctx context.Context, redisClient *redis.Client) *JoinGroupInvitationDao { + return &JoinGroupInvitationDao{Ctx: ctx, Rdb: redisClient} } func (j *JoinGroupInvitationDao) SingleCache(joinGroupInvitation *JoinGroupInvitation) { From ebc5920a89ae436576a1101b4589bd1846d86cc5 Mon Sep 17 00:00:00 2001 From: Zhiyuan Wang Date: Sun, 4 Oct 2020 21:46:20 -0400 Subject: [PATCH 2/4] Move creation of cleaner, fanIn and reminder services to server file --- daemon/cmd/main.go | 11 ++++------- daemon/cmd/server/server.go | 30 +++++++++++++++++++----------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/daemon/cmd/main.go b/daemon/cmd/main.go index 1601acc49..90f66f0d5 100644 --- a/daemon/cmd/main.go +++ b/daemon/cmd/main.go @@ -36,10 +36,7 @@ func main() { shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT) - fanInService := daemon.Streaming{ServiceName: server.FanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - cleanerService := daemon.Streaming{ServiceName: server.CleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - reminderService := daemon.Streaming{ServiceName: server.ReminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - daemonRpc := server.NewServer(ctx, []daemon.Streaming{fanInService, cleanerService, reminderService}) + daemonRpc := server.NewServer(ctx) rpcPort := ":" + daemonRpc.ServiceConfig.RPCPort lis, err := net.Listen("tcp", rpcPort) @@ -84,7 +81,7 @@ func main() { jobScheduler := scheduler.NewJobScheduler() jobScheduler.Start() cleaner := daemon.Cleaner{ - Service: cleanerService, + Service: daemonRpc.CleanerService, Settings: postgresql.ConnectionURL{ Host: daemonRpc.ServiceConfig.Host + ":" + daemonRpc.ServiceConfig.DBPort, Database: daemonRpc.ServiceConfig.Database, @@ -111,8 +108,8 @@ func main() { <-shutdown log.Infof("Shutdown signal received") cleaner.Close() - close(reminderService.ServiceChannel) - close(fanInService.ServiceChannel) + close(daemonRpc.ReminderService.ServiceChannel) + close(daemonRpc.FanInService.ServiceChannel) jobScheduler.Stop() log.Infof("JobScheduler stopped") rpcServer.GracefulStop() diff --git a/daemon/cmd/server/server.go b/daemon/cmd/server/server.go index dfee8ae0b..b7819e647 100644 --- a/daemon/cmd/server/server.go +++ b/daemon/cmd/server/server.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/singerdmx/BulletJournal/daemon/api/middleware" - "github.com/singerdmx/BulletJournal/daemon/api/service" daemon "github.com/singerdmx/BulletJournal/daemon/api/service" "github.com/singerdmx/BulletJournal/daemon/config" "github.com/singerdmx/BulletJournal/daemon/persistence" @@ -20,21 +19,24 @@ import ( const ( bulletJournalId string = "bulletJournal" - FanInServiceName string = "fanIn" - CleanerServiceName string = "cleaner" - ReminderServiceName string = "reminder" + fanInServiceName string = "fanIn" + cleanerServiceName string = "cleaner" + reminderServiceName string = "reminder" ) // server should implement services.UnimplementedDaemonServer's methods type Server struct { ServiceConfig *config.Config + CleanerService daemon.Streaming + FanInService daemon.Streaming + MessageService *daemon.MessageService + ReminderService daemon.Streaming subscriptions map[string][]daemon.Streaming - messageService *service.MessageService etagDao *persistence.EtagDao joinGroupInvitationDao *persistence.JoinGroupInvitationDao } -func NewServer(ctx context.Context, services []daemon.Streaming) *Server { +func NewServer(ctx context.Context) *Server { // Get config serviceConfig := config.GetConfig() @@ -52,11 +54,17 @@ func NewServer(ctx context.Context, services []daemon.Streaming) *Server { // Get services messageService := daemon.NewMessageService(groupDao, joinGroupInvitationDao, mailClient) + fanInService := daemon.Streaming{ServiceName: fanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + cleanerService := daemon.Streaming{ServiceName: cleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + reminderService := daemon.Streaming{ServiceName: reminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} return &Server{ ServiceConfig: serviceConfig, - subscriptions: map[string][]daemon.Streaming{bulletJournalId: services}, - messageService: messageService, + CleanerService: cleanerService, + FanInService: fanInService, + MessageService: messageService, + ReminderService: reminderService, + subscriptions: map[string][]daemon.Streaming{bulletJournalId: []daemon.Streaming{fanInService, cleanerService, reminderService}}, etagDao: etagDao, joinGroupInvitationDao: joinGroupInvitationDao, } @@ -106,7 +114,7 @@ func (s *Server) SubscribeNotification(subscribe *types.SubscribeNotification, s // Keep the subscription session alive var fanInChannel chan *daemon.StreamingMessage for _, service := range daemonServices { - if service.ServiceName == FanInServiceName { + if service.ServiceName == fanInServiceName { fanInChannel = service.ServiceChannel } else { go func(service daemon.Streaming) { @@ -123,9 +131,9 @@ func (s *Server) SubscribeNotification(subscribe *types.SubscribeNotification, s log.Printf("Service: %s for subscription: %s is closed", service.ServiceName, subscribe.String()) log.Printf("Closing streaming to subscription: %s", subscribe.String()) break - } else if service.ServiceName == CleanerServiceName { + } else if service.ServiceName == cleanerServiceName { projectId := strconv.Itoa(int(service.Message)) - if err := stream.Send(&types.StreamMessage{Id: CleanerServiceName, Message: projectId}); err != nil { + if err := stream.Send(&types.StreamMessage{Id: cleanerServiceName, Message: projectId}); err != nil { log.Printf("Unexpected error happened to subscription: %s, error: %v", subscribe.String(), err) // Allow future requests with the same subscribe.Id from new subscriptions s.subscriptions[subscribe.Id] = daemonServices From e9a3cae32a425b7e6758f3e57c57208cd7b3d49f Mon Sep 17 00:00:00 2001 From: Zhiyuan Wang Date: Sun, 4 Oct 2020 22:19:40 -0400 Subject: [PATCH 3/4] Normalize naming of DAOs --- daemon/cmd/server/server.go | 28 ++----------------- daemon/persistence/etag_dao.go | 2 +- daemon/persistence/joingroupinvitation_dao.go | 2 +- daemon/persistence/usergroup_dao.go | 11 +++++++- 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/daemon/cmd/server/server.go b/daemon/cmd/server/server.go index cb90b8f0c..10c5bde08 100644 --- a/daemon/cmd/server/server.go +++ b/daemon/cmd/server/server.go @@ -2,7 +2,6 @@ package server import ( "context" -<<<<<<< HEAD:daemon/cmd/server/server.go "log" "strconv" "strings" @@ -16,29 +15,6 @@ import ( "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/types" "google.golang.org/grpc" "google.golang.org/grpc/metadata" -======= - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/singerdmx/BulletJournal/daemon/api/middleware" - daemon "github.com/singerdmx/BulletJournal/daemon/api/service" - "github.com/singerdmx/BulletJournal/daemon/config" - "github.com/singerdmx/BulletJournal/daemon/logging" - "github.com/singerdmx/BulletJournal/daemon/persistence" - uid "github.com/singerdmx/BulletJournal/daemon/utils" - "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" - "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/types" - scheduler "github.com/zywangzy/JobScheduler" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - "net" - "net/http" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - "upper.io/db.v3/postgresql" ->>>>>>> a2905b3c0aadf9b3594ab4ac624c33273a03493a:daemon/cmd/server/main.go ) const ( @@ -72,9 +48,9 @@ func NewServer(ctx context.Context) *Server { } // Get DAOs - etagDao := persistence.InitializeEtagDao(ctx, redisClient) + etagDao := persistence.NewEtagDao(ctx, redisClient) groupDao := persistence.NewGroupDao(ctx) - joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(ctx, redisClient) + joinGroupInvitationDao := persistence.NewJoinGroupInvitationDao(ctx, redisClient) // Get services messageService := daemon.NewMessageService(groupDao, joinGroupInvitationDao, mailClient) diff --git a/daemon/persistence/etag_dao.go b/daemon/persistence/etag_dao.go index 528b7629f..093347ea7 100644 --- a/daemon/persistence/etag_dao.go +++ b/daemon/persistence/etag_dao.go @@ -18,7 +18,7 @@ type EtagDao struct { Rdb *redis.Client } -func InitializeEtagDao(ctx context.Context, redisClient *redis.Client) *EtagDao { +func NewEtagDao(ctx context.Context, redisClient *redis.Client) *EtagDao { return &EtagDao{Ctx: ctx, Rdb: redisClient} } diff --git a/daemon/persistence/joingroupinvitation_dao.go b/daemon/persistence/joingroupinvitation_dao.go index ae25ede64..b9a85bfa4 100644 --- a/daemon/persistence/joingroupinvitation_dao.go +++ b/daemon/persistence/joingroupinvitation_dao.go @@ -18,7 +18,7 @@ type JoinGroupInvitationDao struct { Rdb *redis.Client } -func InitializeJoinGroupInvitationDao(ctx context.Context, redisClient *redis.Client) *JoinGroupInvitationDao { +func NewJoinGroupInvitationDao(ctx context.Context, redisClient *redis.Client) *JoinGroupInvitationDao { return &JoinGroupInvitationDao{Ctx: ctx, Rdb: redisClient} } diff --git a/daemon/persistence/usergroup_dao.go b/daemon/persistence/usergroup_dao.go index d4f7266bd..80398e9dd 100644 --- a/daemon/persistence/usergroup_dao.go +++ b/daemon/persistence/usergroup_dao.go @@ -2,16 +2,25 @@ package persistence import ( "context" + "github.com/singerdmx/BulletJournal/daemon/logging" "gorm.io/gorm" ) type UserGroupDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB log *logging.Logger } +func NewUserGroupDao(ctx context.Context) *UserGroupDao { + userGroupDao := UserGroupDao{ + Ctx: ctx, + db: DB, + } + return &userGroupDao +} + func (u *UserGroupDao) Find(key UserGroupKey) (UserGroup, error) { var userGroup UserGroup if err := u.db.First(&userGroup, key).Error; err != nil { From dc34c0ae1b0252fefe35de88d397a811e948dae4 Mon Sep 17 00:00:00 2001 From: Zhiyuan Wang Date: Sun, 4 Oct 2020 22:21:53 -0400 Subject: [PATCH 4/4] Add context to SampleTaskDao --- daemon/persistence/sampletask_dao.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/daemon/persistence/sampletask_dao.go b/daemon/persistence/sampletask_dao.go index 77b099e5b..199da1db3 100644 --- a/daemon/persistence/sampletask_dao.go +++ b/daemon/persistence/sampletask_dao.go @@ -4,18 +4,20 @@ import ( "context" "errors" "fmt" + "gorm.io/gorm" ) type SampleTaskDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB } -func NewSampleTaskDao() (*SampleTaskDao, error) { +func NewSampleTaskDao(ctx context.Context) (*SampleTaskDao, error) { sampleTaskDao := SampleTaskDao{ - db: DB, + Ctx: ctx, + db: DB, } return &sampleTaskDao, nil } @@ -24,7 +26,7 @@ func (s *SampleTaskDao) Upsert(t *SampleTask) { prevReport := SampleTask{} err := s.db.Where("uid = ?", t.Uid).Last(&prevReport).Error if errors.Is(err, gorm.ErrRecordNotFound) { - err :=s.db.Create(&t).Error + err := s.db.Create(&t).Error if err != nil { fmt.Println(err, t.ID) } @@ -65,5 +67,5 @@ func (s *SampleTaskDao) Upsert(t *SampleTask) { // t.UpdatedAt, // t.AvailableBefore, // ) - //s.db.Exec(query) -//} \ No newline at end of file +//s.db.Exec(query) +//}