diff --git a/go.mod b/go.mod index ad2ba51..5779c90 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,9 @@ module github.com/logzio/logzio-go go 1.15 require ( - github.com/StackExchange/wmi v1.2.0 // indirect github.com/beeker1121/goque v2.1.0+incompatible - github.com/go-ole/go-ole v1.2.5 // indirect - github.com/golang/snappy v0.0.4 // indirect - github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892 // indirect github.com/shirou/gopsutil/v3 v3.21.6 github.com/syndtr/goleveldb v1.0.0 // indirect - github.com/tidwall/gjson v1.8.1 // indirect - github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.1.7 go.uber.org/atomic v1.9.0 - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect ) diff --git a/go.sum b/go.sum index 8941071..f4f8d4e 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,5 @@ -github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8= -github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/StackExchange/wmi v1.2.0 h1:noJEYkMQVlFCEAc+2ma5YyRhlfjcWfZqk5sBRYozdyM= -github.com/StackExchange/wmi v1.2.0/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE= github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= github.com/beeker1121/goque v2.1.0+incompatible h1:m5pZ5b8nqzojS2DF2ioZphFYQUqGYsDORq6uefUItPM= @@ -20,6 +16,7 @@ github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= diff --git a/inMemoryQueue/temp.txt b/inMemoryQueue/temp.txt new file mode 100644 index 0000000..dd11f10 --- /dev/null +++ b/inMemoryQueue/temp.txt @@ -0,0 +1,174 @@ +package inMemoryQueue + +import ( + queue "github.com/beeker1121/goque" + "go.uber.org/atomic" + "sync" +) + +var wg sync.WaitGroup +//Node storage of queue data +type Node struct { + data []byte + prev *Node + next *Node +} + +//ConcurrentQueue concurrent queue +type ConcurrentQueue struct { + //mutex lock + lock *sync.Mutex + + //empty and full locks + notEnqueueing *sync.Cond + notDequeuing *sync.Cond + + isEnqueueing atomic.Bool + isDequeuing atomic.Bool + + //queue storage backend + backend *QueueBackend +} + +func NewConcurrentQueue() *ConcurrentQueue { + queue := ConcurrentQueue{} + + //init mutexes + queue.lock = &sync.Mutex{} + queue.notEnqueueing = sync.NewCond(queue.lock) + queue.notDequeuing = sync.NewCond(queue.lock) + + queue.isEnqueueing.Store(false) + queue.isDequeuing.Store(false) + + //init backend + queue.backend = &QueueBackend{} + queue.backend.size = make(chan int,2) + queue.backend.size <- 0 + queue.backend.head = nil + queue.backend.tail = nil + queue.backend.lock = &sync.Mutex{} + + return &queue +} + +//QueueBackend Backend storage of the queue, a double linked list +type QueueBackend struct { + //Pointers to root and end + head *Node + tail *Node + //keep track of current size + size chan int + + lock *sync.Mutex +} + +func (queue *QueueBackend) createNode(data []byte) *Node { + queue.lock.Lock() + node := Node{} + node.data = data + node.next = nil + node.prev = nil + queue.lock.Unlock() + return &node +} + +func (queue *QueueBackend) put(data []byte) error { + + actualSize := <-queue.size + + if actualSize == 0 { + //new root node + node := queue.createNode(data) + queue.head = node + queue.tail = node + + queue.size <- 0 + len(data) + + return nil + } + //queue non-empty append to head + currentHead := queue.head + newHead := queue.createNode(data) + newHead.next = currentHead + currentHead.prev = newHead + + queue.head = currentHead + queue.size <- actualSize + (len(data)) + return nil + +} + +func (queue *QueueBackend) pop() ([]byte, error) { + queue.lock.Lock() + currentEnd := queue.tail + newEnd := currentEnd.prev + queue.lock.Unlock() + + if newEnd != nil { + newEnd.next = nil + } + queue.size <- <-queue.size - len(currentEnd.data) + actualSize := <-queue.size + + if actualSize == 0 { + queue.head = nil + queue.tail = nil + } + queue.size <- actualSize + return currentEnd.data, nil +} +func (c *ConcurrentQueue) Enqueue(data []byte) (*Item, error) { + if c.isEnqueueing.Load() { + c.notEnqueueing.Wait() + } + c.isEnqueueing.Toggle() + _ = c.backend.put(data) + item := &Item{ + Value: data, + } + // Signal free + c.isEnqueueing.Toggle() + c.notEnqueueing.Signal() + return item, nil +} + +type Item = queue.Item + +func (c *ConcurrentQueue) Dequeue() (*Item, error) { + // another goroutine is dequeuing + if c.isDequeuing.Load() { + c.notDequeuing.Wait() + } + c.isDequeuing.Toggle() + c.lock.Lock() + if c.Length() == 0 { + c.isDequeuing.Toggle() + c.notDequeuing.Signal() + c.lock.Unlock() + return nil, ErrEmpty + } + c.lock.Unlock() + data, err := c.backend.pop() + item := &Item{ + Value: data, + } + // Signal free + c.isDequeuing.Toggle() + c.notDequeuing.Signal() + return item, err +} + +func (c *ConcurrentQueue) Length() uint64 { + + size := <-c.backend.size + c.backend.size <-size + return uint64(size) +} +func (c *ConcurrentQueue) Close() { + +} + +BenchmarkLogzioSenderInmemory-12 5290641 227 ns/op 78 B/op 2 allocs/op + +BenchmarkLogzioSender-12 91461 15551 ns/op 928 B/op 15 allocs/op diff --git a/logsender_test.go b/logsender_test.go index 8201cb2..c5e93c6 100644 --- a/logsender_test.go +++ b/logsender_test.go @@ -15,7 +15,11 @@ package logzio import ( + "bytes" "fmt" + "github.com/tidwall/sjson" + "io/ioutil" + "math/rand" "net/http" "net/http/httptest" "os" @@ -38,6 +42,7 @@ func TestLogzioSender_inMemoryRetries(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDuration(time.Minute*10), @@ -65,6 +70,7 @@ func TestLogzioSender_inMemoryRetries(t *testing.T) { func TestLogzioSender_InMemoryCapacityLimit(t *testing.T) { l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetInMemoryQueue(true), @@ -101,6 +107,7 @@ func TestLogzioSender_InMemorySend(t *testing.T) { })) defer ts.Close() l, err := New("fake-token", + "fake", SetUrl(ts.URL), SetinMemoryCapacity(defaultQueueSize), SetInMemoryQueue(true), @@ -137,6 +144,7 @@ func TestLogzioSender_InMemoryDrain(t *testing.T) { })) defer ts.Close() l, err := New("fake-token", + "fake", SetUrl(ts.URL), SetinMemoryCapacity(defaultQueueSize), SetInMemoryQueue(true), @@ -166,6 +174,7 @@ func TestLogzioSender_ShouldRetry(t *testing.T) { //var sent = make([]byte, 1024) l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDuration(time.Minute*10), @@ -210,6 +219,7 @@ func TestLogzioSender_InMemoryDelayStart(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetInMemoryQueue(true), @@ -251,6 +261,7 @@ func TestLogzioSender_InMemoryUnauth(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetCompress(false), SetDrainDuration(time.Minute), @@ -287,6 +298,7 @@ func TestLogzioSender_InMemoryWrite(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetDrainDuration(time.Minute), SetUrl(ts.URL), @@ -320,6 +332,7 @@ func TestLogzioSender_DequeueUpToMaxBatchSize(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetDrainDuration(time.Hour), SetUrl(ts.URL), @@ -353,6 +366,7 @@ func TestLogzioSender_Retries(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDuration(time.Minute*10), @@ -387,6 +401,7 @@ func TestLogzioSender_Send(t *testing.T) { defer ts.Close() l, err := New("fake-token", + "fake", SetUrl(ts.URL), SetCompress(false), ) @@ -416,6 +431,7 @@ func TestLogzioSender_DelayStart(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetCompress(false), SetUrl("http://localhost:12345"), @@ -452,6 +468,7 @@ func TestLogzioSender_TmpDir(t *testing.T) { tmp := fmt.Sprintf("%s/%d", os.TempDir(), time.Now().Nanosecond()) l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetTempDirectory(tmp), SetCompress(false), @@ -486,6 +503,7 @@ func TestLogzioSender_Write(t *testing.T) { tmp := fmt.Sprintf("%s/%d", os.TempDir(), time.Now().Nanosecond()) l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetTempDirectory(tmp), SetCompress(false), @@ -518,6 +536,7 @@ func TestLogzioSender_RestoreQueue(t *testing.T) { defer ts.Close() l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDuration(time.Minute*10), @@ -534,6 +553,7 @@ func TestLogzioSender_RestoreQueue(t *testing.T) { // open queue again - same dir l, err = New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDuration(time.Minute*10), @@ -567,6 +587,7 @@ func TestLogzioSender_Unauth(t *testing.T) { tmp := fmt.Sprintf("%s/%d", os.TempDir(), time.Now().Nanosecond()) l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetTempDirectory(tmp), SetCompress(false), @@ -599,6 +620,7 @@ func TestLogzioSender_CountDropped(t *testing.T) { })) l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDiskThreshold(0), @@ -633,6 +655,7 @@ func TestLogzioSender_CountDropped(t *testing.T) { func TestLogzioSender_ThresholdLimit(t *testing.T) { l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDiskThreshold(0), @@ -653,6 +676,7 @@ func TestLogzioSender_ThresholdLimit(t *testing.T) { func TestLogzioSender_ThresholdLimitWithoutCheck(t *testing.T) { l, err := New( "fake-token", + "fake", SetDebug(os.Stderr), SetUrl("http://localhost:12345"), SetDrainDiskThreshold(0), @@ -678,7 +702,7 @@ func BenchmarkLogzioSender(b *testing.B) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) - l, _ := New("fake-token", SetUrl(ts.URL), + l, _ := New("fake-token", "fake", SetUrl(ts.URL), SetDrainDuration(time.Hour)) defer ts.Close() defer l.Stop() @@ -693,7 +717,7 @@ func BenchmarkLogzioSenderInmemory(b *testing.B) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) - l, _ := New("fake-token", SetUrl(ts.URL), + l, _ := New("fake-token", "fake", SetUrl(ts.URL), SetDrainDuration(time.Hour), SetInMemoryQueue(true), SetlogCountLimit(6000000), @@ -707,73 +731,75 @@ func BenchmarkLogzioSenderInmemory(b *testing.B) { } } -////E2E test -//func TestLogzioSender_E2E(t *testing.T) { -// l, err := New("", -// SetInMemoryQueue(true), -// SetDrainDuration(time.Second*5), -// SetDebug(os.Stderr), -// ) -// if err != nil { -// panic(err) -// } -// randomString := fmt.Sprint(rand.Int()) -// msg := fmt.Sprintf("{ \"%s\": \"%s\"}", "message", randomString) -// l.debugLog("Sending 500 logs...\n") -// for i := 0; i < 500; i++ { -// err := l.Send([]byte(msg)) -// if err != nil { -// panic(err) -// } -// } -// <-time.After(l.drainDuration) -// -// apiQuery := `{ -// "query": { -// "bool": { -// "must": [{ -// "query_string": { -// "query": "" -// } -// }, -// { -// "range": { -// "@timestamp": { -// "gte": "now-5m", -// "lte": "now" -// } -// } -// } -// ] -// } -// }, -// "size": 1000, -// "from": 0 -// }` -// -// url := "https://api.logz.io/v1/search" -// queryString := fmt.Sprintf("message:%s", randomString) -// query, _ := sjson.Set(apiQuery, "query.bool.must.0.query_string.query", queryString) -// var jsonStr = []byte(query) -// -// l.debugLog("Waiting 40 seconds for ingestion\n") -// time.Sleep(time.Second * 40) -// -// fmt.Println("URL:>", url) -// req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr)) -// req.Header.Set("X-API-TOKEN", "6c94fd02-8e34-4f0c-bc00-9a42e35171fc") -// req.Header.Set("Content-Type", "application/json") -// -// client := &http.Client{} -// resp, err := client.Do(req) -// if err != nil { -// panic(err) -// } -// defer resp.Body.Close() -// -// fmt.Println("response Status:", resp.Status) -// fmt.Println("response Headers:", resp.Header) -// body, _ := ioutil.ReadAll(resp.Body) -// fmt.Println("response Body:", string(body)) -// l.Stop() //logs are buffered on disk. Stop will drain the buffer -//} +//E2E test +func TestLogzioSender_E2E(t *testing.T) { + l, err := New("McvJQAtOrFUZQRFMrvSqnKSEJhjjFZHz", + "fake", + SetInMemoryQueue(true), + SetUrl("https://listener.logz.io:8071"), + SetDrainDuration(time.Second*5), + SetDebug(os.Stderr), + ) + if err != nil { + panic(err) + } + randomString := fmt.Sprint(rand.Int()) + msg := fmt.Sprintf("{ \"%s\": \"%s\"}\n{ \"%s\": \"%s\"}", "message", randomString, "type", randomString) + l.debugLog("Sending 500 logs...\n") + for i := 0; i < 500; i++ { + err := l.Send([]byte(msg)) + if err != nil { + panic(err) + } + } + <-time.After(l.drainDuration) + + apiQuery := `{ + "query": { + "bool": { + "must": [{ + "query_string": { + "query": "" + } + }, + { + "range": { + "@timestamp": { + "gte": "now-5m", + "lte": "now" + } + } + } + ] + } + }, + "size": 1000, + "from": 0 + }` + + url := "https://api.logz.io/v1/search" + queryString := fmt.Sprintf("message:%s", randomString) + query, _ := sjson.Set(apiQuery, "query.bool.must.0.query_string.query", queryString) + var jsonStr = []byte(query) + + l.debugLog("Waiting 40 seconds for ingestion\n") + time.Sleep(time.Second * 40) + + fmt.Println("URL:>", url) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr)) + req.Header.Set("X-API-TOKEN", "6c94fd02-8e34-4f0c-bc00-9a42e35171fc") + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + panic(err) + } + defer resp.Body.Close() + + fmt.Println("response Status:", resp.Status) + fmt.Println("response Headers:", resp.Header) + body, _ := ioutil.ReadAll(resp.Body) + fmt.Println("response Body:", string(body)) + l.Stop() //logs are buffered on disk. Stop will drain the buffer +} diff --git a/logziosender.go b/logziosender.go index 9639654..12fdef0 100644 --- a/logziosender.go +++ b/logziosender.go @@ -58,6 +58,7 @@ type LogzioSender struct { mux sync.Mutex token string url string + logzioType string debug io.Writer diskThreshold float32 checkDiskSpace bool @@ -77,11 +78,12 @@ type LogzioSender struct { type SenderOptionFunc func(*LogzioSender) error // New creates a new Logzio sender with a token and options -func New(token string, options ...SenderOptionFunc) (*LogzioSender, error) { +func New(token string, logzioType string, options ...SenderOptionFunc) (*LogzioSender, error) { l := &LogzioSender{ buf: bytes.NewBuffer(make([]byte, maxSize)), drainDuration: defaultDrainDuration, - url: fmt.Sprintf("%s/?token=%s", defaultHost, token), + url: fmt.Sprintf("%s/?token=%s?type=%s", defaultHost, token, logzioType), + logzioType: logzioType, token: token, dir: fmt.Sprintf("%s%s%s%s%d", os.TempDir(), string(os.PathSeparator), "logzio-buffer", string(os.PathSeparator), time.Now().UnixNano()), diskThreshold: defaultDiskThreshold, @@ -172,7 +174,7 @@ func SetTempDirectory(dir string) SenderOptionFunc { // SetUrl set the url which maybe different from the defaultUrl func SetUrl(url string) SenderOptionFunc { return func(l *LogzioSender) error { - l.url = fmt.Sprintf("%s/?token=%s", url, l.token) + l.url = fmt.Sprintf("%s/?token=%s&type=%s", url, l.token, l.logzioType) l.debugLog("logziosender.go: Setting url to %s\n", l.url) return nil }