diff --git a/cmd/restreamer/restreamer.go b/cmd/restreamer/restreamer.go index d3ec00b..226f2e3 100644 --- a/cmd/restreamer/restreamer.go +++ b/cmd/restreamer/restreamer.go @@ -159,7 +159,7 @@ func main() { auth := auth.NewAuthenticator(streamdef.Authentication, config.UserList) - streamer := streaming.NewStreamer(streamdef.Serve, config.OutputBuffer, controller, auth) + streamer := streaming.NewStreamer(streamdef.Serve, config.OutputBuffer, config.CacheSize, controller, auth) streamer.SetCollector(reg) streamer.SetNotifier(queue) diff --git a/configuration/config.go b/configuration/config.go index 6c62fdf..60c5d5e 100644 --- a/configuration/config.go +++ b/configuration/config.go @@ -137,6 +137,9 @@ type Configuration struct { Resources []Resource `json:"resources"` // Notifications defines event callbacks. Notifications []Notification `json:"notifications"` + // CacheSize is the number of packets to precache to boost client stream + // startup time. + CacheSize uint `json:"cachesize"` } // DefaultConfiguration creates and returns a configuration object diff --git a/streaming/streamer.go b/streaming/streamer.go index db8ec5c..d6406b6 100644 --- a/streaming/streamer.go +++ b/streaming/streamer.go @@ -161,6 +161,8 @@ type Streamer struct { auth auth.Authenticator // promCounter allows enabling/disabling Prometheus packet metrics. promCounter bool + // cacheSize is the number of bytes to keep in precache + cacheSize int } // ConnectionBroker represents a policy handler for new connections. @@ -178,9 +180,10 @@ type ConnectionBroker interface { // NewStreamer creates a new packet streamer. // queue is an input packet queue. // qsize is the length of each connection's queue (in packets). +// cachesize is the size of the precache buffer, in number of packets // broker handles policy enforcement // stats is a statistics collector object. -func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer { +func NewStreamer(name string, cachesize uint, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer { streamer := &Streamer{ name: name, broker: broker, @@ -189,6 +192,7 @@ func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Aut stats: &metrics.DummyCollector{}, request: make(chan *ConnectionRequest), auth: auth, + cacheSize: int(cachesize) * protocol.MpegTsPacketSize, } // start the command eater go streamer.eatCommands() @@ -248,9 +252,10 @@ func (streamer *Streamer) eatCommands() { // This routine will block; you should run it asynchronously like this: // // queue := make(chan protocol.MpegTsPacket, inputQueueSize) -// go func() { -// log.Fatal(streamer.Stream(queue)) -// } +// +// go func() { +// log.Fatal(streamer.Stream(queue)) +// } // // or simply: // @@ -271,6 +276,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error { Command: streamerCommandStart, } + // prepare the precache buffer + precache := util.CreateSlidingWindow(streamer.cacheSize) + logger.Logkv( "event", eventStreamerStart, "message", "Starting streaming", @@ -286,7 +294,10 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error { //log.Printf("Got packet (length %d):\n%s\n", len(packet), hex.Dump(packet)) //log.Printf("Got packet (length %d)\n", len(packet)) + precache.Put(packet) + for conn := range pool { + select { case conn.Queue <- packet: // packet distributed, done @@ -338,6 +349,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error { ) pool[request.Connection] = true request.Ok = true + // write precached data + // TODO maybe don't write this directly, use the queue? + request.Connection.writer.Write(precache.Get()) } else { logger.Logkv( "event", eventStreamerError, diff --git a/util/window.go b/util/window.go new file mode 100644 index 0000000..1c06af2 --- /dev/null +++ b/util/window.go @@ -0,0 +1,58 @@ +/* Copyright (c) 2018 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "sync" +) + +// SlidingWindow implements a buffer that continuously overwrites old data. +// A single fetch function that copies the whole window is provided. +// +// Note that the implementation uses a variable buffer and may not execute +// in deterministic time. +// +// All operations are thread-safe. +type SlidingWindow struct { + window []byte + lock sync.RWMutex +} + +// SlidingWindow creates a sliding window buffer with a fixed size. +// Note that the buffer is pre-filled with 0s. +func CreateSlidingWindow(size int) *SlidingWindow { + return &SlidingWindow{ + window: make([]byte, size), + } +} + +// Put copies the contents of data into the sliding window buffer. +// If data is longer than the buffer, the head will be cut off until it fits. +func (w *SlidingWindow) Put(data []byte) { + w.lock.Lock() + defer w.lock.Unlock() + w.window = append(w.window, data...)[len(data):] +} + +// Get returns the contents of the sliding window buffer. +// No copying is performed, the return value is simply a slice of th e buffer. +// Take care not to modify the contents. +func (w *SlidingWindow) Get() []byte { + w.lock.RLock() + defer w.lock.RUnlock() + return w.window +} diff --git a/util/window_test.go b/util/window_test.go new file mode 100644 index 0000000..6855934 --- /dev/null +++ b/util/window_test.go @@ -0,0 +1,79 @@ +/* Copyright (c) 2018 Gregor Riepl + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package util + +import ( + "testing" + "bytes" + "encoding/hex" +) + +func TestSlidingWindow01(t *testing.T) { + w := CreateSlidingWindow(10) + c := []byte{0,1,2,3} + w.Put(c) + r := w.Get() + x := []byte{0,0,0,0,0,0,0,1,2,3} + if bytes.Compare(r, x) != 0 { + t.Errorf("t01: smaller-than-capacity buffer did not compare to padded value:\n%s", hex.Dump(r)) + } +} + +func TestSlidingWindow02(t *testing.T) { + w := CreateSlidingWindow(4) + c := []byte{0,1,2,3} + w.Put(c) + r := w.Get() + x := []byte{0,1,2,3} + if bytes.Compare(r, x) != 0 { + t.Errorf("t01: at-capacity buffer did not compare to the same value:\n%s", hex.Dump(r)) + } +} + +func TestSlidingWindow03(t *testing.T) { + w := CreateSlidingWindow(4) + c := []byte{0,1,2,3,4,5} + w.Put(c) + r := w.Get() + x := []byte{2,3,4,5} + if bytes.Compare(r, x) != 0 { + t.Errorf("t01: over-capacity buffer did not compare to tail:\n%s", hex.Dump(r)) + } +} + +func BenchmarkSlidingWindowPutSingle100(b *testing.B) { + w := CreateSlidingWindow(100) + for n := 0; n < b.N; n++ { + w.Put([]byte{0xaa}) + } +} + +func BenchmarkSlidingWindowPutMany100(b *testing.B) { + w := CreateSlidingWindow(100) + buf := bytes.Repeat([]byte{0xaa}, 50) + for n := 0; n < b.N; n++ { + w.Put(buf) + } +} + +func BenchmarkSlidingWindowPutMany100000(b *testing.B) { + w := CreateSlidingWindow(1000000) + buf := bytes.Repeat([]byte{0xaa}, 5000) + for n := 0; n < b.N; n++ { + w.Put(buf) + } +}