Skip to content

Commit

Permalink
Adding force removal
Browse files Browse the repository at this point in the history
  • Loading branch information
COMTOP1 committed Feb 20, 2024
1 parent 50febed commit eaed875
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (r *Router) loadRoutes() {
r.router.Match(validMethods, "/facebookhelp", r.views.FacebookHelpFunc) // Facebook help page
r.router.Match(validMethods, "/"+r.config.StreamerWebsocketPath, r.views.Websocket) // Websocket for the recorder and forwarder to communicate on
r.router.Match(validMethods, "/activeStreams", r.views.ActiveStreamsFunc)
r.router.Match(validMethods, "/"+r.config.StreamerAdminPath+"/forceRemove/:unique", r.views.ForceRemoveFunc)
r.router.GET("/api/health", func(c echo.Context) error {
marshal, err := json.Marshal(struct {
Status int `json:"status"`
Expand Down
112 changes: 112 additions & 0 deletions server/views/forceRemove.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package views

import (
"fmt"
"log"
"net/http"

"github.com/labstack/echo/v4"

commonTransporter "github.com/ystv/streamer/common/transporter"
"github.com/ystv/streamer/common/transporter/action"
"github.com/ystv/streamer/common/transporter/server"
"github.com/ystv/streamer/common/wsMessages"
)

func (v *Views) ForceRemoveFunc(c echo.Context) error {
if c.Request().Method == "POST" {
log.Printf("force remove called")

var response struct {
Error string `json:"error"`
}

unique := c.Param("unique")
if len(unique) != 10 {
log.Printf("unique key invalid: %s", unique)
response.Error = fmt.Sprintf("unique key invalid: %s", unique)
return c.JSON(http.StatusOK, response)
}

stored := false

_, err := v.store.FindStored(unique)
if err != nil {
log.Printf("force did not find stored with unique: %s, attempting streams", unique)
} else {
stored = true
err = v.store.DeleteStored(unique)
if err != nil {
log.Printf("failed to delete stored: %+v, unique: %s", err, unique)
response.Error = fmt.Sprintf("failed to delete stored: %+v, unique: %s", err, unique)
return c.JSON(http.StatusInternalServerError, response)
}
}

stream, err := v.store.FindStream(unique)
if err != nil {
log.Printf("force did not find stream with unique: %s", unique)
if !stored {
log.Printf("failing forced")
response.Error = fmt.Sprintf("force did not find stream or stored with unique: %s, failing", unique)
return c.JSON(http.StatusInternalServerError, response)
}
} else {
_, rec := v.cache.Get(server.Recorder.String())
_, fow := v.cache.Get(server.Forwarder.String())
errorString := ""
transporter := commonTransporter.Transporter{
Action: action.Stop,
Unique: unique,
}
if len(stream.Recording) > 0 && rec {
recorderTransporter := transporter
var wsResponse commonTransporter.ResponseTransporter
wsResponse, err = v.wsHelper(server.Recorder, recorderTransporter)
if err != nil {
log.Printf("failed sending to Recorder for force stop: %+v", err)
errorString += fmt.Sprintf("failed sending to Recorder for force stop: %+v", err)
}
if wsResponse.Status == wsMessages.Error {
log.Printf("failed sending to Recorder for force stop: %s", wsResponse.Payload)
errorString += fmt.Sprintf("failed sending to Recorder for force stop: %s", wsResponse.Payload)
}
if wsResponse.Status != wsMessages.Okay {
log.Printf("invalid response from Recorder for force stop: %s", wsResponse.Status)
errorString += fmt.Sprintf("invalid response from Recorder for force stop: %s", wsResponse.Status)
}
}
if fow {
forwarderTransporter := transporter

var wsResponse commonTransporter.ResponseTransporter
wsResponse, err = v.wsHelper(server.Forwarder, forwarderTransporter)
if err != nil {
log.Printf("failed sending to Forwarder for force stop: %+v", err)
errorString += fmt.Sprintf("failed sending to Forwarder for force stop: %+v", err)
}
if wsResponse.Status == wsMessages.Error {
log.Printf("failed sending to Forwarder for force stop: %s", wsResponse.Payload)
errorString += fmt.Sprintf("failed sending to Forwarder for force stop: %s", wsResponse.Payload)
}
if wsResponse.Status != wsMessages.Okay {
log.Printf("invalid response from Forwarder for force stop: %s", wsResponse.Status)
errorString += fmt.Sprintf("invalid response from Forwarder for force stop: %s", wsResponse.Status)
}
}

err = v.store.DeleteStream(unique)
if err != nil {
log.Printf("failed to delete stream: %+v, unique: %s", err, unique)
errorString += fmt.Sprintf("failed to delete stream: %+v, unique: %s", err, unique)
}

if len(errorString) > 0 {
response.Error = errorString
return c.JSON(http.StatusInternalServerError, response)
}
}
return nil
}
return echo.NewHTTPError(http.StatusMethodNotAllowed, "invalid method")
}
1 change: 1 addition & 0 deletions server/views/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
ServerAddress string `envconfig:"SERVER_ADDRESS"`
RecordingLocation string `envconfig:"RECORDING_LOCATION"`
StreamerWebsocketPath string `envconfig:"STREAMER_WEBSOCKET_PATH"`
StreamerAdminPath string `envconfig:"STREAMER_ADMIN_PATH"`
}

// Views encapsulates our view dependencies
Expand Down

0 comments on commit eaed875

Please sign in to comment.