Skip to content

Commit

Permalink
Merge pull request #4 from ystv/marks/refactors
Browse files Browse the repository at this point in the history
A complete rewrite of how streamer works
  • Loading branch information
COMTOP1 authored Dec 16, 2023
2 parents 80099c8 + 8450129 commit 4f6c219
Show file tree
Hide file tree
Showing 87 changed files with 5,460 additions and 3,035 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: golangci-lint
on: [push, pull_request]

permissions:
contents: read
pull-requests: read

jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: WillAbides/setup-go-faster@v1
with:
go-version: '1.21'
- name: golangci-lint-server
uses: golangci/golangci-lint-action@v3
with:
version: latest
only-new-issues: true
working-directory: ./server
- name: golangci-lint-forwarder
uses: golangci/golangci-lint-action@v3
with:
version: latest
only-new-issues: true
working-directory: ./forwarder
- name: golangci-lint-recorder
uses: golangci/golangci-lint-action@v3
with:
version: latest
only-new-issues: true
working-directory: ./recorder
154 changes: 154 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
String registryEndpoint = 'registry.comp.ystv.co.uk'

def branch = env.BRANCH_NAME.replaceAll("/", "_")
def image
String proceed = "yes"
String serverImageName = "ystv/streamer/server:${branch}-${env.BUILD_ID}"
String forwarderImageName = "ystv/streamer/forwarder:${branch}-${env.BUILD_ID}"
String recorderImageName = "ystv/streamer/recorder:${branch}-${env.BUILD_ID}"

pipeline {
agent {
label 'docker'
}

environment {
DOCKER_BUILDKIT = '1'
}

stages {
stage('Build images') {
parallel {
stage('Build Server') {
steps {
script {
dir("server") {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
serverImage = docker.build(serverImageName, "--no-cache .")
}
}
}
}
}
stage('Build Forwarder') {
steps {
script {
dir("forwarder") {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
forwarderImage = docker.build(forwarderImageName, "--no-cache .")
}
}
}
}
}
stage('Build Recorder') {
steps {
script {
dir("recorder") {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
recorderImage = docker.build(recorderImageName, "--no-cache .")
}
}
}
}
}
}
}

stage('Push images to registry') {
parallel {
stage('Push Server image to registry') {
steps {
script {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
serverImage.push()
if (env.BRANCH_IS_PRIMARY) {
serverImage.push('latest')
}
}
}
}
}
stage('Push Forwarder image to registry') {
steps {
script {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
forwarderImage.push()
if (env.BRANCH_IS_PRIMARY) {
forwarderImage.push('latest')
}
}
}
}
}
stage('Push Recorder image to registry') {
steps {
script {
docker.withRegistry('https://' + registryEndpoint, 'docker-registry') {
recorderImage.push()
if (env.BRANCH_IS_PRIMARY) {
recorderImage.push('latest')
}
}
}
}
}
}
}

stage('Deploy') {
stages {
stage('Checking existing') {
steps {
script {
final String url = "https://streamer.dev.ystv.co.uk/activeStreams"
final def (String response, String tempCode) =
sh(script: "curl -s -w '~~~%{response_code}' $url", returnStdout: true)
.trim()
.tokenize("~~~")
int code = Integer.parseInt(tempCode)

echo "HTTP response status code: $code"
if (response.contains("\"stream\":")) {
echo "HTTP response: $response"

if (code == 200) {
def streams = sh(script: "echo '$response' | jq -M '.streams'", returnStdout: true)
if (streams > 0) {
proceed = "no"
}
}
} else {
echo "HTTP response not JSON, proceeding..."
}
}
}
}
stage('Development') {
when {
expression { env.BRANCH_IS_PRIMARY && proceed == "yes" }
}
steps {
build(job: 'Deploy Nomad Job', parameters: [
string(name: 'JOB_FILE', value: 'streamer-dev.nomad'),
text(name: 'TAG_REPLACEMENTS', value: "${registryEndpoint}/${serverImageName} ${registryEndpoint}/${forwarderImageName} ${registryEndpoint}/${recorderImageName}")
])
}
}

stage('Production') {
when {
// Checking if it is semantic version release.
expression { return env.TAG_NAME ==~ /v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)/ && proceed == "yes" }
}
steps {
build(job: 'Deploy Nomad Job', parameters: [
string(name: 'JOB_FILE', value: 'streamer-prod.nomad'),
text(name: 'TAG_REPLACEMENTS', value: "${registryEndpoint}/${serverImageName} ${registryEndpoint}/${forwarderImageName} ${registryEndpoint}/${recorderImageName}")
])
}
}
}
}
}
}
20 changes: 20 additions & 0 deletions forwarder/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.21.1-alpine3.18 AS build

LABEL site="ystv-streamer-forwarder"

VOLUME /logs

WORKDIR /src/

COPY go.mod .
COPY go.sum .

RUN go mod download

COPY . .

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /bin/forwarder

EXPOSE 1323

ENTRYPOINT ["/bin/forwarder"]
4 changes: 3 additions & 1 deletion forwarder/example.env
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
STREAM_SERVER=
STREAM_SERVER=
STREAMER_WEB_ADDRESS=
STREAMER_WEBSOCKET_PATH=
155 changes: 102 additions & 53 deletions forwarder/forwarder_start.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,119 @@
package main

import (
"fmt"
"github.com/joho/godotenv"
"github.com/wricardo/gomux"
"github.com/patrickmn/go-cache"
"log"
"os"
"os/exec"
"strconv"
"strings"
"time"
)

