From 60f9e474065ac7a711e8924c29a29c2df5f8080b Mon Sep 17 00:00:00 2001 From: Yotam loewenbach <48534558+yotamloe@users.noreply.github.com> Date: Thu, 12 May 2022 14:52:57 +0300 Subject: [PATCH] v1.0.5 (#11) * Add info logs + update dep (goleveldb) * Change `token` query parameter to optional for generic use * Add tests for url setting * revert goleveldb version update * update Author git link --- README.md | 25 +++++++++++++++------ logsender_test.go | 55 +++++++++++++++++++++++++++++++---------------- logziosender.go | 48 +++++++++++++++++++++++------------------ 3 files changed, 81 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 7a98153..51e2283 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,8 @@ func main() { ## Data compression All bulks are compressed with gzip by default to disable compressing initialize the client with `SetCompress(false)`: ```go - logzio.New(token, SetCompress(false), + logzio.New(token, + SetCompress(false), ) ``` @@ -132,7 +133,6 @@ All bulks are compressed with gzip by default to disable compressing initialize ```shell $ go test -v - ``` @@ -142,9 +142,12 @@ $ go test -v ## Authors -* **Douglas Chimento** - [dougEfresh][me] +* **Douglas Chimento** - [dougEfresh](https://github.com/dougEfresh) * **Ido Halevi** - [idohalevi](https://github.com/idohalevi) +## Maintainers +* **Yotam Loewenbach** - [yotamloe](https://github.com/yotamloe) + ## License @@ -155,10 +158,18 @@ This project is licensed under the Apache License - see the [LICENSE](LICENSE) f * [logzio-java-sender](https://github.com/logzio/logzio-java-sender) -## Changelog +## Changelog +- v1.0.5 + - Change `token` query parameter to optional for generic use + - Changed logging levels +- v1.0.4 + - Update gopsutil version (v3.21.6 -> v3.22.3) +- v1.0.3 + - Adjust buffer clearance + - Changed logging format +- v1.0.2 + - Update dependencies - v1.0.1 - Add gzip compression - Add option for in Memory queue - -- v1.0.2 - - Update dependencies + diff --git a/logsender_test.go b/logsender_test.go index 2f144d3..c66d16e 100644 --- a/logsender_test.go +++ b/logsender_test.go @@ -28,6 +28,37 @@ const ( defaultQueueSize = 40 * 1024 * 1024 ) +func TestLogzioSender_SetUrl(t *testing.T) { + l, err := New( + "", + SetDebug(os.Stderr), + SetUrl("http://localhost:12345"), + SetInMemoryQueue(true), + SetinMemoryCapacity(500), + SetDrainDuration(time.Minute), + ) + if err != nil { + t.Fatal(err) + } + if l.url != "http://localhost:12345" { + t.Fatalf("url should be http://localhost:12345, actual: %s", l.url) + } + l2, err := New( + "token", + SetDebug(os.Stderr), + SetUrl("http://localhost:12345"), + SetInMemoryQueue(true), + SetinMemoryCapacity(500), + SetDrainDuration(time.Minute), + ) + if err != nil { + t.Fatal(err) + } + if l2.url != "http://localhost:12345/?token=token" { + t.Fatalf("url should be http://localhost:12345/?token=token, actual: %s", l.url) + } +} + // In memory queue tests func TestLogzioSender_inMemoryRetries(t *testing.T) { var sent = make([]byte, 1024) @@ -93,9 +124,7 @@ func TestLogzioSender_InMemoryCapacityLimit(t *testing.T) { func TestLogzioSender_InMemorySend(t *testing.T) { var sent = make([]byte, 1024) - var sentToken string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sentToken = r.URL.Query().Get("token") w.WriteHeader(http.StatusOK) r.Body.Read(sent) })) @@ -117,9 +146,6 @@ func TestLogzioSender_InMemorySend(t *testing.T) { } l.Drain() time.Sleep(200 * time.Millisecond) - if sentToken != "fake-token" { - t.Fatalf("token not sent %s", sentToken) - } item, err := l.queue.Dequeue() if item != nil { t.Fatalf("Unexpect item in the queue - %s", string(item.Value)) @@ -129,9 +155,7 @@ func TestLogzioSender_InMemorySend(t *testing.T) { func TestLogzioSender_InMemoryDrain(t *testing.T) { var sent = make([]byte, 1024) - var sentToken string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sentToken = r.URL.Query().Get("token") w.WriteHeader(http.StatusOK) r.Body.Read(sent) })) @@ -152,9 +176,6 @@ func TestLogzioSender_InMemoryDrain(t *testing.T) { } l.Drain() time.Sleep(time.Second * 10) - if sentToken != "fake-token" { - t.Fatalf("token not sent %s", sentToken) - } item, err := l.queue.Dequeue() if item != nil { t.Fatalf("Unexpect item in the queue - %s", string(item.Value)) @@ -378,9 +399,7 @@ func TestLogzioSender_Retries(t *testing.T) { func TestLogzioSender_Send(t *testing.T) { var sent = make([]byte, 1024) - var sentToken string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sentToken = r.URL.Query().Get("token") w.WriteHeader(http.StatusOK) r.Body.Read(sent) })) @@ -402,9 +421,7 @@ func TestLogzioSender_Send(t *testing.T) { if sentMsg != "blah\n" { t.Fatalf("%s != %s ", sent, sentMsg) } - if sentToken != "fake-token" { - t.Fatalf("token not sent %s", sentToken) - } + } func TestLogzioSender_DelayStart(t *testing.T) { @@ -614,7 +631,7 @@ func TestLogzioSender_CountDropped(t *testing.T) { if l.droppedLogs != 3 { t.Fatalf("items should have been dropped") } - l.diskThreshold = 95 + l.diskThreshold = 98 l.Send([]byte("blah")) l.Send([]byte("blah")) l.Drain() @@ -718,13 +735,13 @@ func TestLogzioSender_E2E(t *testing.T) { SetDebug(os.Stderr), ) if err != nil { - panic(err) + t.Fatal(err) } msg := `{"traceID":"0000000000000001","operationName":"o3","spanID":"2a3ad4a54c048830","references":[],"startTime":1632401226891238,"startTimeMillis":1632401226891,"duration":0,"logs":[],"process":{"serviceName":"testService","tags":[]},"type":"jaegerSpan"}` for i := 0; i < 10000; i++ { - err := l.Send([]byte(msg)) + err = l.Send([]byte(msg)) if err != nil { - panic(err) + t.Fatal(err) } } time.Sleep(time.Second * 40) diff --git a/logziosender.go b/logziosender.go index d1c28ec..d72260c 100644 --- a/logziosender.go +++ b/logziosender.go @@ -172,8 +172,11 @@ func SetTempDirectory(dir string) SenderOptionFunc { // SetUrl set the url which maybe different from the defaultUrl func SetUrl(url string) SenderOptionFunc { return func(l *LogzioSender) error { - l.url = fmt.Sprintf("%s/?token=%s", url, l.token) - l.debugLog("logziosender.go: Setting url to %s\n", l.url) + l.url = url + if l.token != "" { + l.url = fmt.Sprintf("%s/?token=%s", url, l.token) + } + l.debugLog("sender: Setting url to %s\n", l.url) return nil } } @@ -219,14 +222,14 @@ func (l *LogzioSender) isEnoughDiskSpace() bool { if l.checkDiskSpace { diskStat, err := disk.Usage(l.dir) if err != nil { - l.debugLog("logziosender.go: failed to get disk usage: %v\n", err) + l.debugLog("sender: failed to get disk usage: %v\n", err) l.checkDiskSpace = false return false } usage := float32(diskStat.UsedPercent) if usage > l.diskThreshold { - l.debugLog("Logz.io: Dropping logs, as FS used space on %s is %g percent,"+ + l.debugLog("sender: Dropping logs, as FS used space on %s is %g percent,"+ " and the drop threshold is %g percent\n", l.dir, usage, l.diskThreshold) l.droppedLogs++ @@ -242,7 +245,7 @@ func (l *LogzioSender) isEnoughDiskSpace() bool { func (l *LogzioSender) isEnoughMemory(dataSize uint64) bool { usage := l.queue.Length() if usage+dataSize >= l.inMemoryCapacity { - l.debugLog("Logz.io: Dropping logs, the max capacity is %d and %d is requested, Request size: %d\n", l.inMemoryCapacity, usage+dataSize, dataSize) + l.infoLog("sender: Dropping logs, the max capacity is %d and %d is requested, Request size: %d\n", l.inMemoryCapacity, usage+dataSize, dataSize) l.droppedLogs++ return false } else { @@ -291,10 +294,10 @@ func (l *LogzioSender) makeHttpRequest(data bytes.Buffer, attempt int, c bool) i if c { req.Header.Add("Content-Encoding", "gzip") } - l.debugLog("logziosender.go: Sending bulk of %v bytes\n", l.buf.Len()) + l.infoLog("sender: Sending bulk of %v bytes\n", l.buf.Len()) resp, err := l.httpClient.Do(req) if err != nil { - //l.debugLog("logziosender.go: Error sending logs to %s %s\n", l.url, err) + l.infoLog("sender: Error sending logs to %s %s\n", l.url, err) return httpError } @@ -302,9 +305,9 @@ func (l *LogzioSender) makeHttpRequest(data bytes.Buffer, attempt int, c bool) i statusCode := resp.StatusCode _, err = ioutil.ReadAll(resp.Body) if err != nil { - l.debugLog("Error reading response body: %v", err) + l.infoLog("sender: Error reading response body: %v", err) } - l.debugLog("logziosender.go: Response status code: %v \n", statusCode) + l.infoLog("sender: Response status code: %v \n", statusCode) if statusCode == 200 { l.droppedLogs = 0 } @@ -335,16 +338,16 @@ func (l *LogzioSender) shouldRetry(statusCode int) bool { retry := true switch statusCode { case http.StatusBadRequest: - l.debugLog("Got HTTP %d bad request, skip retry\n", statusCode) + l.infoLog("sender: Got HTTP %d bad request, skip retry\n", statusCode) retry = false case http.StatusNotFound: - l.debugLog("Got HTTP %d not found, skip retry\n", statusCode) + l.infoLog("sender: Got HTTP %d not found, skip retry\n", statusCode) retry = false case http.StatusUnauthorized: - l.debugLog("Got HTTP %d unauthorized, skip retry\n", statusCode) + l.infoLog("sender: Got HTTP %d unauthorized, skip retry\n", statusCode) retry = false case http.StatusForbidden: - l.debugLog("Got HTTP %d forbidden, skip retry\n", statusCode) + l.infoLog("sender: Got HTTP %d forbidden, skip retry\n", statusCode) retry = false case http.StatusOK: retry = false @@ -355,11 +358,11 @@ func (l *LogzioSender) shouldRetry(statusCode int) bool { // Drain - Send remaining logs func (l *LogzioSender) Drain() { if l.draining.Load() { - l.debugLog("logziosender.go: Already draining\n") + l.debugLog("sender: Already draining\n") return } l.mux.Lock() - l.debugLog("logziosender.go: draining queue\n") + l.debugLog("sender: draining queue\n") defer l.mux.Unlock() l.draining.Toggle() defer l.draining.Toggle() @@ -372,7 +375,7 @@ func (l *LogzioSender) Drain() { toBackOff := false for attempt := 0; attempt < sendRetries; attempt++ { if toBackOff { - l.debugLog("logziosender.go: failed to send logs, trying again in %v\n", backOff) + l.debugLog("sender: failed to send logs, trying again in %v\n", backOff) time.Sleep(backOff) backOff *= 2 } @@ -400,7 +403,7 @@ func (l *LogzioSender) dequeueUpToMaxBatchSize() { for l.buf.Len() < maxSize && err == nil { item, err := l.queue.Dequeue() if err != nil { - l.debugLog("queue state: %s\n", err) + l.debugLog("sender: queue state: %s\n", err) } if item != nil { // NewLine is appended tp item.Value @@ -409,9 +412,8 @@ func (l *LogzioSender) dequeueUpToMaxBatchSize() { break } _, err := l.buf.Write(append(item.Value, '\n')) - //l.debugLog("logziosender.go: Adding item with size %d (total buffSize: %d)\n", len(item.Value), l.buf.Len()) if err != nil { - l.errorLog("error writing to buffer %s", err) + l.errorLog("sender: error writing to buffer %s", err) } } else { break @@ -426,10 +428,10 @@ func (l *LogzioSender) Sync() error { } func (l *LogzioSender) requeue() { - l.debugLog("logziosender.go: Requeue %s", l.buf.String()) + l.debugLog("sender: Requeue %s", l.buf.String()) err := l.Send(l.buf.Bytes()) if err != nil { - l.errorLog("could not requeue logs %s", err) + l.errorLog("sender: could not requeue logs %s", err) } } @@ -439,6 +441,10 @@ func (l *LogzioSender) debugLog(format string, a ...interface{}) { } } +func (l *LogzioSender) infoLog(format string, a ...interface{}) { + fmt.Fprintf(os.Stderr, format, a...) +} + func (l *LogzioSender) errorLog(format string, a ...interface{}) { fmt.Fprintf(os.Stderr, format, a...) }