Skip to content

Commit

Permalink
Merge pull request #11 from cnosdb/zipper
Browse files Browse the repository at this point in the history
Fix backoff pressure triggers for influxdb-1.8 and cnosdb-2.3
  • Loading branch information
zipper-meng authored Jun 14, 2024
2 parents 5892d1e + 51c88d9 commit 034c763
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 51 deletions.
21 changes: 4 additions & 17 deletions cmd/load_cnosdb/http_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ const (
)

var (
errBackoff = fmt.Errorf("backpressure is needed")
backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded")
backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty")
backoffMagicWords2a = []byte("write failed: read message type: read tcp")
backoffMagicWords2b = []byte("i/o timeout")
backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded")
backoffMagicWords4 = []byte("timeout")
backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500")
errBackoff = fmt.Errorf("backpressure is needed")
backoffMagicWords0 = []byte("Memory Exhausted Retry Later")
backoffMagicWords4 = []byte("timeout")
)

// HTTPWriterConfig is the configuration used to create an HTTPWriter.
Expand Down Expand Up @@ -89,7 +84,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response)
lat := time.Since(start).Nanoseconds()
if err == nil {
sc := resp.StatusCode()
if sc == 500 && backpressurePred(resp.Body()) {
if sc == 422 && backpressurePred(resp.Body()) {
err = errBackoff
} else if sc != fasthttp.StatusOK {
err = fmt.Errorf("[DebugInfo: %s] Invalid write response (status %d): %s", w.c.DebugInfo, sc, resp.Body())
Expand All @@ -115,16 +110,8 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error)
func backpressurePred(body []byte) bool {
if bytes.Contains(body, backoffMagicWords0) {
return true
} else if bytes.Contains(body, backoffMagicWords1) {
return true
} else if bytes.Contains(body, backoffMagicWords2a) && bytes.Contains(body, backoffMagicWords2b) {
return true
} else if bytes.Contains(body, backoffMagicWords3) {
return true
} else if bytes.Contains(body, backoffMagicWords4) {
return true
} else if bytes.Contains(body, backoffMagicWords5) {
return true
} else {
return false
}
Expand Down
26 changes: 3 additions & 23 deletions cmd/load_cnosdb/http_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func runHTTPServer(c chan struct{}) {
coinflip := atomic.AddInt64(&i, 1)
if coinflip%2 == 1 {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, string(backoffMagicWords1))
fmt.Fprintf(w, string(backoffMagicWords0))
} else {
w.WriteHeader(http.StatusNoContent)
fmt.Fprintf(w, "")
Expand Down Expand Up @@ -198,33 +198,13 @@ func TestBackpressurePred(t *testing.T) {
want: true,
},
{
body: "yadda" + string(backoffMagicWords1),
want: true,
},
{
body: string(backoffMagicWords2a),
want: false, // need both magic strings or it fails
},
{
body: string(backoffMagicWords2a) + " AND " + string(backoffMagicWords2b),
want: true,
},
{
body: string(backoffMagicWords3) + " yadda",
want: true,
body: string(backoffMagicWords0[2:]),
want: false,
},
{
body: "yadda " + string(backoffMagicWords4) + " yadda",
want: true,
},
{
body: "foo " + string(backoffMagicWords5) + " yadda",
want: true,
},
{
body: string(backoffMagicWords0[2:]),
want: false,
},
}

for _, c := range cases {
Expand Down
22 changes: 11 additions & 11 deletions cmd/load_influx/http_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"fmt"
"net/url"
"time"

"github.com/valyala/fasthttp"
)

Expand All @@ -20,30 +20,30 @@ const (
var (
errBackoff = fmt.Errorf("backpressure is needed")
backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded")
backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty")
backoffMagicWords2a = []byte("write failed: read message type: read tcp")
backoffMagicWords1 = []byte("hinted handoff queue not empty")
backoffMagicWords2a = []byte("read message type: read tcp")
backoffMagicWords2b = []byte("i/o timeout")
backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded")
backoffMagicWords3 = []byte("engine: cache-max-memory-size exceeded")
backoffMagicWords4 = []byte("timeout")
backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500")
backoffMagicWords5 = []byte("can not exceed max connections of 500")
)

// HTTPWriterConfig is the configuration used to create an HTTPWriter.
type HTTPWriterConfig struct {
// URL of the host, in form "http://example.com:8086"
Host string

// Name of the target database into which points will be written.
Database string

// Debug label for more informative errors.
DebugInfo string
}

// HTTPWriter is a Writer that writes to an InfluxDB HTTP server.
type HTTPWriter struct {
client fasthttp.Client

c HTTPWriterConfig
url []byte
}
Expand All @@ -54,7 +54,7 @@ func NewHTTPWriter(c HTTPWriterConfig, consistency string) *HTTPWriter {
client: fasthttp.Client{
Name: httpClientName,
},

c: c,
url: []byte(c.Host + "/write?consistency=" + consistency + "&db=" + url.QueryEscape(c.Database)),
}
Expand Down Expand Up @@ -97,10 +97,10 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error)
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, body, isGzip)

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)

return w.executeReq(req, resp)
}

Expand Down

0 comments on commit 034c763

Please sign in to comment.