diff --git a/server/main.go b/server/main.go index 9c663d0..76d63af 100644 --- a/server/main.go +++ b/server/main.go @@ -134,6 +134,7 @@ func (r *Router) loadRoutes() { r.router.Match(validMethods, "/facebookhelp", r.views.FacebookHelpFunc) // Facebook help page r.router.Match(validMethods, "/"+r.config.StreamerWebsocketPath, r.views.Websocket) // Websocket for the recorder and forwarder to communicate on r.router.Match(validMethods, "/activeStreams", r.views.ActiveStreamsFunc) + r.router.Match(validMethods, "/"+r.config.StreamerAdminPath+"/forceRemove/:unique", r.views.ForceRemoveFunc) r.router.GET("/api/health", func(c echo.Context) error { marshal, err := json.Marshal(struct { Status int `json:"status"` diff --git a/server/views/forceRemove.go b/server/views/forceRemove.go new file mode 100644 index 0000000..19c4f26 --- /dev/null +++ b/server/views/forceRemove.go @@ -0,0 +1,112 @@ +package views + +import ( + "fmt" + "log" + "net/http" + + "github.com/labstack/echo/v4" + + commonTransporter "github.com/ystv/streamer/common/transporter" + "github.com/ystv/streamer/common/transporter/action" + "github.com/ystv/streamer/common/transporter/server" + "github.com/ystv/streamer/common/wsMessages" +) + +func (v *Views) ForceRemoveFunc(c echo.Context) error { + if c.Request().Method == "POST" { + log.Printf("force remove called") + + var response struct { + Error string `json:"error"` + } + + unique := c.Param("unique") + if len(unique) != 10 { + log.Printf("unique key invalid: %s", unique) + response.Error = fmt.Sprintf("unique key invalid: %s", unique) + return c.JSON(http.StatusOK, response) + } + + stored := false + + _, err := v.store.FindStored(unique) + if err != nil { + log.Printf("force did not find stored with unique: %s, attempting streams", unique) + } else { + stored = true + err = v.store.DeleteStored(unique) + if err != nil { + log.Printf("failed to delete stored: %+v, unique: %s", err, unique) + response.Error = fmt.Sprintf("failed to delete stored: %+v, unique: %s", err, unique) + return c.JSON(http.StatusInternalServerError, response) + } + } + + stream, err := v.store.FindStream(unique) + if err != nil { + log.Printf("force did not find stream with unique: %s", unique) + if !stored { + log.Printf("failing forced") + response.Error = fmt.Sprintf("force did not find stream or stored with unique: %s, failing", unique) + return c.JSON(http.StatusInternalServerError, response) + } + } else { + _, rec := v.cache.Get(server.Recorder.String()) + _, fow := v.cache.Get(server.Forwarder.String()) + errorString := "" + transporter := commonTransporter.Transporter{ + Action: action.Stop, + Unique: unique, + } + if len(stream.Recording) > 0 && rec { + recorderTransporter := transporter + var wsResponse commonTransporter.ResponseTransporter + wsResponse, err = v.wsHelper(server.Recorder, recorderTransporter) + if err != nil { + log.Printf("failed sending to Recorder for force stop: %+v", err) + errorString += fmt.Sprintf("failed sending to Recorder for force stop: %+v", err) + } + if wsResponse.Status == wsMessages.Error { + log.Printf("failed sending to Recorder for force stop: %s", wsResponse.Payload) + errorString += fmt.Sprintf("failed sending to Recorder for force stop: %s", wsResponse.Payload) + } + if wsResponse.Status != wsMessages.Okay { + log.Printf("invalid response from Recorder for force stop: %s", wsResponse.Status) + errorString += fmt.Sprintf("invalid response from Recorder for force stop: %s", wsResponse.Status) + } + } + if fow { + forwarderTransporter := transporter + + var wsResponse commonTransporter.ResponseTransporter + wsResponse, err = v.wsHelper(server.Forwarder, forwarderTransporter) + if err != nil { + log.Printf("failed sending to Forwarder for force stop: %+v", err) + errorString += fmt.Sprintf("failed sending to Forwarder for force stop: %+v", err) + } + if wsResponse.Status == wsMessages.Error { + log.Printf("failed sending to Forwarder for force stop: %s", wsResponse.Payload) + errorString += fmt.Sprintf("failed sending to Forwarder for force stop: %s", wsResponse.Payload) + } + if wsResponse.Status != wsMessages.Okay { + log.Printf("invalid response from Forwarder for force stop: %s", wsResponse.Status) + errorString += fmt.Sprintf("invalid response from Forwarder for force stop: %s", wsResponse.Status) + } + } + + err = v.store.DeleteStream(unique) + if err != nil { + log.Printf("failed to delete stream: %+v, unique: %s", err, unique) + errorString += fmt.Sprintf("failed to delete stream: %+v, unique: %s", err, unique) + } + + if len(errorString) > 0 { + response.Error = errorString + return c.JSON(http.StatusInternalServerError, response) + } + } + return nil + } + return echo.NewHTTPError(http.StatusMethodNotAllowed, "invalid method") +} diff --git a/server/views/views.go b/server/views/views.go index 2ca48d6..6ba7986 100644 --- a/server/views/views.go +++ b/server/views/views.go @@ -30,6 +30,7 @@ type ( ServerAddress string `envconfig:"SERVER_ADDRESS"` RecordingLocation string `envconfig:"RECORDING_LOCATION"` StreamerWebsocketPath string `envconfig:"STREAMER_WEBSOCKET_PATH"` + StreamerAdminPath string `envconfig:"STREAMER_ADMIN_PATH"` } // Views encapsulates our view dependencies