diff --git a/cmd/load_cnosdb/http_writer.go b/cmd/load_cnosdb/http_writer.go index 58ae942..bea1dca 100644 --- a/cmd/load_cnosdb/http_writer.go +++ b/cmd/load_cnosdb/http_writer.go @@ -19,14 +19,9 @@ const ( ) var ( - errBackoff = fmt.Errorf("backpressure is needed") - backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded") - backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty") - backoffMagicWords2a = []byte("write failed: read message type: read tcp") - backoffMagicWords2b = []byte("i/o timeout") - backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded") - backoffMagicWords4 = []byte("timeout") - backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500") + errBackoff = fmt.Errorf("backpressure is needed") + backoffMagicWords0 = []byte("Memory Exhausted Retry Later") + backoffMagicWords4 = []byte("timeout") ) // HTTPWriterConfig is the configuration used to create an HTTPWriter. @@ -89,7 +84,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) lat := time.Since(start).Nanoseconds() if err == nil { sc := resp.StatusCode() - if sc == 500 && backpressurePred(resp.Body()) { + if sc == 422 && backpressurePred(resp.Body()) { err = errBackoff } else if sc != fasthttp.StatusOK { err = fmt.Errorf("[DebugInfo: %s] Invalid write response (status %d): %s", w.c.DebugInfo, sc, resp.Body()) @@ -115,16 +110,8 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) func backpressurePred(body []byte) bool { if bytes.Contains(body, backoffMagicWords0) { return true - } else if bytes.Contains(body, backoffMagicWords1) { - return true - } else if bytes.Contains(body, backoffMagicWords2a) && bytes.Contains(body, backoffMagicWords2b) { - return true - } else if bytes.Contains(body, backoffMagicWords3) { - return true } else if bytes.Contains(body, backoffMagicWords4) { return true - } else if bytes.Contains(body, backoffMagicWords5) { - return true } else { return false } diff --git a/cmd/load_cnosdb/http_writer_test.go b/cmd/load_cnosdb/http_writer_test.go index 170ae4e..0bead1a 100644 --- a/cmd/load_cnosdb/http_writer_test.go +++ b/cmd/load_cnosdb/http_writer_test.go @@ -37,7 +37,7 @@ func runHTTPServer(c chan struct{}) { coinflip := atomic.AddInt64(&i, 1) if coinflip%2 == 1 { w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, string(backoffMagicWords1)) + fmt.Fprintf(w, string(backoffMagicWords0)) } else { w.WriteHeader(http.StatusNoContent) fmt.Fprintf(w, "") @@ -198,33 +198,13 @@ func TestBackpressurePred(t *testing.T) { want: true, }, { - body: "yadda" + string(backoffMagicWords1), - want: true, - }, - { - body: string(backoffMagicWords2a), - want: false, // need both magic strings or it fails - }, - { - body: string(backoffMagicWords2a) + " AND " + string(backoffMagicWords2b), - want: true, - }, - { - body: string(backoffMagicWords3) + " yadda", - want: true, + body: string(backoffMagicWords0[2:]), + want: false, }, { body: "yadda " + string(backoffMagicWords4) + " yadda", want: true, }, - { - body: "foo " + string(backoffMagicWords5) + " yadda", - want: true, - }, - { - body: string(backoffMagicWords0[2:]), - want: false, - }, } for _, c := range cases { diff --git a/cmd/load_influx/http_writer.go b/cmd/load_influx/http_writer.go index fa0fbce..6f23434 100644 --- a/cmd/load_influx/http_writer.go +++ b/cmd/load_influx/http_writer.go @@ -7,7 +7,7 @@ import ( "fmt" "net/url" "time" - + "github.com/valyala/fasthttp" ) @@ -20,22 +20,22 @@ const ( var ( errBackoff = fmt.Errorf("backpressure is needed") backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded") - backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty") - backoffMagicWords2a = []byte("write failed: read message type: read tcp") + backoffMagicWords1 = []byte("hinted handoff queue not empty") + backoffMagicWords2a = []byte("read message type: read tcp") backoffMagicWords2b = []byte("i/o timeout") - backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded") + backoffMagicWords3 = []byte("engine: cache-max-memory-size exceeded") backoffMagicWords4 = []byte("timeout") - backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500") + backoffMagicWords5 = []byte("can not exceed max connections of 500") ) // HTTPWriterConfig is the configuration used to create an HTTPWriter. type HTTPWriterConfig struct { // URL of the host, in form "http://example.com:8086" Host string - + // Name of the target database into which points will be written. Database string - + // Debug label for more informative errors. DebugInfo string } @@ -43,7 +43,7 @@ type HTTPWriterConfig struct { // HTTPWriter is a Writer that writes to an InfluxDB HTTP server. type HTTPWriter struct { client fasthttp.Client - + c HTTPWriterConfig url []byte } @@ -54,7 +54,7 @@ func NewHTTPWriter(c HTTPWriterConfig, consistency string) *HTTPWriter { client: fasthttp.Client{ Name: httpClientName, }, - + c: c, url: []byte(c.Host + "/write?consistency=" + consistency + "&db=" + url.QueryEscape(c.Database)), } @@ -97,10 +97,10 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) w.initializeReq(req, body, isGzip) - + resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) - + return w.executeReq(req, resp) }