diff --git a/.gitkeep b/.gitkeep new file mode 100644 index 0000000..e08c5c7 --- /dev/null +++ b/.gitkeep @@ -0,0 +1,4 @@ +cmd +internal +tests +Makefile \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e69de29 diff --git a/internal/event/subscribers.go b/internal/event/subscribers.go deleted file mode 100644 index 16ad6c1..0000000 --- a/internal/event/subscribers.go +++ /dev/null @@ -1,8 +0,0 @@ -package event - -import "log" - -func EventSubscribers() { - log.Println("Subscribing to app events...") - subscribeToPermissions() -} diff --git a/internal/event/eventBus.go b/internal/events/eventBus.go similarity index 92% rename from internal/event/eventBus.go rename to internal/events/eventBus.go index cbcf1e2..bc0c0f9 100644 --- a/internal/event/eventBus.go +++ b/internal/events/eventBus.go @@ -1,4 +1,4 @@ -package event +package events import ( "log" @@ -38,12 +38,8 @@ func (eb *EventBus) Subscribe(topic string, ch DataChannel) { eb.rm.Lock() if prev, found := eb.subscribers[topic]; found { eb.subscribers[topic] = append(prev, ch) - log.Println("subscribed to channel :::", ch) - } else { eb.subscribers[topic] = append([]DataChannel{}, ch) - log.Println("subscribed to channel :::", ch) - } eb.rm.Unlock() } diff --git a/internal/events/publishers/publishers.go b/internal/events/publishers/publishers.go new file mode 100644 index 0000000..c2ff21d --- /dev/null +++ b/internal/events/publishers/publishers.go @@ -0,0 +1,7 @@ +package publishers + +import "log" + +func InitEventPublishers() { + log.Println("Initiating global event publishers...") +} diff --git a/internal/events/publishers/request.go b/internal/events/publishers/request.go new file mode 100644 index 0000000..fd1bfa1 --- /dev/null +++ b/internal/events/publishers/request.go @@ -0,0 +1,36 @@ +package publishers + +import ( + "log" + "sync" + + "github.com/Tibz-Dankan/keep-active/internal/events" + "github.com/Tibz-Dankan/keep-active/internal/models" +) + +// Publishes all apps to the topic "makeRequest" +// for the purposes of making requests to the apps. +// This function must be called in a scheduler +func PublishRequestEvent() { + app := models.App{} + + apps, err := app.FindAll() + if err != nil { + log.Println("Error fetching apps:", err) + return + } + + if len(apps) == 0 { + return + } + + var wg sync.WaitGroup + for _, app := range apps { + wg.Add(1) + go func(app models.App) { + defer wg.Done() + events.EB.Publish("makeRequest", app) + }(app) + } + wg.Wait() +} diff --git a/internal/event/permissions.go b/internal/events/subscribers/permissions.go similarity index 78% rename from internal/event/permissions.go rename to internal/events/subscribers/permissions.go index 1cf65c5..e70b44f 100644 --- a/internal/event/permissions.go +++ b/internal/events/subscribers/permissions.go @@ -1,15 +1,16 @@ -package event +package subscribers import ( "log" "time" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/models" ) func subscribeToPermissions() { - permissionCh := make(chan DataEvent) - EB.Subscribe("permissions", permissionCh) + permissionCh := make(chan events.DataEvent) + events.EB.Subscribe("permissions", permissionCh) permission := models.Permissions{} type User = models.User diff --git a/internal/events/subscribers/request.go b/internal/events/subscribers/request.go new file mode 100644 index 0000000..d642cdb --- /dev/null +++ b/internal/events/subscribers/request.go @@ -0,0 +1,30 @@ +package subscribers + +import ( + "log" + + "github.com/Tibz-Dankan/keep-active/internal/events" + "github.com/Tibz-Dankan/keep-active/internal/models" + "github.com/Tibz-Dankan/keep-active/internal/routes/request" +) + +// Subscribes/listens to the topic "makeRequest" +// and performs an http get request to each app +// whose data has been received. +func subscribeToRequestEvent() { + appCh := make(chan events.DataEvent) + events.EB.Subscribe("makeRequest", appCh) + type App = models.App + + for { + appEvent := <-appCh + app, ok := appEvent.Data.(App) + + if !ok { + log.Println("Interface does not hold type App") + return + } + + go request.MakeAppRequest(app) + } +} diff --git a/internal/events/subscribers/subscribers.go b/internal/events/subscribers/subscribers.go new file mode 100644 index 0000000..9a4e5d7 --- /dev/null +++ b/internal/events/subscribers/subscribers.go @@ -0,0 +1,9 @@ +package subscribers + +import "log" + +func InitEventSubscribers() { + log.Println("Initiating global event subscribers...") + go subscribeToRequestEvent() + subscribeToPermissions() +} diff --git a/internal/routes/app/disableApp.go b/internal/routes/app/disableApp.go index 06f3a9a..62c5030 100644 --- a/internal/routes/app/disableApp.go +++ b/internal/routes/app/disableApp.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -39,8 +38,6 @@ func disableApp(w http.ResponseWriter, r *http.Request) { return } - event.EB.Publish("updateApp", app) - response := map[string]interface{}{ "status": "success", "message": "App is disabled successfully", diff --git a/internal/routes/app/enableApp.go b/internal/routes/app/enableApp.go index 38b42fe..2e5fd73 100644 --- a/internal/routes/app/enableApp.go +++ b/internal/routes/app/enableApp.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -39,8 +38,6 @@ func enableApp(w http.ResponseWriter, r *http.Request) { return } - event.EB.Publish("updateApp", app) - response := map[string]interface{}{ "status": "success", "message": "App is enabled successfully", diff --git a/internal/routes/app/postApp.go b/internal/routes/app/postApp.go index 82a0dd4..a474444 100644 --- a/internal/routes/app/postApp.go +++ b/internal/routes/app/postApp.go @@ -4,7 +4,7 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/middlewares" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" @@ -70,7 +70,7 @@ func PostAdd(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) user := models.User{ID: userID} - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func PostAppRoute(router *mux.Router) { diff --git a/internal/routes/app/updateApp.go b/internal/routes/app/updateApp.go index f6d6a45..3a8dc30 100644 --- a/internal/routes/app/updateApp.go +++ b/internal/routes/app/updateApp.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -72,8 +71,6 @@ func updateApp(w http.ResponseWriter, r *http.Request) { return } - event.EB.Publish("updateApp", app) - response := map[string]interface{}{ "status": "success", "message": "Updated successfully", diff --git a/internal/routes/auth/resetPassword.go b/internal/routes/auth/resetPassword.go index 4de9564..eecb147 100644 --- a/internal/routes/auth/resetPassword.go +++ b/internal/routes/auth/resetPassword.go @@ -4,7 +4,7 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -68,7 +68,7 @@ func resetPassword(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(response) - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func ResetPasswordRoute(router *mux.Router) { diff --git a/internal/routes/auth/signin.go b/internal/routes/auth/signin.go index ea975c9..3c9f518 100644 --- a/internal/routes/auth/signin.go +++ b/internal/routes/auth/signin.go @@ -4,7 +4,7 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -67,7 +67,7 @@ func signIn(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(response) - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func SignInRoute(router *mux.Router) { diff --git a/internal/routes/auth/signup.go b/internal/routes/auth/signup.go index a6969cc..bf6514b 100644 --- a/internal/routes/auth/signup.go +++ b/internal/routes/auth/signup.go @@ -4,7 +4,7 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -73,7 +73,7 @@ func signUp(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) user.ID = userId - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func SignUpRoute(router *mux.Router) { diff --git a/internal/routes/feedback/postFeedback.go b/internal/routes/feedback/postFeedback.go index 73ec635..2af1a39 100644 --- a/internal/routes/feedback/postFeedback.go +++ b/internal/routes/feedback/postFeedback.go @@ -4,7 +4,7 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/middlewares" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" @@ -48,7 +48,7 @@ func postFeedback(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) user := models.User{ID: userId} - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func PostFeedbackRoute(router *mux.Router) { diff --git a/internal/routes/request/deleteRequestTime.go b/internal/routes/request/deleteRequestTime.go index e0067a3..327896d 100644 --- a/internal/routes/request/deleteRequestTime.go +++ b/internal/routes/request/deleteRequestTime.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -40,8 +39,6 @@ func deleteRequestTime(w http.ResponseWriter, r *http.Request) { "status": "success", "message": "Request Time deleted successfully", } - app := models.App{ID: savedRequestTime.AppID} - event.EB.Publish("updateApp", app) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/internal/routes/request/getLiveRequests.go b/internal/routes/request/getLiveRequests.go index e4be0e3..5f7be8b 100644 --- a/internal/routes/request/getLiveRequests.go +++ b/internal/routes/request/getLiveRequests.go @@ -7,7 +7,7 @@ import ( "net/http" "time" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/middlewares" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -90,8 +90,8 @@ func getLiveRequests(w http.ResponseWriter, r *http.Request) { } type AppRequestProgress = services.AppRequestProgress - appCh := make(chan event.DataEvent) - event.EB.Subscribe("appRequestProgress", appCh) + appCh := make(chan events.DataEvent) + events.EB.Subscribe("appRequestProgress", appCh) heartbeatTicker := time.NewTicker(30 * time.Second) @@ -120,7 +120,7 @@ func getLiveRequests(w http.ResponseWriter, r *http.Request) { } case <-disconnect: clientManager.RemoveClient(userId) - event.EB.Unsubscribe("appRequestProgress", appCh) + events.EB.Unsubscribe("appRequestProgress", appCh) heartbeatTicker.Stop() cancel() log.Println("Client disconnected:", userId) diff --git a/internal/routes/request/makeRequest.go b/internal/routes/request/makeRequest.go index f732949..a044985 100644 --- a/internal/routes/request/makeRequest.go +++ b/internal/routes/request/makeRequest.go @@ -3,81 +3,14 @@ package request import ( "log" "strconv" - "sync" "time" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" ) -func StartRequestScheduler() { - go requestPublishScheduler() - go requestEventSubscriber() -} - -// Runs the requestPublisher fn at -// start of every minute -func requestPublishScheduler() { - for { - now := time.Now() - nextMinute := now.Truncate(time.Minute).Add(time.Minute) - sleepDuration := nextMinute.Sub(now) - minute := now.Minute() - - if minute%5 == 0 && now.Second() == 0 { - requestPublisher() - } - - time.Sleep(sleepDuration) - } -} - -// Subscribes/listens to all published -// app request events -func requestEventSubscriber() { - appCh := make(chan event.DataEvent) - event.EB.Subscribe("makeRequest", appCh) - type App = models.App - - for { - appEvent := <-appCh - app, ok := appEvent.Data.(App) - - if !ok { - log.Println("Interface does not hold type App") - return - } - - go MakeAppRequest(app) - } -} - -func requestPublisher() { - app := models.App{} - - apps, err := app.FindAll() - if err != nil { - log.Println("Error fetching apps:", err) - return - } - - if len(apps) == 0 { - return - } - - var wg sync.WaitGroup - for _, app := range apps { - wg.Add(1) - go func(app models.App) { - defer wg.Done() - event.EB.Publish("makeRequest", app) - }(app) - } - wg.Wait() -} - // Makes request for the app func MakeAppRequest(app models.App) { ok, err := validateApp(app) @@ -92,7 +25,7 @@ func MakeAppRequest(app models.App) { appRequestProgress := services.AppRequestProgress{App: app, InProgress: true} services.UserAppMem.Add(app.UserID, appRequestProgress) - event.EB.Publish("appRequestProgress", appRequestProgress) + events.EB.Publish("appRequestProgress", appRequestProgress) response, err := services.MakeHTTPRequest(app.URL) if err != nil { @@ -117,7 +50,7 @@ func MakeAppRequest(app models.App) { appRequestProgress.InProgress = false services.UserAppMem.Add(app.UserID, appRequestProgress) - event.EB.Publish("appRequestProgress", appRequestProgress) + events.EB.Publish("appRequestProgress", appRequestProgress) } // Validates the app's eligibility for making requests diff --git a/internal/routes/request/postRequestTime.go b/internal/routes/request/postRequestTime.go index 057d904..6168891 100644 --- a/internal/routes/request/postRequestTime.go +++ b/internal/routes/request/postRequestTime.go @@ -6,7 +6,7 @@ import ( "net/http" "time" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events" "github.com/Tibz-Dankan/keep-active/internal/middlewares" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" @@ -60,7 +60,7 @@ func postRequestTime(w http.ResponseWriter, r *http.Request) { userId, _ := r.Context().Value(middlewares.UserIDKey).(string) user := models.User{ID: userId} - event.EB.Publish("permissions", user) + events.EB.Publish("permissions", user) } func PostRequestTimeRoute(router *mux.Router) { diff --git a/internal/routes/request/updateRequestTime.go b/internal/routes/request/updateRequestTime.go index cf08075..880f48d 100644 --- a/internal/routes/request/updateRequestTime.go +++ b/internal/routes/request/updateRequestTime.go @@ -4,7 +4,6 @@ import ( "encoding/json" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/services" "github.com/gorilla/mux" @@ -57,9 +56,6 @@ func updateRequestTime(w http.ResponseWriter, r *http.Request) { "requestTime": updatedRequestTime, } - app := models.App{ID: requestTime.AppID} - event.EB.Publish("updateApp", app) - w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(response) diff --git a/internal/services/clearUserAppMemory.go b/internal/schedulers/UserAppMemory.go similarity index 77% rename from internal/services/clearUserAppMemory.go rename to internal/schedulers/UserAppMemory.go index fe54fb0..6dcbdb7 100644 --- a/internal/services/clearUserAppMemory.go +++ b/internal/schedulers/UserAppMemory.go @@ -1,14 +1,14 @@ -package services +package schedulers import ( - "log" "time" + + "github.com/Tibz-Dankan/keep-active/internal/services" ) // deletes all key-value pairs in the user app memory // at 35 seconds for every minute that is a multiple of 5 -func clearUserAppMemory() { - log.Println("Inside clearUserAppMemory()") +func cleanUserAppMemory() { for { now := time.Now() minute := now.Minute() @@ -29,11 +29,7 @@ func clearUserAppMemory() { now = time.Now() if now.Minute()%5 == 0 && now.Second() == 35 { - UserAppMem.DeleteAll() + services.UserAppMem.DeleteAll() } } } - -func StartClearUserAppMemoryScheduler() { - go clearUserAppMemory() -} diff --git a/internal/schedulers/request.go b/internal/schedulers/request.go new file mode 100644 index 0000000..3d83104 --- /dev/null +++ b/internal/schedulers/request.go @@ -0,0 +1,24 @@ +package schedulers + +import ( + "time" + + "github.com/Tibz-Dankan/keep-active/internal/events/publishers" +) + +// Runs the PublishRequestEvent fn at +// start of every minute that is a multiple of 5 +func schedulePublishRequest() { + for { + now := time.Now() + nextMinute := now.Truncate(time.Minute).Add(time.Minute) + sleepDuration := nextMinute.Sub(now) + minute := now.Minute() + + if minute%5 == 0 && now.Second() == 0 { + publishers.PublishRequestEvent() + } + + time.Sleep(sleepDuration) + } +} diff --git a/internal/schedulers/schedulers.go b/internal/schedulers/schedulers.go new file mode 100644 index 0000000..ec073f6 --- /dev/null +++ b/internal/schedulers/schedulers.go @@ -0,0 +1,9 @@ +package schedulers + +import "log" + +func InitSchedulers() { + log.Println("Initiating schedulers...") + go schedulePublishRequest() + cleanUserAppMemory() +} diff --git a/main.go b/main.go index c22207d..579b5ea 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,12 @@ import ( "log" "net/http" - "github.com/Tibz-Dankan/keep-active/internal/event" + "github.com/Tibz-Dankan/keep-active/internal/events/publishers" + "github.com/Tibz-Dankan/keep-active/internal/events/subscribers" "github.com/Tibz-Dankan/keep-active/internal/middlewares" "github.com/Tibz-Dankan/keep-active/internal/models" "github.com/Tibz-Dankan/keep-active/internal/routes" - "github.com/Tibz-Dankan/keep-active/internal/routes/request" - "github.com/Tibz-Dankan/keep-active/internal/services" + "github.com/Tibz-Dankan/keep-active/internal/schedulers" "github.com/rs/cors" ) @@ -35,11 +35,9 @@ func main() { log.Println("Starting http server up on 8000") go http.ListenAndServe(":8000", nil) - services.StartClearUserAppMemoryScheduler() - // Call StartRequestScheduler after server is started - request.StartRequestScheduler() - - event.EventSubscribers() + go schedulers.InitSchedulers() + go subscribers.InitEventSubscribers() + publishers.InitEventPublishers() select {} }