From 6f8ea1fa15737755406d7488ea0b068bf74c22d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafa=C5=82=20Leszko?= <rafal@livepeer.org>
Date: Tue, 23 Jul 2024 09:35:38 +0200
Subject: [PATCH] Add option for running catalyst-api as a stateless service
 (#1336)

From now on, we'll be able to run catalyst-api in 3 modes:
- all: the same as before
- cluster-only: managed MistUtilLoad and Serf only (intended to use inside Catalyst)
- api-only: stateless API only (intended to get deployed separately from Catalyst)
---
 api/http_internal.go |  84 +++++++-------
 config/cli.go        |  10 ++
 main.go              | 255 ++++++++++++++++++++++---------------------
 3 files changed, 186 insertions(+), 163 deletions(-)

diff --git a/api/http_internal.go b/api/http_internal.go
index db6c62cc..d7068e1d 100644
--- a/api/http_internal.go
+++ b/api/http_internal.go
@@ -84,56 +84,60 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
 	// Simple endpoint for healthchecks
 	router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
 
-	var metricsHandlers []http.Handler
-	if cli.ShouldMapic() {
-		metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
+	if cli.IsClusterMode() {
+		// Handler to get members Catalyst API => Catalyst
+		router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
+		// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
+		router.POST("/api/events", withLogging(eventsHandler.Events()))
 	}
-	if cli.MistPrometheus != "" {
-		// Enable Mist metrics enrichment
-		metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
-	}
-	metricsHandlers = append(metricsHandlers, promhttp.Handler())
-	// Hacky combined metrics handler. To be refactored away with mapic.
-	router.GET("/metrics", concatHandlers(metricsHandlers...))
-
-	// Public Catalyst API
-	router.POST("/api/vod",
-		withLogging(
-			withAuth(
-				cli.APIToken,
-				withCapacityChecking(
-					vodEngine,
-					catalystApiHandlers.UploadVOD(),
+
+	if cli.IsApiMode() {
+		var metricsHandlers []http.Handler
+		if cli.ShouldMapic() {
+			metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
+		}
+		if cli.MistPrometheus != "" {
+			// Enable Mist metrics enrichment
+			metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
+		}
+		metricsHandlers = append(metricsHandlers, promhttp.Handler())
+		// Hacky combined metrics handler. To be refactored away with mapic.
+		router.GET("/metrics", concatHandlers(metricsHandlers...))
+
+		// Public Catalyst API
+		router.POST("/api/vod",
+			withLogging(
+				withAuth(
+					cli.APIToken,
+					withCapacityChecking(
+						vodEngine,
+						catalystApiHandlers.UploadVOD(),
+					),
 				),
 			),
-		),
-	)
-
-	// Handler to get members Catalyst API => Catalyst
-	router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
-	// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
-	router.POST("/api/events", withLogging(eventsHandler.Events()))
+		)
 
-	// Handler to forward the user event from Catalyst => Catalyst API
-	router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
+		// Handler to forward the user event from Catalyst => Catalyst API
+		router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
 
-	// Public GET handler to retrieve the public key for vod encryption
-	router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
+		// Public GET handler to retrieve the public key for vod encryption
+		router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
 
-	// Endpoint to receive "Triggers" (callbacks) from Mist
-	router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
+		// Endpoint to receive "Triggers" (callbacks) from Mist
+		router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
 
-	// Handler for STREAM_SOURCE triggers
-	broker.OnStreamSource(geoHandlers.HandleStreamSource)
+		// Handler for STREAM_SOURCE triggers
+		broker.OnStreamSource(geoHandlers.HandleStreamSource)
 
-	// Handler for USER_NEW triggers
-	broker.OnUserNew(accessControlHandlers.HandleUserNew)
+		// Handler for USER_NEW triggers
+		broker.OnUserNew(accessControlHandlers.HandleUserNew)
 
-	// Handler for USER_END triggers.
-	broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
+		// Handler for USER_END triggers.
+		broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
 
-	// Endpoint to receive segments and manifests that ffmpeg produces
-	router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
+		// Endpoint to receive segments and manifests that ffmpeg produces
+		router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
+	}
 
 	return router
 }
diff --git a/config/cli.go b/config/cli.go
index 418812fe..4243282e 100644
--- a/config/cli.go
+++ b/config/cli.go
@@ -25,6 +25,7 @@ type Cli struct {
 	MistUser                  string
 	MistPassword              string
 	MistPrometheus            string
+	Mode                      string
 	MistPort                  int
 	MistConnectTimeout        time.Duration
 	MistStreamSource          string
@@ -72,6 +73,7 @@ type Cli struct {
 	KafkaPassword             string
 	AnalyticsKafkaTopic       string
 	SerfMembersEndpoint       string
+	CatalystApiURL            string
 
 	// mapping playbackId to value between 0.0 to 100.0
 	CdnRedirectPlaybackPct             map[string]float64
@@ -111,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool {
 	return cli.APIServer != ""
 }
 
+func (cli *Cli) IsClusterMode() bool {
+	return cli.Mode == "cluster-only" || cli.Mode == "all"
+}
+
+func (cli *Cli) IsApiMode() bool {
+	return cli.Mode == "api-only" || cli.Mode == "all"
+}
+
 // Should we enable mist-cleanup script to run periodically and delete leaky shm?
 func (cli *Cli) ShouldMistCleanup() bool {
 	return cli.MistCleanup
diff --git a/main.go b/main.go
index ca30baef..a59fc96a 100644
--- a/main.go
+++ b/main.go
@@ -50,6 +50,8 @@ func main() {
 
 	version := fs.Bool("version", false, "print application version")
 
+	fs.StringVar(&cli.Mode, "mode", "all", "Mode to run the application in. Options: all, cluster-only, api-only")
+
 	// listen addresses
 	config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling")
 	config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands")
@@ -129,6 +131,7 @@ func main() {
 	fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password")
 	fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs")
 	fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "http://127.0.0.1:7979/api/serf/members", "Endpoint to get the current members in the cluster")
+	fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api")
 	pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port")
 
 	fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed")
@@ -176,167 +179,173 @@ func main() {
 		return
 	}
 
-	// TODO: I don't love the global variables for these
-	config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs
-	config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs
-	config.HTTPInternalAddress = cli.HTTPInternalAddress
-
 	var (
 		metricsDB *sql.DB
+		vodEngine *pipeline.Coordinator
+		mapic     mistapiconnector.IMac
+		bal       balancer.Balancer
+		broker    misttriggers.TriggerBroker
+		mist      clients.MistAPIClient
+		c         cluster.Cluster
 	)
 
-	// Kick off the callback client, to send job update messages on a regular interval
-	headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)}
-	statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start()
+	// Initialize root context; cancelling this prompts all components to shut down cleanly
+	group, ctx := errgroup.WithContext(context.Background())
+	mistBalancerConfig := &balancer.Config{
+		Args:                     cli.BalancerArgs,
+		MistUtilLoadPort:         uint32(cli.MistLoadBalancerPort),
+		MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate,
+		MistHost:                 cli.MistHost,
+		MistPort:                 cli.MistPort,
+		NodeName:                 cli.NodeName,
+		OwnRegion:                cli.OwnRegion,
+		OwnRegionTagAdjust:       cli.OwnRegionTagAdjust,
+	}
+	broker = misttriggers.NewTriggerBroker()
 
-	// Emit high-cardinality metrics to a Postrgres database if configured
-	if cli.MetricsDBConnectionString != "" {
-		metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString)
-		if err != nil {
-			glog.Fatalf("Error creating postgres metrics connection: %v", err)
-		}
+	catalystApiURL := cli.CatalystApiURL
+	if catalystApiURL == "" {
+		catalystApiURL = cli.OwnInternalURL()
+	}
 
-		// Without this, we've run into issues with exceeding our open connection limit
-		metricsDB.SetMaxOpenConns(2)
-		metricsDB.SetMaxIdleConns(2)
-		metricsDB.SetConnMaxLifetime(time.Hour)
-	} else {
-		glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.")
+	if cli.MistEnabled {
+		mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort)
 	}
 
-	var vodDecryptPrivateKey *rsa.PrivateKey
+	if cli.IsApiMode() {
+		// TODO: I don't love the global variables for these
+		config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs
+		config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs
+		config.HTTPInternalAddress = cli.HTTPInternalAddress
 
-	if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" {
-		vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey)
-		if err != nil {
-			glog.Fatalf("Error loading vod decrypt private key: %v", err)
-		}
-		isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey)
-		if !isValidKeyPair || err != nil {
-			glog.Fatalf("Invalid vod decrypt key pair")
+		// Kick off the callback client, to send job update messages on a regular interval
+		headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)}
+		statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start()
+
+		// Emit high-cardinality metrics to a Postgres database if configured
+		if cli.MetricsDBConnectionString != "" {
+			metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString)
+			if err != nil {
+				glog.Fatalf("Error creating postgres metrics connection: %v", err)
+			}
+
+			// Without this, we've run into issues with exceeding our open connection limit
+			metricsDB.SetMaxOpenConns(2)
+			metricsDB.SetMaxIdleConns(2)
+			metricsDB.SetConnMaxLifetime(time.Hour)
+		} else {
+			glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.")
 		}
-	}
 
