Skip to content

Commit

Permalink
proxy: set to be insensitive to failures
Browse files Browse the repository at this point in the history
  • Loading branch information
spinlock committed Dec 8, 2016
1 parent f7eecb2 commit 44e6848
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
4 changes: 4 additions & 0 deletions pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ session_max_pipeline = 512
# Set session tcp keepalive period. (0 to disable)
session_keepalive_period = "75s"
# Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client.
session_break_on_failure = false
# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down Expand Up @@ -140,6 +143,7 @@ type Config struct {
SessionSendTimeout timesize.Duration `toml:"session_send_timeout" json:"session_send_timeout"`
SessionMaxPipeline int `toml:"session_max_pipeline" json:"session_max_pipeline"`
SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"`
SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
Expand Down
40 changes: 23 additions & 17 deletions pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ func NewSession(sock net.Conn, config *Config) *Session {
return s
}

func (s *Session) CloseReader() error {
func (s *Session) CloseReaderWithError(err error) error {
s.exit.Do(func() {
log.Infof("session [%p] closed: %s, quit", s, s)
if err != nil {
log.Infof("session [%p] closed: %s, error: %s", s, s, err)
} else {
log.Infof("session [%p] closed: %s, quit", s, s)
}
})
return s.Conn.CloseReader()
}
Expand Down Expand Up @@ -132,20 +136,18 @@ func (s *Session) Start(d *Router) {

go func() {
s.loopReader(tasks, d)
close(tasks)
}()
})
}

func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
defer func() {
if err != nil {
s.CloseWithError(err)
} else {
s.CloseReader()
}
close(tasks)
s.CloseReaderWithError(err)
}()

var sensitive = s.config.SessionBreakOnFailure

for !s.quit {
multi, err := s.Conn.DecodeMultiBulk()
if err != nil {
Expand All @@ -164,7 +166,9 @@ func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
if err := s.handleRequest(r, d); err != nil {
r.Resp = redis.NewErrorf("ERR handle request, %s", err)
tasks <- r
return s.incrOpFails(err)
if sensitive {
return err
}
} else {
tasks <- r
}
Expand All @@ -181,6 +185,8 @@ func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
s.flushOpStats(true)
}()

var sensitive = s.config.SessionBreakOnFailure

p := s.Conn.FlushEncoder()
p.MaxInterval = time.Millisecond
p.MaxBuffered = 256
Expand All @@ -189,14 +195,18 @@ func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
resp, err := s.handleResponse(r)
if err != nil {
resp = redis.NewErrorf("ERR handle response, %s", err)
s.Conn.Encode(resp, true)
return s.incrOpFails(err)
if sensitive {
s.Conn.Encode(resp, true)
return s.incrOpFails(err)
}
}
if err := p.Encode(resp); err != nil {
return s.incrOpFails(err)
}
if err := p.Flush(len(tasks) == 0); err != nil {
return s.incrOpFails(err)
} else {
s.incrOpStats(r)
}
if len(tasks) == 0 {
s.flushOpStats(false)
Expand All @@ -214,14 +224,10 @@ func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
}
if err := r.Err; err != nil {
return nil, err
}
switch resp := r.Resp; {
case resp == nil:
} else if r.Resp == nil {
return nil, ErrRespIsRequired
default:
s.incrOpStats(r)
return resp, nil
}
return r.Resp, nil
}

func (s *Session) handleRequest(r *Request, d *Router) error {
Expand Down

0 comments on commit 44e6848

Please sign in to comment.