Skip to content

Commit

Permalink
fix: bad pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
euskadi31 committed May 4, 2023
1 parent 9e42592 commit d5352da
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 47 deletions.
79 changes: 47 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ type client struct {
retryInterval time.Duration
retrySize int
httpClient *http.Client
msgs chan Event
events []Event
msgs chan *Event
events []*Event
retries chan *Payload
quit chan struct{}
shutdown chan struct{}
quitCh chan struct{}
shutdownCh chan struct{}
flushCh chan struct{}
mtx sync.Mutex
}

Expand All @@ -60,15 +61,16 @@ func New(key string, opts ...Option) Client {
maxRetry: 3,
retryInterval: time.Second * 1,
retrySize: 1000,
quit: make(chan struct{}),
shutdown: make(chan struct{}),
quitCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}, 1),
flushCh: make(chan struct{}, 1),
}

c.httpClient = &http.Client{
Timeout: c.timeout,
}
c.msgs = make(chan Event, c.bufferSize)
c.events = make([]Event, 0, c.bufferSize)
c.msgs = make(chan *Event, c.bufferSize)
c.events = []*Event{} /*make(, 0, c.bufferSize)*/
c.retries = make(chan *Payload, c.retrySize)

for _, opt := range opts {
Expand All @@ -80,14 +82,27 @@ func New(key string, opts ...Option) Client {
return c
}

func (c *client) addEvent(event *Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.events = append(c.events, event)

if len(c.events) == c.bufferSize {
c.flushCh <- struct{}{}
}
}

func (c *client) loop() {
defer close(c.shutdown)
defer close(c.shutdownCh)

tick := time.NewTicker(c.interval)
defer tick.Stop()

for {
select {
case <-c.flushCh:
c.flush()
case payload := <-c.retries:
if err := c.sendBatch(payload); err != nil {
if payload.Attempts > c.maxRetry {
Expand All @@ -99,28 +114,20 @@ func (c *client) loop() {
c.retries <- payload
}
case event := <-c.msgs:
c.events = append(c.events, event)

if len(c.events) == c.bufferSize {
c.flush()
}
c.addEvent(event)

case <-tick.C:
c.flush()

case <-c.quit:
case <-c.quitCh:
log.Debug().Msg("exit requested - draining messages")

// Drain the msg channel, we have to close it first so no more
// messages can be pushed and otherwise the loop would never end.
close(c.msgs)

for event := range c.msgs {
c.events = append(c.events, event)

if len(c.events) == cap(c.events) {
c.flush()
}
c.addEvent(event)
}

c.flush()
Expand All @@ -142,16 +149,16 @@ func (c *client) loop() {

func (c *client) Close() (err error) {
defer func() {
// Always recover, a panic could be raised if `c`.quit was closed which
// Always recover, a panic could be raised if `c`.quitCh was closed which
// means the method was called more than once.
if recover() != nil {
err = ErrClosed
}
}()

close(c.quit)
close(c.quitCh)

<-c.shutdown
<-c.shutdownCh

return
}
Expand Down Expand Up @@ -208,23 +215,33 @@ func (c *client) sendBatch(payload *Payload) error {
return nil
}

func (c *client) flush() error {
func (c *client) getBatchEvents() []*Event {
c.mtx.Lock()
defer c.mtx.Unlock()

if len(c.events) == 0 {
return nil
return []*Event{}
}

end := c.batchSize
if length := len(c.events); length < end {
end = length
}

var events []Event
var events []*Event

events, c.events = c.events[0:end], c.events[end:]

return events
}

func (c *client) flush() error {
events := c.getBatchEvents()

if len(events) == 0 {
return nil
}

reqPayload := &RequestPayload{
APIKey: c.key,
Events: events,
Expand Down Expand Up @@ -262,13 +279,11 @@ func (c *client) Enqueue(event *Event) (err error) {
}
}()

c.msgs <- *event

if len(c.msgs) == cap(c.msgs) {
go func() {
err = c.flush()
}()
if len(c.msgs) == (cap(c.msgs) - 1) {
c.flushCh <- struct{}{}
}

c.msgs <- event

return
}
120 changes: 106 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func TestClient(t *testing.T) {
c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*5),
WithTimeout(time.Second*1),
WithInterval(time.Millisecond*100),
WithBatchSize(2),
WithBufferSize(2),
WithMaxRetry(2),
Expand Down Expand Up @@ -107,8 +107,8 @@ func TestClientWithRetry(t *testing.T) {
c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*5),
WithTimeout(time.Second*1),
WithInterval(time.Millisecond*100),
WithBatchSize(2),
WithBufferSize(2),
WithMaxRetry(2),
Expand Down Expand Up @@ -151,8 +151,8 @@ func TestClientDroppedMessage(t *testing.T) {
c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*5),
WithTimeout(time.Second*1),
WithInterval(time.Millisecond*100),
WithBatchSize(2),
WithBufferSize(2),
WithMaxRetry(2),
Expand Down Expand Up @@ -208,20 +208,20 @@ func TestClientWithMultipleEvents(t *testing.T) {
case 0:
assert.Equal(t, 2, len(msg.Events))
case 1:
assert.Equal(t, 1, len(msg.Events))
assert.Equal(t, 2, len(msg.Events))
}
}))
defer ts.Close()

c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*5),
WithTimeout(time.Second*1),
WithInterval(time.Millisecond*500),
WithBatchSize(2),
WithBufferSize(2),
WithBufferSize(3),
WithMaxRetry(2),
WithRetryInterval(time.Second*5),
WithRetryInterval(time.Millisecond*100),
)
defer c.Close()

Expand Down Expand Up @@ -267,6 +267,20 @@ func TestClientWithMultipleEvents(t *testing.T) {
})
assert.NoError(t, err)

err = c.Enqueue(&Event{
UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72",
DeviceID: "0a16e988-8f70-4877-bdc6-08997832cfff",
Timestamp: 1643367217,
EventType: "user.created",
Platform: "ios",
OSName: "iOS",
OSVersion: "15.2.1",
DeviceModel: "iPhone13,3",
Language: "fr-FR",
InsertID: "a5461410-6b12-4a7a-905d-166cc00af4b2",
})
assert.NoError(t, err)

wg.Wait()
}

Expand Down Expand Up @@ -298,12 +312,12 @@ func TestClientClose(t *testing.T) {
c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*10),
WithTimeout(time.Second*1),
WithInterval(time.Second*2),
WithBatchSize(2),
WithBufferSize(2),
WithMaxRetry(2),
WithRetryInterval(time.Second*5),
WithRetryInterval(time.Second*2),
)

err := c.Enqueue(&Event{
Expand All @@ -324,3 +338,81 @@ func TestClientClose(t *testing.T) {

wg.Wait()
}

func TestClientGetBatchEvents(t *testing.T) {

c := &client{
timeout: time.Second * 1,
interval: time.Second * 10,
batchSize: 2,
bufferSize: 4,
maxRetry: 3,
retryInterval: time.Second * 1,
retrySize: 1000,
}

c.events = []*Event{
{
UserID: "f892be22-8f8e-445d-83b0-af199b9a5c71",
},
{
UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72",
},
{
UserID: "f892be22-8f8e-445d-83b0-af199b9a5c73",
},
}

events := c.getBatchEvents()

assert.Equal(t, 2, len(events))

events = c.getBatchEvents()

assert.Equal(t, 1, len(events))
}

/*
func TestClientRace(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer ts.Close()
c := New(
"foo",
WithURL(ts.URL),
WithTimeout(time.Second*2),
WithInterval(time.Second*10),
WithRetryInterval(time.Second*5),
).(*client)
for i := 0; i < 2000; i++ {
now := time.Now().Unix()
id := uuid.New().String()
err := c.Enqueue(&Event{
UserID: "f892be22-8f8e-445d-83b0-af199b9a5c72",
DeviceID: "0a16e988-8f70-4877-bdc6-08997832cfff",
Timestamp: now,
EventType: "user.created",
Platform: "ios",
OSName: "iOS",
OSVersion: "15.2.1",
DeviceModel: "iPhone13,3",
Language: "fr-FR",
InsertID: id,
})
assert.NoError(t, err)
}
assert.NoError(t, c.Close())
wait:
for {
if len(c.msgs) == 0 && len(c.events) == 0 && len(c.retries) == 0 {
break wait
}
}
}
*/
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
Expand Down
2 changes: 1 addition & 1 deletion payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "fmt"

type RequestPayload struct {
APIKey string `json:"api_key"`
Events []Event `json:"events"`
Events []*Event `json:"events"`
Options *PayloadOptions `json:"options,omitempty"`
}

Expand Down

0 comments on commit d5352da

Please sign in to comment.