func main() {
if strings.Contains(os.Args[0], "/var/folders") || strings.Contains(os.Args[0], "/tmp/go") || strings.Contains(os.Args[0], "./forwarder_start") {
if len(os.Args) < 5 {
log.Fatalf("echo Arguments error")
}
for i := 0; i < len(os.Args)-1; i++ {
os.Args[i] = os.Args[i+1]
}
} else {
if len(os.Args) < 4 {
log.Fatalf("echo Arguments error")
}
}
streamIn := os.Args[0]
websiteOut := os.Args[1]
unique := os.Args[2]
var serversKeys []string
for i := 3; i < len(os.Args)-1; i++ {
serversKeys = append(serversKeys, os.Args[i])
}
func (v *Views) start(transporter Transporter) error {
streamIn := "rtmp://" + v.Config.StreamServer + transporter.Payload.(ForwarderStart).StreamIn

if len(transporter.Payload.(ForwarderStart).WebsiteOut) > 0 {
finish := make(chan bool)

sessionName := "STREAM FORWARDER - " + unique
err := v.cache.Add(transporter.Unique+"_0Finish", finish, cache.NoExpiration)
if err != nil {
return err
}

s := gomux.NewSession(sessionName, os.Stdout)
go func() {
for {
v.cache.Delete(transporter.Unique + "_0")
switch {
case <-finish:
return
default:
c := exec.Command("ffmpeg", "-i", "\""+streamIn+"\"", "-c", "copy", "-f", "flv", "\""+v.Config.StreamServer+"live/"+transporter.Payload.(ForwarderStart).WebsiteOut+"\"", ">>", "\"/logs/"+transporter.Unique+"_0.txt\"", "2>&1")
err = v.cache.Add(transporter.Unique+"0", c, cache.NoExpiration)
if err != nil {
log.Println(err)
return
}
if err = c.Run(); err != nil {
log.Println("could not run command: ", err)
}
time.Sleep(500 * time.Millisecond)
}
}
}()

w1 := s.AddWindow("FORWARDING - 0")
go func() {
for {
switch {
case <-finish:
cmd, ok := v.cache.Get(transporter.Unique + "_0")
if !ok {
log.Println("unable to get cmd from cache")
}
c1 := cmd.(*exec.Cmd)
err = c1.Process.Kill()
if err != nil {
log.Println(err)
}
v.cache.Delete(transporter.Unique + "_0")
return
default:
time.Sleep(1 * time.Second) // This is so it doesn't spam constantly and take the entire CPU up
}
}
}()
}

var panes []*gomux.Pane
for i := 0; i < len(transporter.Payload.(ForwarderStart).Streams); i++ {
finish := make(chan bool)

err := godotenv.Load()
if err != nil {
fmt.Printf("echo Error loading .env file: %s", err)
} else {
streamServer := os.Getenv("STREAM_SERVER")
if websiteOut != "no" {
panes = append(panes, w1.Pane(0))
panes[0].Exec("./forwarder_start.sh " + streamServer + streamIn + " " + streamServer + "live/" + websiteOut + " " + unique + " " + strconv.Itoa(0) + " | bash")
} else {
panes = append(panes, w1.Pane(0))
panes[0].Exec("echo No website stream")
err := v.cache.Add(transporter.Unique+"_"+strconv.Itoa(i+1)+"Finish", finish, cache.NoExpiration)
if err != nil {
return err
}
j := 1
k := 0
for i := 0; i < len(serversKeys); i = i + 2 {
if (i%8) == 0 && i != 0 {
k++
w1 = s.AddWindow("FORWARDING - " + strconv.Itoa(k))
panes = append(panes, w1.Pane(0))

k := i
go func() {
j := k
for {
v.cache.Delete(transporter.Unique + "_" + strconv.Itoa(j+1))
switch {
case <-finish:
return
default:
c := exec.Command("ffmpeg", "-i", "\""+streamIn+"\"", "-c", "copy", "-f", "flv", "\""+transporter.Payload.(ForwarderStart).Streams[j]+"\"", ">>", "\"/logs/"+transporter.Unique+"_"+strconv.Itoa(j+1)+".txt\"", "2>&1")
err = v.cache.Add(transporter.Unique+"_"+strconv.Itoa(j+1), c, cache.NoExpiration)
if err != nil {
log.Println(err)
return
}
if err = c.Run(); err != nil {
log.Println("could not run command: ", err)
}
time.Sleep(500 * time.Millisecond)
}
}
panes = append(panes, w1.Pane(0).Split())
fmt.Println("echo", (i/2)+1)
panes[(i/2)+1].Exec("./forwarder_start.sh " + streamServer + streamIn + " " + serversKeys[i] + serversKeys[i+1] + " " + unique + " " + strconv.Itoa(j) + " | bash")
j++
}
}()

fmt.Println("echo FORWARDER STARTED!")
go func() {
for {
switch {
case <-finish:
cmd, ok := v.cache.Get(transporter.Unique + "_" + strconv.Itoa(k))
if !ok {
log.Println("unable to get cmd from cache")
}
c1 := cmd.(*exec.Cmd)
err = c1.Process.Kill()
if err != nil {
log.Println(err)
}
v.cache.Delete(transporter.Unique + "_" + strconv.Itoa(k))
return
default:
time.Sleep(1 * time.Second) // This is so it doesn't spam constantly and take the entire CPU up
}
}
}()
}

return nil
}
Loading

0 comments on commit 4f6c219

Please sign in to comment.