Skip to content

Commit

Permalink
Merge branch 'ai-video' into frame-interpolation-go
Browse files Browse the repository at this point in the history
  • Loading branch information
JJassonn69 authored Oct 1, 2024
2 parents 09051d2 + a49cd1b commit 699dfb5
Show file tree
Hide file tree
Showing 44 changed files with 1,039 additions and 1,257 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ jobs:

upload:
name: Upload artifacts to google bucket
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository
permissions:
contents: "read"
id-token: "write"
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# Changelog

## v0.7.9

- [#3165](https://github.com/livepeer/go-livepeer/pull/3165) Add node version and orch addr to transcoded metadata

### Features ⚒

#### Broadcaster

- [#3158](https://github.com/livepeer/go-livepeer/pull/3158) Add a metric tag for Orchestrator version

### Bug Fixes 🐞

#### Broadcaster

- [#3164](https://github.com/livepeer/go-livepeer/pull/3164) Fix media compatibility check
- [#3166](https://github.com/livepeer/go-livepeer/pull/3166) Clean up inactive sessions
- [#3086](https://github.com/livepeer/go-livepeer/pull/3086) Clear known sessions with inadequate latency scores

## v0.7.8

### Features ⚒
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.8-ai.1
0.7.9-ai.1
9 changes: 5 additions & 4 deletions ai/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewFileWorker(files map[string]string) *FileWorker {
return &FileWorker{files: files}
}

func (w *FileWorker) TextToImage(ctx context.Context, req worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error) {
func (w *FileWorker) TextToImage(ctx context.Context, req worker.GenTextToImageJSONRequestBody) (*worker.ImageResponse, error) {
fname, ok := w.files["text-to-image"]
if !ok {
return nil, errors.New("text-to-image response file not found")
Expand All @@ -36,7 +36,7 @@ func (w *FileWorker) TextToImage(ctx context.Context, req worker.TextToImageJSON
return &resp, nil
}

func (w *FileWorker) ImageToImage(ctx context.Context, req worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error) {
func (w *FileWorker) ImageToImage(ctx context.Context, req worker.GenImageToImageMultipartRequestBody) (*worker.ImageResponse, error) {
fname, ok := w.files["image-to-image"]
if !ok {
return nil, errors.New("image-to-image response file not found")
Expand All @@ -55,7 +55,7 @@ func (w *FileWorker) ImageToImage(ctx context.Context, req worker.ImageToImageMu
return &resp, nil
}

func (w *FileWorker) ImageToVideo(ctx context.Context, req worker.ImageToVideoMultipartRequestBody) (*worker.VideoResponse, error) {
func (w *FileWorker) ImageToVideo(ctx context.Context, req worker.GenImageToVideoMultipartRequestBody) (*worker.VideoResponse, error) {
fname, ok := w.files["image-to-video"]
if !ok {
return nil, errors.New("image-to-video response file not found")
Expand Down Expand Up @@ -93,7 +93,8 @@ func (w *FileWorker) FrameInterpolation(ctx context.Context, req worker.FrameInt
return &resp, nil
}

func (w *FileWorker) Upscale(ctx context.Context, req worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error) {
func (w *FileWorker) Upscale(ctx context.Context, req worker.GenUpscaleMultipartRequestBody) (*worker.ImageResponse, error) {

fname, ok := w.files["upscale"]
if !ok {
return nil, errors.New("upscale response file not found")
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.MaxPricePerCapability = flag.String("maxPricePerCapability", *cfg.MaxPricePerCapability, `json list of prices per capability/model or path to json config file. Use "model_id": "default" to price all models in a pipeline the same. Example: {"capabilities_prices": [{"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "price_per_unit": 1000, "pixels_per_unit": 1}, {"pipeline": "upscale", "model_id": "default", price_per_unit": 1200, "pixels_per_unit": 1}]}`)
cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")
cfg.DiscoveryTimeout = flag.Duration("discoveryTimeout", *cfg.DiscoveryTimeout, "Time to wait for orchestrators to return info to be included in transcoding sessions for manifest (default = 500ms)")

// Transcoding:
cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator")
Expand Down
39 changes: 32 additions & 7 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type LivepeerConfig struct {
MaxPricePerCapability *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Expand Down Expand Up @@ -189,6 +190,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultOrchPerfStatsURL := ""
defaultRegion := ""
defaultMinPerfScore := 0.0
defaultDiscoveryTimeout := 500 * time.Millisecond
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -287,6 +289,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchPerfStatsURL: &defaultOrchPerfStatsURL,
Region: &defaultRegion,
MinPerfScore: &defaultMinPerfScore,
DiscoveryTimeout: &defaultDiscoveryTimeout,
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Expand Down Expand Up @@ -877,6 +880,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
n.RecipientAddr = recipientAddr.Hex()

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, timeWatcher)
Expand Down Expand Up @@ -1227,10 +1231,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}

// If the config contains a URL we call Warm() anyway because AIWorker will just register
// the endpoint for an external container
if config.Warm || config.URL != "" {
// Register external container endpoint if URL is provided.
endpoint := worker.RunnerEndpoint{URL: config.URL, Token: config.Token}

// Warm the AI worker container or register the endpoint.
if err := n.AIWorker.Warm(ctx, config.Pipeline, config.ModelID, endpoint, config.OptimizationFlags); err != nil {
glog.Errorf("Error AI worker warming %v container: %v", config.Pipeline, err)
return
Expand Down Expand Up @@ -1321,13 +1326,28 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
Models: make(map[string]*core.ModelConstraint),
}
}

capabilityConstraints[core.Capability_FrameInterpolation].Models[config.ModelID] = modelConstraint

if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_FrameInterpolation, config.ModelID, autoPrice)
}
case "segment-anything-2":
_, ok := capabilityConstraints[core.Capability_SegmentAnything2]
if !ok {
aiCaps = append(aiCaps, core.Capability_SegmentAnything2)
capabilityConstraints[core.Capability_SegmentAnything2] = &core.CapabilityConstraints{
Models: make(map[string]*core.ModelConstraint),
}
}

capabilityConstraints[core.Capability_FrameInterpolation].Models[config.ModelID] = modelConstraint
capabilityConstraints[core.Capability_SegmentAnything2].Models[config.ModelID] = modelConstraint

if *cfg.Network != "offchain" {
n.SetBasePriceForCap("default", core.Capability_FrameInterpolation, config.ModelID, autoPrice)
n.SetBasePriceForCap("default", core.Capability_SegmentAnything2, config.ModelID, autoPrice)
}
}

if len(aiCaps) > 0 {
capability := aiCaps[len(aiCaps)-1]
price := n.GetBasePriceForCap("default", capability, config.ModelID)
Expand Down Expand Up @@ -1417,7 +1437,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.Network != "offchain" {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist)
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout)
if err != nil {
exit("Could not create orchestrator pool with DB cache: %v", err)
}
Expand All @@ -1432,9 +1452,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error setting orch webhook URL ", err)
}
glog.Info("Using orchestrator webhook URL ", whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist)
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
}

if n.OrchestratorPool == nil {
Expand Down Expand Up @@ -1494,6 +1514,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if err != nil {
glog.Exit("Error getting service URI: ", err)
}

if *cfg.Network != "offchain" && !common.ValidateServiceURI(suri) {
glog.Warning("**Warning -serviceAddr is a not a public address or hostname; this is not recommended for onchain networks**")
}

n.SetServiceURI(suri)
// if http addr is not provided, listen to all ifaces
// take the port to listen to from the service URI
Expand Down
6 changes: 6 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/big"
"math/rand"
"mime"
"net/url"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -564,3 +565,8 @@ func CalculateAudioDuration(audio types.File) (int64, error) {

return duration, nil
}

// ValidateServiceURI checks if the serviceURI is valid.
func ValidateServiceURI(serviceURI *url.URL) bool {
return !strings.Contains(serviceURI.Host, "0.0.0.0")
}
36 changes: 36 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"math/big"
"net/url"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -483,3 +484,38 @@ func TestParseAccelDevices_CustomSelection(t *testing.T) {
assert.Equal(ids[1], "3")
assert.Equal(ids[2], "1")
}
func TestValidateServiceURI(t *testing.T) {
// Valid service URIs
validURIs := []string{
"https://8.8.8.8:8935",
"https://127.0.0.1:8935",
}

for _, uri := range validURIs {
serviceURI, err := url.Parse(uri)
if err != nil {
t.Errorf("Failed to parse valid service URI: %v", err)
}

if !ValidateServiceURI(serviceURI) {
t.Errorf("Expected service URI to be valid, but got invalid: %v", uri)
}
}

// Invalid service URIs
invalidURIs := []string{
"http://0.0.0.0",
"https://0.0.0.0",
}

for _, uri := range invalidURIs {
serviceURI, err := url.Parse(uri)
if err != nil {
t.Errorf("Failed to parse invalid service URI: %v", err)
}

if ValidateServiceURI(serviceURI) {
t.Errorf("Expected service URI to be invalid, but got valid: %v", uri)
}
}
}
13 changes: 7 additions & 6 deletions core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
var errPipelineNotAvailable = errors.New("pipeline not available")

type AI interface {
TextToImage(context.Context, worker.TextToImageJSONRequestBody) (*worker.ImageResponse, error)
ImageToImage(context.Context, worker.ImageToImageMultipartRequestBody) (*worker.ImageResponse, error)
ImageToVideo(context.Context, worker.ImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
Upscale(context.Context, worker.UpscaleMultipartRequestBody) (*worker.ImageResponse, error)
FrameInterpolation(context.Context, worker.FrameInterpolationMultipartRequestBody) (*worker.VideoResponse, error)
AudioToText(context.Context, worker.AudioToTextMultipartRequestBody) (*worker.TextResponse, error)
TextToImage(context.Context, worker.GenTextToImageJSONRequestBody) (*worker.ImageResponse, error)
ImageToImage(context.Context, worker.GenImageToImageMultipartRequestBody) (*worker.ImageResponse, error)
ImageToVideo(context.Context, worker.GenImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
Upscale(context.Context, worker.GenUpscaleMultipartRequestBody) (*worker.ImageResponse, error)
AudioToText(context.Context, worker.GenAudioToTextMultipartRequestBody) (*worker.TextResponse, error)
SegmentAnything2(context.Context, worker.GenSegmentAnything2MultipartRequestBody) (*worker.MasksResponse, error)
FrameInterpolation(context.Context, worker.FrameInterpolationMultipartRequestBody) (*worker.VideoResponse, error)
Warm(context.Context, string, string, worker.RunnerEndpoint, worker.OptimizationFlags) error
Stop(context.Context) error
HasCapacity(pipeline, modelID string) bool
Expand Down
15 changes: 13 additions & 2 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ const (
Capability_ImageToVideo
Capability_Upscale
Capability_AudioToText
Capability_FrameInterpolation
Capability_SegmentAnything2
Capability_FrameInterpolation
)

var CapabilityNameLookup = map[Capability]string{
Expand Down Expand Up @@ -115,7 +116,8 @@ var CapabilityNameLookup = map[Capability]string{
Capability_ImageToVideo: "Image to video",
Capability_Upscale: "Upscale",
Capability_AudioToText: "Audio to text",
Capability_FrameInterpolation: "Frame Interpolation",
Capability_SegmentAnything2: "Segment anything 2",
Capability_FrameInterpolation: "Frame Interpolation",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
Expand Down Expand Up @@ -207,6 +209,7 @@ func OptionalCapabilities() []Capability {
Capability_FrameInterpolation,
Capability_Upscale,
Capability_AudioToText,
Capability_SegmentAnything2,
}
}

Expand Down Expand Up @@ -623,6 +626,14 @@ func CapabilityToName(capability Capability) (string, error) {
return capName, nil
}

func (c Capability) String() string {
name, err := CapabilityToName(c)
if err != nil {
return fmt.Sprintf("%d", int(c))
}
return name
}

func HasCapability(caps []Capability, capability Capability) bool {
for _, c := range caps {
if capability == c {
Expand Down
25 changes: 25 additions & 0 deletions core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,28 @@ func TestLiveeerVersionCompatibleWith(t *testing.T) {
})
}
}

func TestCapability_String(t *testing.T) {
var unknownCap Capability = -100
tests := []struct {
name string
c Capability
want string
}{
{
name: "Capability_TextToImage",
c: Capability_TextToImage,
want: "Text to image",
},
{
name: "Unknown",
c: unknownCap,
want: "-100",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, tt.c.String())
})
}
}
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type LivepeerNode struct {
// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Recipient pm.Recipient
RecipientAddr string
SelectionAlgorithm common.SelectionAlgorithm
OrchestratorPool common.OrchestratorPool
OrchPerfScore *common.PerfScore
Expand Down
Loading

0 comments on commit 699dfb5

Please sign in to comment.