Skip to content

Commit

Permalink
Add support for log type
Browse files Browse the repository at this point in the history
  • Loading branch information
yotamloe committed Aug 10, 2021
1 parent 60d08ae commit abec779
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 86 deletions.
8 changes: 1 addition & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,9 @@ module github.com/logzio/logzio-go
go 1.15

require (
github.com/StackExchange/wmi v1.2.0 // indirect
github.com/beeker1121/goque v2.1.0+incompatible
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892 // indirect
github.com/shirou/gopsutil/v3 v3.21.6
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tidwall/gjson v1.8.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.1.7
go.uber.org/atomic v1.9.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
)
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v1.2.0 h1:noJEYkMQVlFCEAc+2ma5YyRhlfjcWfZqk5sBRYozdyM=
github.com/StackExchange/wmi v1.2.0/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/beeker1121/goque v2.1.0+incompatible h1:m5pZ5b8nqzojS2DF2ioZphFYQUqGYsDORq6uefUItPM=
Expand All @@ -20,6 +16,7 @@ github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down
174 changes: 174 additions & 0 deletions inMemoryQueue/temp.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package inMemoryQueue

import (
queue "github.com/beeker1121/goque"
"go.uber.org/atomic"
"sync"
)

var wg sync.WaitGroup
//Node storage of queue data
type Node struct {
data []byte
prev *Node
next *Node
}

//ConcurrentQueue concurrent queue
type ConcurrentQueue struct {
//mutex lock
lock *sync.Mutex

//empty and full locks
notEnqueueing *sync.Cond
notDequeuing *sync.Cond

isEnqueueing atomic.Bool
isDequeuing atomic.Bool

//queue storage backend
backend *QueueBackend
}

func NewConcurrentQueue() *ConcurrentQueue {
queue := ConcurrentQueue{}

//init mutexes
queue.lock = &sync.Mutex{}
queue.notEnqueueing = sync.NewCond(queue.lock)
queue.notDequeuing = sync.NewCond(queue.lock)

queue.isEnqueueing.Store(false)
queue.isDequeuing.Store(false)

//init backend
queue.backend = &QueueBackend{}
queue.backend.size = make(chan int,2)
queue.backend.size <- 0
queue.backend.head = nil
queue.backend.tail = nil
queue.backend.lock = &sync.Mutex{}

return &queue
}

//QueueBackend Backend storage of the queue, a double linked list
type QueueBackend struct {
//Pointers to root and end
head *Node
tail *Node
//keep track of current size
size chan int

lock *sync.Mutex
}

func (queue *QueueBackend) createNode(data []byte) *Node {
queue.lock.Lock()
node := Node{}
node.data = data
node.next = nil
node.prev = nil
queue.lock.Unlock()
return &node
}

func (queue *QueueBackend) put(data []byte) error {

actualSize := <-queue.size

if actualSize == 0 {
//new root node
node := queue.createNode(data)
queue.head = node
queue.tail = node

queue.size <- 0 + len(data)

return nil
}
//queue non-empty append to head
currentHead := queue.head
newHead := queue.createNode(data)
newHead.next = currentHead
currentHead.prev = newHead

queue.head = currentHead
queue.size <- actualSize + (len(data))
return nil

}

func (queue *QueueBackend) pop() ([]byte, error) {
queue.lock.Lock()
currentEnd := queue.tail
newEnd := currentEnd.prev
queue.lock.Unlock()

if newEnd != nil {
newEnd.next = nil
}
queue.size <- <-queue.size - len(currentEnd.data)
actualSize := <-queue.size

if actualSize == 0 {
queue.head = nil
queue.tail = nil
}
queue.size <- actualSize
return currentEnd.data, nil
}
func (c *ConcurrentQueue) Enqueue(data []byte) (*Item, error) {
if c.isEnqueueing.Load() {
c.notEnqueueing.Wait()
}
c.isEnqueueing.Toggle()
_ = c.backend.put(data)
item := &Item{
Value: data,
}
// Signal free
c.isEnqueueing.Toggle()
c.notEnqueueing.Signal()
return item, nil
}

type Item = queue.Item

func (c *ConcurrentQueue) Dequeue() (*Item, error) {
// another goroutine is dequeuing
if c.isDequeuing.Load() {
c.notDequeuing.Wait()
}
c.isDequeuing.Toggle()
c.lock.Lock()
if c.Length() == 0 {
c.isDequeuing.Toggle()
c.notDequeuing.Signal()
c.lock.Unlock()
return nil, ErrEmpty
}
c.lock.Unlock()
data, err := c.backend.pop()
item := &Item{
Value: data,
}
// Signal free
c.isDequeuing.Toggle()
c.notDequeuing.Signal()
return item, err
}

func (c *ConcurrentQueue) Length() uint64 {

size := <-c.backend.size
c.backend.size <-size
return uint64(size)
}
func (c *ConcurrentQueue) Close() {

}

BenchmarkLogzioSenderInmemory-12 5290641 227 ns/op 78 B/op 2 allocs/op

BenchmarkLogzioSender-12 91461 15551 ns/op 928 B/op 15 allocs/op
Loading

0 comments on commit abec779

Please sign in to comment.