-	c2, err := createC2PA(&cli)
-	if err != nil {
-		// Log warning, but still start without C2PA signing
-		glog.Warning(err)
-	}
-	// Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline
-	// or an external one
-	vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2)
-	if err != nil {
-		glog.Fatalf("Error creating VOD pipeline coordinator: %v", err)
-	}
+		var vodDecryptPrivateKey *rsa.PrivateKey
 
-	// Start cron style apps to run periodically
-	if cli.ShouldMistCleanup() {
-		app := "mist-cleanup.sh"
-		// schedule mist-cleanup every 2hrs with a timeout of 15min
-		mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app)
+		if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" {
+			vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey)
+			if err != nil {
+				glog.Fatalf("Error loading vod decrypt private key: %v", err)
+			}
+			isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey)
+			if !isValidKeyPair || err != nil {
+				glog.Fatalf("Invalid vod decrypt key pair")
+			}
+		}
+
+		c2, err := createC2PA(&cli)
 		if err != nil {
-			glog.Info("Failed to shell out:", app, err)
+			// Log warning, but still start without C2PA signing
+			glog.Warning(err)
 		}
-		mistCleanupTick := mistCleanup.RunBg()
-		defer mistCleanupTick.Stop()
-	}
-	if cli.ShouldLogSysUsage() {
-		app := "pod-mon.sh"
-		// schedule pod-mon every 5min with timeout of 5s
-		podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app)
+		// Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline
+		// or an external one
+		vodEngine, err = pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2)
 		if err != nil {
-			glog.Info("Failed to shell out:", app, err)
+			glog.Fatalf("Error creating VOD pipeline coordinator: %v", err)
 		}
