Skip to content

Commit

Permalink
Added rudimentary precaching support
Browse files Browse the repository at this point in the history
  • Loading branch information
onitake committed Oct 12, 2018
1 parent 8d7506e commit ea8a6a8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
3 changes: 3 additions & 0 deletions configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type Configuration struct {
Resources []Resource `json:"resources"`
// Notifications defines event callbacks.
Notifications []Notification `json:"notifications"`
// CacheSize is the number of packets to precache to boost client stream
// startup time.
CacheSize uint `json:"cachesize"`
}

// DefaultConfiguration creates and returns a configuration object
Expand Down
2 changes: 1 addition & 1 deletion restreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func main() {

auth := protocol.NewAuthenticator(streamdef.Authentication, config.UserList)

streamer := streaming.NewStreamer(config.OutputBuffer, controller, auth)
streamer := streaming.NewStreamer(config.OutputBuffer, config.CacheSize, controller, auth)
streamer.SetCollector(reg)
streamer.SetNotifier(queue)

Expand Down
14 changes: 13 additions & 1 deletion streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type Streamer struct {
events event.Notifiable
// auth is an authentication verifier for client requests
auth protocol.Authenticator
// cacheSize is the number of bytes to keep in precache
cacheSize int
}

// ConnectionBroker represents a policy handler for new connections.
Expand All @@ -119,16 +121,18 @@ type ConnectionBroker interface {
// NewStreamer creates a new packet streamer.
// queue is an input packet queue.
// qsize is the length of each connection's queue (in packets).
// cachesize is the size of the precache buffer, in number of packets
// broker handles policy enforcement
// stats is a statistics collector object.
func NewStreamer(qsize uint, broker ConnectionBroker, auth protocol.Authenticator) *Streamer {
func NewStreamer(qsize uint, cachesize uint, broker ConnectionBroker, auth protocol.Authenticator) *Streamer {
streamer := &Streamer{
broker: broker,
queueSize: int(qsize),
running: util.AtomicFalse,
stats: &api.DummyCollector{},
request: make(chan *ConnectionRequest),
auth: auth,
cacheSize: int(cachesize) * mpegts.PacketSize,
}
// start the command eater
go streamer.eatCommands()
Expand Down Expand Up @@ -211,6 +215,9 @@ func (streamer *Streamer) Stream(queue <-chan mpegts.Packet) error {
Command: streamerCommandStart,
}

// prepare the precache buffer
precache := util.CreateSlidingWindow(streamer.cacheSize)

logger.Logkv(
"event", eventStreamerStart,
"message", "Starting streaming",
Expand All @@ -226,6 +233,8 @@ func (streamer *Streamer) Stream(queue <-chan mpegts.Packet) error {
//log.Printf("Got packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
//log.Printf("Got packet (length %d)\n", len(packet))

precache.Put(packet)

for conn, _ := range pool {
select {
case conn.Queue <- packet:
Expand Down Expand Up @@ -269,6 +278,9 @@ func (streamer *Streamer) Stream(queue <-chan mpegts.Packet) error {
)
pool[request.Connection] = true
request.Ok = true
// write precached data
// TODO maybe don't write this directly, use the queue?
request.Connection.writer.Write(precache.Get())
} else {
logger.Logkv(
"event", eventStreamerError,
Expand Down

0 comments on commit ea8a6a8

Please sign in to comment.