diff --git a/cmd/load_cnosdb/http_writer.go b/cmd/load_cnosdb/http_writer.go index 6fb7c0d..a1a7699 100644 --- a/cmd/load_cnosdb/http_writer.go +++ b/cmd/load_cnosdb/http_writer.go @@ -5,7 +5,6 @@ package main import ( "bytes" "fmt" - "net/url" "time" "github.com/valyala/fasthttp" @@ -36,6 +35,8 @@ type HTTPWriterConfig struct { // Name of the target database into which points will be written. Database string + Auth string + // Debug label for more informative errors. DebugInfo string } @@ -56,7 +57,7 @@ func NewHTTPWriter(c HTTPWriterConfig, consistency string) *HTTPWriter { }, c: c, - url: []byte(c.Host + "/write/line_protocol?consistency=" + consistency + "&db=" + url.QueryEscape(c.Database)), + url: []byte(c.Host + "/write"), } } @@ -69,6 +70,8 @@ func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bo req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) + req.Header.Add("database", w.c.Database) + req.Header.Add("user_id", w.c.Auth) if isGzip { req.Header.Add(headerContentEncoding, headerGzip) } @@ -83,7 +86,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) sc := resp.StatusCode() if sc == 500 && backpressurePred(resp.Body()) { err = errBackoff - } else if sc != fasthttp.StatusNoContent { + } else if sc != fasthttp.StatusOK { err = fmt.Errorf("[DebugInfo: %s] Invalid write response (status %d): %s", w.c.DebugInfo, sc, resp.Body()) } } diff --git a/cmd/load_cnosdb/process.go b/cmd/load_cnosdb/process.go index 0d36830..f6fcc79 100644 --- a/cmd/load_cnosdb/process.go +++ b/cmd/load_cnosdb/process.go @@ -27,6 +27,7 @@ func (p *processor) Init(numWorker int, _, _ bool) { DebugInfo: fmt.Sprintf("worker #%d, dest url: %s", numWorker, daemonURL), Host: daemonURL, Database: loader.DatabaseName(), + Auth: config.Auth, } w := NewHTTPWriter(cfg, consistency) p.initWithHTTPWriter(numWorker, w) diff --git a/go.sum b/go.sum index 495f956..5153892 100644 --- a/go.sum +++ b/go.sum @@ -404,7 +404,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/load/loader.go b/load/loader.go index c468a08..5c7d06e 100644 --- a/load/loader.go +++ b/load/loader.go @@ -33,6 +33,7 @@ var ( // BenchmarkRunnerConfig contains all the configuration information required for running BenchmarkRunner. type BenchmarkRunnerConfig struct { DBName string `yaml:"db-name" mapstructure:"db-name" json:"db-name"` + Auth string `yaml:"user_id" mapstructure:"user_id" json:"user_id"` BatchSize uint `yaml:"batch-size" mapstructure:"batch-size" json:"batch-size"` Workers uint `yaml:"workers" mapstructure:"workers" json:"workers"` Limit uint64 `yaml:"limit" mapstructure:"limit" json:"limit"` @@ -53,6 +54,7 @@ type BenchmarkRunnerConfig struct { // AddToFlagSet adds command line flags needed by the BenchmarkRunnerConfig to the flag set. func (c BenchmarkRunnerConfig) AddToFlagSet(fs *pflag.FlagSet) { fs.String("db-name", "benchmark", "Name of database") + fs.String("user_id", "123", "auth of database") fs.Uint("batch-size", defaultBatchSize, "Number of items to batch together in a single insert") fs.Uint("workers", 1, "Number of parallel clients inserting") fs.Uint64("limit", 0, "Number of items to insert (0 = all of them).")