-		podMonTick := podMon.RunBg()
-		defer podMonTick.Stop()
-	}
 
-	broker := misttriggers.NewTriggerBroker()
+		bal = mist_balancer.NewRemoteBalancer(mistBalancerConfig)
+		if balancer.CombinedBalancerEnabled(cli.CataBalancer) {
+			cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout)
+			// Temporary combined balancer to test cataBalancer logic alongside existing mist balancer
+			bal = balancer.NewCombinedBalancer(cataBalancer, bal, cli.CataBalancer)
 
-	var mist clients.MistAPIClient
-	if cli.MistEnabled {
-		mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort)
-		if cli.MistTriggerSetup {
-			ownURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL())
-			err := broker.SetupMistTriggers(mist, ownURL)
+			if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes
+				events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist)
+			}
+		}
+		if cli.ShouldMapic() {
+			mapic = mistapiconnector.NewMapic(&cli, broker, mist)
+			group.Go(func() error {
+				return mapic.Start(ctx)
+			})
+		}
+	}
+
+	if cli.IsClusterMode() {
+		// Configure Mist Triggers
+		if cli.MistEnabled && cli.MistTriggerSetup {
+			mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL)
+			err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint)
 			if err != nil {
 				glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.")
 				glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist")
 				glog.Fatalf("error setting up Mist triggers err=%s", err)
 			}
 		}
