Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize daemon dependency injection #42

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ TARGET ?= target

.PHONY: daemon-server
daemon-server:
GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/server/main.go
GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -o ${TARGET}/daemon-server cmd/main.go
chmod a+x ${TARGET}/daemon-server

.PHONY: clean
Expand Down
19 changes: 12 additions & 7 deletions daemon/api/service/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/mailjet/mailjet-apiv3-go"
"github.com/singerdmx/BulletJournal/daemon/config"
"github.com/singerdmx/BulletJournal/daemon/persistence"
"github.com/singerdmx/BulletJournal/daemon/utils"
)
Expand All @@ -19,8 +18,16 @@ const (
var ctx = context.Background()

type MessageService struct {
groupDao *persistence.GroupDao
mailClient *persistence.MailjetClient
groupDao *persistence.GroupDao
joinGroupInvitationDao *persistence.JoinGroupInvitationDao
mailClient *mailjet.Client
}

func NewMessageService(
groupDao *persistence.GroupDao,
joinGroupInvitationDao *persistence.JoinGroupInvitationDao,
mailClient *mailjet.Client) *MessageService {
return &MessageService{groupDao, joinGroupInvitationDao, mailClient}
}

func GetUrl(uuid string, action string) string {
Expand All @@ -31,12 +38,10 @@ func GetUrl(uuid string, action string) string {
func (m *MessageService) SendJoinGroupEmail(username, email string, groupId, uid uint64) {
notificationId := utils.GenerateUID()
// Set in redis with key of uid and value of JoinGroupInvitation json string
joinGroupInvitationDao := persistence.InitializeJoinGroupInvitationDao(config.GetConfig())
joinGroupInvitationDao.SingleCache(
m.joinGroupInvitationDao.SingleCache(
&persistence.JoinGroupInvitation{string(uid), username, string(groupId), notificationId})

groupDao := persistence.NewGroupDao()
group := groupDao.FindGroup(groupId)
group := m.groupDao.FindGroup(groupId)
if group == nil {
log.Fatalf("cannot find group with group id %v", groupId)
return
Expand Down
122 changes: 122 additions & 0 deletions daemon/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/singerdmx/BulletJournal/daemon/api/middleware"
daemon "github.com/singerdmx/BulletJournal/daemon/api/service"
"github.com/singerdmx/BulletJournal/daemon/cmd/server"
"github.com/singerdmx/BulletJournal/daemon/config"
"github.com/singerdmx/BulletJournal/daemon/logging"
"github.com/singerdmx/BulletJournal/protobuf/daemon/grpc/services"
scheduler "github.com/zywangzy/JobScheduler"
"google.golang.org/grpc"
"upper.io/db.v3/postgresql"
)

var log logging.Logger

func main() {
logging.InitLogging(config.GetEnv())
log = *logging.GetLogger()

ctx := context.Background()
log.WithContext(ctx)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, syscall.SIGTERM, syscall.SIGINT)

daemonRpc := server.NewServer(ctx)

rpcPort := ":" + daemonRpc.ServiceConfig.RPCPort
lis, err := net.Listen("tcp", rpcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
rpcServer := grpc.NewServer()
services.RegisterDaemonServer(rpcServer, daemonRpc)

gatewayMux := runtime.NewServeMux(runtime.WithIncomingHeaderMatcher(middleware.IncomingHeaderMatcher), runtime.WithOutgoingHeaderMatcher(middleware.OutgoingHeaderMatcher))
endpoint := "127.0.0.1" + rpcPort
err = services.RegisterDaemonHandlerFromEndpoint(ctx, gatewayMux, endpoint, []grpc.DialOption{grpc.WithInsecure()})
if err != nil {
log.Fatalf("failed to register rpc server to gateway server: %v", err)
}

httpAddr := ":" + daemonRpc.ServiceConfig.HttpPort
httpServer := &http.Server{Addr: httpAddr, Handler: gatewayMux}

//// Serve the swagger-ui and swagger file
//mux := http.NewServeMux()
//mux.Handle("/", rmux)
//mux.HandleFunc("/swagger.json", serveSwagger)
//fs := http.FileServer(http.Dir("www/swagger-ui"))
//mux.Handle("/swagger-ui/", http.StripPrefix("/swagger-ui", fs))

go func() {
log.Infof("rpc server running at port [%v]", daemonRpc.ServiceConfig.RPCPort)
if err := rpcServer.Serve(lis); err != nil {
log.Fatalf("rpc server failed to serve: %v", err)
}
}()

go func() {
log.Infof("http server running at port [%v]", daemonRpc.ServiceConfig.HttpPort)
// Start HTTP server (and proxy calls to gRPC server endpoint)
if err := httpServer.ListenAndServe(); err != nil {
log.Fatalf("http server failed to serve: %v", err)
}
}()

jobScheduler := scheduler.NewJobScheduler()
jobScheduler.Start()
cleaner := daemon.Cleaner{
Service: daemonRpc.CleanerService,
Settings: postgresql.ConnectionURL{
Host: daemonRpc.ServiceConfig.Host + ":" + daemonRpc.ServiceConfig.DBPort,
Database: daemonRpc.ServiceConfig.Database,
User: daemonRpc.ServiceConfig.Username,
Password: daemonRpc.ServiceConfig.Password,
},
}

PST, _ := time.LoadLocation("America/Los_Angeles")
log.Infof("PST [%T] [%v]", PST, PST)
year, month, day := time.Now().AddDate(0, 0, daemonRpc.ServiceConfig.IntervalInDays).In(PST).Date()
start := time.Date(year, month, day, 0, 0, 0, 0, PST)

daemonBackgroundJob := daemon.Job{Cleaner: cleaner, Reminder: daemon.Reminder{}, Investment: daemon.Investment{}}
log.Infof("The next daemon job will start at %v", start.Format(time.RFC3339))
jobScheduler.AddRecurrentJob(
daemonBackgroundJob.Run,
start,
time.Hour*24*time.Duration(daemonRpc.ServiceConfig.IntervalInDays),
PST,
daemonRpc.ServiceConfig.MaxRetentionTimeInDays,
)

<-shutdown
log.Infof("Shutdown signal received")
cleaner.Close()
close(daemonRpc.ReminderService.ServiceChannel)
close(daemonRpc.FanInService.ServiceChannel)
jobScheduler.Stop()
log.Infof("JobScheduler stopped")
rpcServer.GracefulStop()
log.Infof("rpc server stopped")
if httpServer.Shutdown(ctx) != nil {
log.Fatalf("failed to stop http server: %v", err)
} else {
log.Infof("http server stopped")
}
}
Loading