Skip to content

Commit

Permalink
Fixed internal connection state logic and added more log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
srgoni committed Apr 24, 2017
1 parent f558081 commit 2a5d349
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 41 deletions.
69 changes: 36 additions & 33 deletions src/restreamer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ const (
type Connection struct {
// Queue is the per-connection packet queue
Queue chan Packet
// Shutdown notification channel.
// Send true to close the connection and stop the handler.
shutdown chan bool
// internal flag
// true while the connection is up
running bool
// the destination socket
writer http.ResponseWriter
// needed for flushing
Expand Down Expand Up @@ -77,21 +71,16 @@ func NewConnection(destination http.ResponseWriter, qsize int) (*Connection) {
}
conn := &Connection{
Queue: make(chan Packet, qsize),
shutdown: make(chan bool),
running: true,
writer: destination,
flusher: flusher,
logger: logger,
}
return conn
}

// Close shuts down the streamer and all incoming connections.
// This action is asynchronous.
func (conn *Connection) Close() error {
// signal shutdown
conn.shutdown<- true
return nil
// SetLogger assigns a logger
func (conn *Connection) SetLogger(logger JsonLogger) {
conn.logger.Logger = logger
}

// Serve starts serving data to a client, continuously feeding packets from the queue.
Expand Down Expand Up @@ -124,34 +113,48 @@ func (conn *Connection) Serve() {
}

// start reading packets
for conn.running {
running := true
for running {
select {
case packet := <-conn.Queue:
// packet received, log
//log.Printf("Sending packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
// send the packet out
_, err := conn.writer.Write(packet)
if err == nil {
if conn.flusher != nil {
conn.flusher.Flush()
case packet, ok := <-conn.Queue:
if ok {
// packet received, log
//log.Printf("Sending packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
// send the packet out
_, err := conn.writer.Write(packet)
if err == nil {
if conn.flusher != nil {
conn.flusher.Flush()
}
} else {
conn.logger.Log(Dict{
"event": eventConnectionClosed,
"message": "Downstream connection closed",
})
running = false
}
//log.Printf("Wrote packet of %d bytes\n", bytes)
} else {
conn.running = false
// channel closed, exit
conn.logger.Log(Dict{
"event": eventConnectionShutdown,
"message": "Shutting down client connection",
})
running = false
}
case <-notifier.CloseNotify():
// connection closed while we were waiting for more data
conn.running = false
case <-conn.shutdown:
// and shut down
conn.running = false
conn.logger.Log(Dict{
"event": eventConnectionClosedWait,
"message": "Downstream connection closed (while waiting)",
})
running = false
}
}

// drain the shutdown channel
select {
case <-conn.shutdown:
default:
}
// we cannot drain the channel here, as it might not be closed yet.
// better let the our caller handle closure and draining.

conn.logger.Log(Dict{
"event": eventConnectionDone,
"message": "Shutdown complete",
Expand Down
86 changes: 78 additions & 8 deletions src/restreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,28 @@ var (
ErrPoolFull = errors.New("restreamer: maximum number of active connections exceeded")
)

// Command is one of several possible constants.
// See StreamerCommandAdd for more information.
type Command int

const (
// streamerCommandIgnore is a default dummy command
streamerCommandIgnore Command = iota
// streamerCommandStart is an internal start command, used to signal request
// processing to commence.
streamerCommandStart
// StreamerCommandAdd signals a stream to add a connection.
StreamerCommandAdd
// StreamerCommandRemove signals a stream to remove a connection.
StreamerCommandRemove
)

// ConnectionRequest encapsulates a request that new connection be added or removed.
type ConnectionRequest struct {
// Command is the command to execute
Command Command
// Address is the remote client address
Address string
// Remove is true if the connection should be added, false if it should be removed
Remove bool
// Connection is the connection object
Connection *Connection
}
Expand Down Expand Up @@ -121,6 +137,8 @@ func NewStreamer(qsize uint, broker ConnectionBroker) (*Streamer) {
logger: logger,
request: make(chan ConnectionRequest),
}
// start the command eater
go streamer.eatCommands()
return streamer
}

Expand All @@ -134,6 +152,27 @@ func (streamer *Streamer) SetCollector(stats Collector) {
streamer.stats = stats
}

// eatCommands is started in the background to drain the command
// queue and wait for a start command, in which case it will exit.
func (streamer *Streamer) eatCommands() {
running := true
for running {
select {
case request := <-streamer.request:
switch request.Command {
case streamerCommandStart:
streamer.logger.Log(Dict{
"event": eventStreamerQueueStart,
"message": "Stopping eater process and starting real processing",
})
running = false
default:
// Eating all other commands
}
}
}
}

// Stream is the main stream multiplier loop.
// It reads data from the input queue and distributes it to the connections.
//
Expand All @@ -156,6 +195,11 @@ func (streamer *Streamer) Stream(queue <-chan Packet) error {
// create the local outgoing connection pool
pool := make(map[*Connection]bool)

// stop the eater process
streamer.request<- ConnectionRequest{
Command: streamerCommandStart,
}

streamer.logger.Log(Dict{
"event": eventStreamerStart,
"message": "Starting streaming",
Expand Down Expand Up @@ -194,10 +238,26 @@ func (streamer *Streamer) Stream(queue <-chan Packet) error {
StoreBool(&streamer.running, false)
}
case request := <-streamer.request:
if request.Remove {
delete(pool, request.Connection)
} else {
pool[request.Connection] = true
switch request.Command {
case StreamerCommandRemove:
streamer.logger.Log(Dict{
"event": eventStreamerClientRemove,
"message": fmt.Sprintf("Removing client %s from pool", request.Address),
})
close(request.Connection.Queue)
delete(pool, request.Connection)
case StreamerCommandAdd:
streamer.logger.Log(Dict{
"event": eventStreamerClientAdd,
"message": fmt.Sprintf("Adding client %s to pool", request.Address),
})
pool[request.Connection] = true
default:
streamer.logger.Log(Dict{
"event": eventStreamerError,
"error": errorStreamerInvalidCommand,
"message": "Ignoring invalid command in started state",
})
}
}
}
Expand All @@ -207,9 +267,12 @@ func (streamer *Streamer) Stream(queue <-chan Packet) error {
// drain any leftovers
}
for conn, _ := range pool {
conn.Close()
close(conn.Queue)
}

// start the command eater again
go streamer.eatCommands()

streamer.logger.Log(Dict{
"event": eventStreamerStop,
"message": "Ending streaming",
Expand All @@ -227,7 +290,10 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
// check if the connection can be accepted
if streamer.broker.Accept(request.RemoteAddr, streamer) {
conn = NewConnection(writer, streamer.queueSize)
conn.SetLogger(streamer.logger.Logger)

streamer.request<- ConnectionRequest{
Command: StreamerCommandAdd,
Address: request.RemoteAddr,
Connection: conn,
}
Expand Down Expand Up @@ -258,10 +324,14 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re

// done, remove the stale connection
streamer.request<- ConnectionRequest{
Remove: true,
Command: StreamerCommandRemove,
Address: request.RemoteAddr,
Connection: conn,
}
// and drain the queue AFTER we have sent the shutdown signal
for _ = range conn.Queue {
// drain any leftovers
}
streamer.logger.Log(Dict{
"event": eventStreamerClosed,
"message": fmt.Sprintf("Connection from %s closed", request.RemoteAddr),
Expand Down

0 comments on commit 2a5d349

Please sign in to comment.