-	} else {
-		glog.Info("-no-mist flag detected, not initializing Mist stream triggers")
-	}
-
-	var mapic mistapiconnector.IMac
-	if cli.ShouldMapic() {
-		mapic = mistapiconnector.NewMapic(&cli, broker, mist)
-	}
 
-	c := cluster.NewCluster(&cli)
-
-	// Start balancer
-	mistBalancerConfig := &balancer.Config{
-		Args:                     cli.BalancerArgs,
-		MistUtilLoadPort:         uint32(cli.MistLoadBalancerPort),
-		MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate,
-		MistHost:                 cli.MistHost,
-		MistPort:                 cli.MistPort,
-		NodeName:                 cli.NodeName,
-		OwnRegion:                cli.OwnRegion,
-		OwnRegionTagAdjust:       cli.OwnRegionTagAdjust,
-	}
-	mistBalancer := mist_balancer.NewLocalBalancer(mistBalancerConfig)
-
-	bal := mistBalancer
-	if balancer.CombinedBalancerEnabled(cli.CataBalancer) {
-		cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout)
-		// Temporary combined balancer to test cataBalancer logic alongside existing mist balancer
-		bal = balancer.NewCombinedBalancer(cataBalancer, mistBalancer, cli.CataBalancer)
-
-		if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes
-			events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist)
+		// Start cron style apps to run periodically
+		if cli.ShouldMistCleanup() {
+			app := "mist-cleanup.sh"
+			// schedule mist-cleanup every 2hrs with a timeout of 15min
+			mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app)
+			if err != nil {
+				glog.Info("Failed to shell out:", app, err)
+			}
+			mistCleanupTick := mistCleanup.RunBg()
+			defer mistCleanupTick.Stop()
+		}
+		if cli.ShouldLogSysUsage() {
+			app := "pod-mon.sh"
+			// schedule pod-mon every 5min with timeout of 5s
+			podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app)
+			if err != nil {
+				glog.Info("Failed to shell out:", app, err)
+			}
+			podMonTick := podMon.RunBg()
+			defer podMonTick.Stop()
 		}
-	}
-
-	// Initialize root context; cancelling this prompts all components to shut down cleanly
-	group, ctx := errgroup.WithContext(context.Background())
 
-	group.Go(func() error {
-		return handleSignals(ctx)
-	})
+		group.Go(func() error {
+			return handleSignals(ctx)
+		})
 
-	group.Go(func() error {
-		return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic)
-	})
+		// Configure Serf cluster
+		c = cluster.NewCluster(&cli)
+		group.Go(func() error {
+			return c.Start(ctx)
+		})
 
-	group.Go(func() error {
-		return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB)
-	})
+		// Configure local MistUtilLoad balancer
+		bal = mist_balancer.NewLocalBalancer(mistBalancerConfig)
+		group.Go(func() error {
+			return bal.Start(ctx)
+		})
+		group.Go(func() error {
+			return reconcileBalancer(ctx, bal, c)
+		})
 
-	if cli.ShouldMapic() {
+		// Handle Serf cluster events broadcasted to all nodes
 		group.Go(func() error {
-			return mapic.Start(ctx)
+			serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", catalystApiURL)
+			return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c)
 		})
 	}
 
 	group.Go(func() error {
-		return bal.Start(ctx)
-	})
-
-	group.Go(func() error {
-		return c.Start(ctx)
-	})
-
-	group.Go(func() error {
-		// TODO these errors cause the app to shut down?
-		return reconcileBalancer(ctx, bal, c)
+		return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic)
 	})
 
 	group.Go(func() error {
-		serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", cli.OwnInternalURL())
-		return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c)
+		return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB)
 	})
 
 	err = group.Wait()