diff --git a/daemon/Makefile b/daemon/Makefile index a53b4f380..78256cf7a 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -4,7 +4,7 @@ TARGET ?= target .PHONY: daemon-server daemon-server: - GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/server/main.go + GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/main.go chmod a+x ${TARGET}/daemon-server .PHONY: clean diff --git a/daemon/api/service/message.go b/daemon/api/service/message.go index 35e8f66c1..4c9746d69 100644 --- a/daemon/api/service/message.go +++ b/daemon/api/service/message.go @@ -5,7 +5,6 @@ import ( "time" "github.com/mailjet/mailjet-apiv3-go" - "github.com/singerdmx/BulletJournal/daemon/config" "github.com/singerdmx/BulletJournal/daemon/persistence" "github.com/singerdmx/BulletJournal/daemon/utils" ) @@ -19,8 +18,16 @@ const ( var ctx = context.Background() type MessageService struct { - groupDao *persistence.GroupDao - mailClient *persistence.MailjetClient + groupDao *persistence.GroupDao + joinGroupInvitationDao *persistence.JoinGroupInvitationDao + mailClient *mailjet.Client +} + +func NewMessageService( + groupDao *persistence.GroupDao, + joinGroupInvitationDao *persistence.JoinGroupInvitationDao, + mailClient *mailjet.Client) *MessageService { + return &MessageService{groupDao, joinGroupInvitationDao, mailClient} } func GetUrl(uuid string, action string) string { @@ -31,12 +38,10 @@ func GetUrl(uuid string, action string) string { func (m *MessageService) SendJoinGroupEmail(username, email string, groupId, uid uint64) { notificationId := utils.GenerateUID() // Set in redis with key of uid and value of JoinGroupInvitation json string - joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(config.GetConfig()) - joinGroupInvitationDao.SingleCache( + m.joinGroupInvitationDao.SingleCache( &persistence.JoinGroupInvitation{string(uid), username, string(groupId), notificationId}) - groupDao := persistence.NewGroupDao() - group := groupDao.FindGroup(groupId) + group := m.groupDao.FindGroup(groupId) if group == nil { log.Fatalf("cannot find group with group id %v", groupId) return diff --git a/daemon/cmd/main.go b/daemon/cmd/main.go new file mode 100644 index 000000000..90f66f0d5 --- /dev/null +++ b/daemon/cmd/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "context" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/singerdmx/BulletJournal/daemon/api/middleware" + daemon "github.com/singerdmx/BulletJournal/daemon/api/service" + "github.com/singerdmx/BulletJournal/daemon/cmd/server" + "github.com/singerdmx/BulletJournal/daemon/config" + "github.com/singerdmx/BulletJournal/daemon/logging" + "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" + scheduler "github.com/zywangzy/JobScheduler" + "google.golang.org/grpc" + "upper.io/db.v3/postgresql" +) + +var log logging.Logger + +func main() { + logging.InitLogging(config.GetEnv()) + log = *logging.GetLogger() + + ctx := context.Background() + log.WithContext(ctx) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + shutdown := make(chan os.Signal, 1) + signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT) + + daemonRpc := server.NewServer(ctx) + + rpcPort := ":" + daemonRpc.ServiceConfig.RPCPort + lis, err := net.Listen("tcp", rpcPort) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + rpcServer := grpc.NewServer() + services.RegisterDaemonServer(rpcServer, daemonRpc) + + gatewayMux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(middleware.IncomingHeaderMatcher), runtime.WithOutgoingHeaderMatcher(middleware.OutgoingHeaderMatcher)) + endpoint := "127.0.0.1" + rpcPort + err = services.RegisterDaemonHandlerFromEndpoint(ctx, gatewayMux, endpoint, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + log.Fatalf("failed to register rpc server to gateway server: %v", err) + } + + httpAddr := ":" + daemonRpc.ServiceConfig.HttpPort + httpServer := &http.Server{Addr: httpAddr, Handler: gatewayMux} + + //// Serve the swagger-ui and swagger file + //mux := http.NewServeMux() + //mux.Handle("/", rmux) + //mux.HandleFunc("/swagger.json", serveSwagger) + //fs := http.FileServer(http.Dir("www/swagger-ui")) + //mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui", fs)) + + go func() { + log.Infof("rpc server running at port [%v]", daemonRpc.ServiceConfig.RPCPort) + if err := rpcServer.Serve(lis); err != nil { + log.Fatalf("rpc server failed to serve: %v", err) + } + }() + + go func() { + log.Infof("http server running at port [%v]", daemonRpc.ServiceConfig.HttpPort) + // Start HTTP server (and proxy calls to gRPC server endpoint) + if err := httpServer.ListenAndServe(); err != nil { + log.Fatalf("http server failed to serve: %v", err) + } + }() + + jobScheduler := scheduler.NewJobScheduler() + jobScheduler.Start() + cleaner := daemon.Cleaner{ + Service: daemonRpc.CleanerService, + Settings: postgresql.ConnectionURL{ + Host: daemonRpc.ServiceConfig.Host + ":" + daemonRpc.ServiceConfig.DBPort, + Database: daemonRpc.ServiceConfig.Database, + User: daemonRpc.ServiceConfig.Username, + Password: daemonRpc.ServiceConfig.Password, + }, + } + + PST, _ := time.LoadLocation("America/Los_Angeles") + log.Infof("PST [%T] [%v]", PST, PST) + year, month, day := time.Now().AddDate(0, 0, daemonRpc.ServiceConfig.IntervalInDays).In(PST).Date() + start := time.Date(year, month, day, 0, 0, 0, 0, PST) + + daemonBackgroundJob := daemon.Job{Cleaner: cleaner, Reminder: daemon.Reminder{}, Investment: daemon.Investment{}} + log.Infof("The next daemon job will start at %v", start.Format(time.RFC3339)) + jobScheduler.AddRecurrentJob( + daemonBackgroundJob.Run, + start, + time.Hour*24*time.Duration(daemonRpc.ServiceConfig.IntervalInDays), + PST, + daemonRpc.ServiceConfig.MaxRetentionTimeInDays, + ) + + <-shutdown + log.Infof("Shutdown signal received") + cleaner.Close() + close(daemonRpc.ReminderService.ServiceChannel) + close(daemonRpc.FanInService.ServiceChannel) + jobScheduler.Stop() + log.Infof("JobScheduler stopped") + rpcServer.GracefulStop() + log.Infof("rpc server stopped") + if httpServer.Shutdown(ctx) != nil { + log.Fatalf("failed to stop http server: %v", err) + } else { + log.Infof("http server stopped") + } +} diff --git a/daemon/cmd/server/main.go b/daemon/cmd/server/server.go similarity index 51% rename from daemon/cmd/server/main.go rename to daemon/cmd/server/server.go index 422f9283c..10c5bde08 100644 --- a/daemon/cmd/server/main.go +++ b/daemon/cmd/server/server.go @@ -1,28 +1,20 @@ -package main +package server import ( "context" - "github.com/grpc-ecosystem/grpc-gateway/runtime" + "log" + "strconv" + "strings" + "github.com/singerdmx/BulletJournal/daemon/api/middleware" daemon "github.com/singerdmx/BulletJournal/daemon/api/service" "github.com/singerdmx/BulletJournal/daemon/config" - "github.com/singerdmx/BulletJournal/daemon/logging" "github.com/singerdmx/BulletJournal/daemon/persistence" - uid "github.com/singerdmx/BulletJournal/daemon/utils" + "github.com/singerdmx/BulletJournal/daemon/utils" "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services" "github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/types" - scheduler "github.com/zywangzy/JobScheduler" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "net" - "net/http" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - "upper.io/db.v3/postgresql" ) const ( @@ -32,51 +24,87 @@ const ( reminderServiceName string = "reminder" ) -var log logging.Logger - // server should implement services.UnimplementedDaemonServer's methods -type server struct { - serviceConfig *config.Config - subscriptions map[string][]daemon.Streaming +type Server struct { + ServiceConfig *config.Config + CleanerService daemon.Streaming + FanInService daemon.Streaming + MessageService *daemon.MessageService + ReminderService daemon.Streaming + subscriptions map[string][]daemon.Streaming + etagDao *persistence.EtagDao + joinGroupInvitationDao *persistence.JoinGroupInvitationDao +} + +func NewServer(ctx context.Context) *Server { + // Get config + serviceConfig := config.GetConfig() + + // Get clients + redisClient := persistence.GetRedisClient(serviceConfig) + mailClient, err := persistence.GetMailClient() + if err != nil { + log.Printf("Failed to initialize mail client: %v", err) + } + + // Get DAOs + etagDao := persistence.NewEtagDao(ctx, redisClient) + groupDao := persistence.NewGroupDao(ctx) + joinGroupInvitationDao := persistence.NewJoinGroupInvitationDao(ctx, redisClient) + + // Get services + messageService := daemon.NewMessageService(groupDao, joinGroupInvitationDao, mailClient) + fanInService := daemon.Streaming{ServiceName: fanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + cleanerService := daemon.Streaming{ServiceName: cleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + reminderService := daemon.Streaming{ServiceName: reminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} + + return &Server{ + ServiceConfig: serviceConfig, + CleanerService: cleanerService, + FanInService: fanInService, + MessageService: messageService, + ReminderService: reminderService, + subscriptions: map[string][]daemon.Streaming{bulletJournalId: []daemon.Streaming{fanInService, cleanerService, reminderService}}, + etagDao: etagDao, + joinGroupInvitationDao: joinGroupInvitationDao, + } } // HealthCheck implements the rest endpoint healthcheck -> rpc -func (s *server) HealthCheck(ctx context.Context, request *types.HealthCheckRequest) (*types.HealthCheckResponse, error) { +func (s *Server) HealthCheck(ctx context.Context, request *types.HealthCheckRequest) (*types.HealthCheckResponse, error) { log.Printf("Received health check request: %v", request.String()) return &types.HealthCheckResponse{}, nil } // Send implements the JoinGroupEvents rpc endpoint of services.DaemonServer -func (s *server) JoinGroupEvents(ctx context.Context, request *types.JoinGroupEvents) (*types.ReplyMessage, error) { +func (s *Server) JoinGroupEvents(ctx context.Context, request *types.JoinGroupEvents) (*types.ReplyMessage, error) { log.Printf("Received rpc request: %v", request.String()) return &types.ReplyMessage{Message: "Hello daemon"}, nil } // Rest implements the Rest rest->rpc endpoint of services.DaemonServer -func (s *server) HandleJoinGroupResponse(ctx context.Context, request *types.JoinGroupResponse) (*types.ReplyMessage, error) { +func (s *Server) HandleJoinGroupResponse(ctx context.Context, request *types.JoinGroupResponse) (*types.ReplyMessage, error) { if meta, ok := metadata.FromIncomingContext(ctx); ok { if requestId, ok := meta[strings.ToLower(middleware.RequestIDKey)]; ok { log.Printf(middleware.RequestIDKey+": %v", requestId[0]) grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId[0])) } else { - requestId := uid.GenerateUID() + requestId := utils.GenerateUID() log.Printf(middleware.RequestIDKey+": %v", requestId) grpc.SendHeader(ctx, metadata.Pairs(middleware.RequestIDKey, requestId)) } } log.Printf("Received JoinGroupResponse request: %v", request.String()) // get username from uid - joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(config.GetConfig()) - invitation := joinGroupInvitationDao.Find(request.Uid) + invitation := s.joinGroupInvitationDao.Find(request.Uid) // then delete etags - etagDao := persistence.InitializeEtagDao(ctx, config.GetConfig()) - etagDao.DeleteEtagByUserName(invitation.Username) + s.etagDao.DeleteEtagByUserName(invitation.Username) // finally delete edges // TODO: call usergroup dao return &types.ReplyMessage{Message: "Hello daemon"}, nil } -func (s *server) SubscribeNotification(subscribe *types.SubscribeNotification, stream services.Daemon_SubscribeNotificationServer) error { +func (s *Server) SubscribeNotification(subscribe *types.SubscribeNotification, stream services.Daemon_SubscribeNotificationServer) error { log.Printf("Received rpc request for subscription: %s", subscribe.String()) if _, ok := s.subscriptions[subscribe.Id]; ok { log.Printf("Subscription: %s's streaming has been idle, start streaming!", subscribe.String()) @@ -123,107 +151,3 @@ func (s *server) SubscribeNotification(subscribe *types.SubscribeNotification, s } return nil } - -func main() { - logging.InitLogging(config.GetEnv()) - log = *logging.GetLogger() - - ctx := context.Background() - log.WithContext(ctx) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - shutdown := make(chan os.Signal, 1) - signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT) - - fanInService := daemon.Streaming{ServiceName: fanInServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - cleanerService := daemon.Streaming{ServiceName: cleanerServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - reminderService := daemon.Streaming{ServiceName: reminderServiceName, ServiceChannel: make(chan *daemon.StreamingMessage, 100)} - daemonRpc := &server{ - serviceConfig: config.GetConfig(), - subscriptions: map[string][]daemon.Streaming{bulletJournalId: {fanInService, cleanerService, reminderService}}, - } - - rpcPort := ":" + daemonRpc.serviceConfig.RPCPort - lis, err := net.Listen("tcp", rpcPort) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - rpcServer := grpc.NewServer() - services.RegisterDaemonServer(rpcServer, daemonRpc) - - gatewayMux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(middleware.IncomingHeaderMatcher), runtime.WithOutgoingHeaderMatcher(middleware.OutgoingHeaderMatcher)) - endpoint := "127.0.0.1" + rpcPort - err = services.RegisterDaemonHandlerFromEndpoint(ctx, gatewayMux, endpoint, []grpc.DialOption{grpc.WithInsecure()}) - if err != nil { - log.Fatalf("failed to register rpc server to gateway server: %v", err) - } - - httpAddr := ":" + daemonRpc.serviceConfig.HttpPort - httpServer := &http.Server{Addr: httpAddr, Handler: gatewayMux} - - //// Serve the swagger-ui and swagger file - //mux := http.NewServeMux() - //mux.Handle("/", rmux) - //mux.HandleFunc("/swagger.json", serveSwagger) - //fs := http.FileServer(http.Dir("www/swagger-ui")) - //mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui", fs)) - - go func() { - log.Infof("rpc server running at port [%v]", daemonRpc.serviceConfig.RPCPort) - if err := rpcServer.Serve(lis); err != nil { - log.Fatalf("rpc server failed to serve: %v", err) - } - }() - - go func() { - log.Infof("http server running at port [%v]", daemonRpc.serviceConfig.HttpPort) - // Start HTTP server (and proxy calls to gRPC server endpoint) - if err := httpServer.ListenAndServe(); err != nil { - log.Fatalf("http server failed to serve: %v", err) - } - }() - - jobScheduler := scheduler.NewJobScheduler() - jobScheduler.Start() - cleaner := daemon.Cleaner{ - Service: cleanerService, - Settings: postgresql.ConnectionURL{ - Host: daemonRpc.serviceConfig.Host + ":" + daemonRpc.serviceConfig.DBPort, - Database: daemonRpc.serviceConfig.Database, - User: daemonRpc.serviceConfig.Username, - Password: daemonRpc.serviceConfig.Password, - }, - } - - PST, _ := time.LoadLocation("America/Los_Angeles") - log.Infof("PST [%T] [%v]", PST, PST) - year, month, day := time.Now().AddDate(0, 0, daemonRpc.serviceConfig.IntervalInDays).In(PST).Date() - start := time.Date(year, month, day, 0, 0, 0, 0, PST) - - daemonBackgroundJob := daemon.Job{Cleaner: cleaner, Reminder: daemon.Reminder{}, Investment: daemon.Investment{}} - log.Infof("The next daemon job will start at %v", start.Format(time.RFC3339)) - jobScheduler.AddRecurrentJob( - daemonBackgroundJob.Run, - start, - time.Hour*24*time.Duration(daemonRpc.serviceConfig.IntervalInDays), - PST, - daemonRpc.serviceConfig.MaxRetentionTimeInDays, - ) - - <-shutdown - log.Infof("Shutdown signal received") - cleaner.Close() - close(reminderService.ServiceChannel) - close(fanInService.ServiceChannel) - jobScheduler.Stop() - log.Infof("JobScheduler stopped") - rpcServer.GracefulStop() - log.Infof("rpc server stopped") - if httpServer.Shutdown(ctx) != nil { - log.Fatalf("failed to stop http server: %v", err) - } else { - log.Infof("http server stopped") - } -} diff --git a/daemon/persistence/etag_dao.go b/daemon/persistence/etag_dao.go index 7d4c421e4..093347ea7 100644 --- a/daemon/persistence/etag_dao.go +++ b/daemon/persistence/etag_dao.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/singerdmx/BulletJournal/daemon/config" ) const ( @@ -19,9 +18,8 @@ type EtagDao struct { Rdb *redis.Client } -func InitializeEtagDao(ctx context.Context, serviceConfig *config.Config) *EtagDao { - client := GetRedisClient(serviceConfig) - return &EtagDao{Ctx: ctx, Rdb: client} +func NewEtagDao(ctx context.Context, redisClient *redis.Client) *EtagDao { + return &EtagDao{Ctx: ctx, Rdb: redisClient} } func (e *EtagDao) SingleCache(etag *Etag) { diff --git a/daemon/persistence/group_dao.go b/daemon/persistence/group_dao.go index 62025eb40..87602e753 100644 --- a/daemon/persistence/group_dao.go +++ b/daemon/persistence/group_dao.go @@ -2,30 +2,30 @@ package persistence import ( "context" + "gorm.io/gorm" ) type GroupDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB } -func NewGroupDao() *GroupDao { +func NewGroupDao(ctx context.Context) *GroupDao { return &GroupDao{ - db: DB, + Ctx: ctx, + db: DB, } } var groupDao *GroupDao - func (g *GroupDao) FindGroup(groupId uint64) *Group { var group *Group g.db.Where("id = ?", groupId).First(&group) return group } - func (g *GroupDao) Find(groupId uint64) *Group { return g.FindGroup(groupId) } diff --git a/daemon/persistence/joingroupinvitation_dao.go b/daemon/persistence/joingroupinvitation_dao.go index a0a5d0f0a..b9a85bfa4 100644 --- a/daemon/persistence/joingroupinvitation_dao.go +++ b/daemon/persistence/joingroupinvitation_dao.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/singerdmx/BulletJournal/daemon/config" ) const ( @@ -19,9 +18,8 @@ type JoinGroupInvitationDao struct { Rdb *redis.Client } -func InitializeJoinGroupInvitationDao(serviceConfig *config.Config) *JoinGroupInvitationDao { - client := GetRedisClient(serviceConfig) - return &JoinGroupInvitationDao{Rdb: client} +func NewJoinGroupInvitationDao(ctx context.Context, redisClient *redis.Client) *JoinGroupInvitationDao { + return &JoinGroupInvitationDao{Ctx: ctx, Rdb: redisClient} } func (j *JoinGroupInvitationDao) SingleCache(joinGroupInvitation *JoinGroupInvitation) { diff --git a/daemon/persistence/sampletask_dao.go b/daemon/persistence/sampletask_dao.go index 77b099e5b..199da1db3 100644 --- a/daemon/persistence/sampletask_dao.go +++ b/daemon/persistence/sampletask_dao.go @@ -4,18 +4,20 @@ import ( "context" "errors" "fmt" + "gorm.io/gorm" ) type SampleTaskDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB } -func NewSampleTaskDao() (*SampleTaskDao, error) { +func NewSampleTaskDao(ctx context.Context) (*SampleTaskDao, error) { sampleTaskDao := SampleTaskDao{ - db: DB, + Ctx: ctx, + db: DB, } return &sampleTaskDao, nil } @@ -24,7 +26,7 @@ func (s *SampleTaskDao) Upsert(t *SampleTask) { prevReport := SampleTask{} err := s.db.Where("uid = ?", t.Uid).Last(&prevReport).Error if errors.Is(err, gorm.ErrRecordNotFound) { - err :=s.db.Create(&t).Error + err := s.db.Create(&t).Error if err != nil { fmt.Println(err, t.ID) } @@ -65,5 +67,5 @@ func (s *SampleTaskDao) Upsert(t *SampleTask) { // t.UpdatedAt, // t.AvailableBefore, // ) - //s.db.Exec(query) -//} \ No newline at end of file +//s.db.Exec(query) +//} diff --git a/daemon/persistence/usergroup_dao.go b/daemon/persistence/usergroup_dao.go index d4f7266bd..80398e9dd 100644 --- a/daemon/persistence/usergroup_dao.go +++ b/daemon/persistence/usergroup_dao.go @@ -2,16 +2,25 @@ package persistence import ( "context" + "github.com/singerdmx/BulletJournal/daemon/logging" "gorm.io/gorm" ) type UserGroupDao struct { Ctx context.Context - db *gorm.DB + db *gorm.DB log *logging.Logger } +func NewUserGroupDao(ctx context.Context) *UserGroupDao { + userGroupDao := UserGroupDao{ + Ctx: ctx, + db: DB, + } + return &userGroupDao +} + func (u *UserGroupDao) Find(key UserGroupKey) (UserGroup, error) { var userGroup UserGroup if err := u.db.First(&userGroup, key).Error; err != nil {