Skip to content

Commit

Permalink
feat(client): blocking LineSenderPool
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Aug 12, 2024
1 parent f7f593f commit 25482c0
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 71 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ To connect via TCP, set the configuration string to:
**Warning: Experimental feature designed for use with HTTP senders ONLY**

Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
to cache previously-used `LineSender`s in memory so they can be reused without
having to allocate and instantiate new senders.
to pool previously-used `LineSender`s so they can be reused without having
to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
across multiple goroutines.

Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
execution block to Release the sender at the end of the goroutine.
execution block to Close the sender at the end of the goroutine.

Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:

Expand All @@ -112,7 +112,7 @@ func main() {
}
}()

sender, err := pool.Acquire(ctx)
sender, err := pool.Sender(ctx)
if err != nil {
panic(err)
}
Expand All @@ -122,7 +122,8 @@ func main() {
Float64Column("price", 123.45).
AtNow(ctx)

if err := pool.Release(ctx, sender); err != nil {
// Close call returns the sender back to the pool
if err := sender.Close(ctx); err != nil {
panic(err)
}
}
Expand Down
178 changes: 144 additions & 34 deletions sender_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
)

// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to
Expand All @@ -38,12 +41,21 @@ import (
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
type LineSenderPool struct {
maxSenders int
numSenders int // number of used and free senders
conf string

closed bool

senders []LineSender
mu *sync.Mutex
freeSenders []*pooledSender
mu *sync.Mutex
cond sync.Cond // used to wake up free sender waiters
}

type pooledSender struct {
pool *LineSenderPool
wrapped LineSender
dirty bool // set to true if any of the sender calls returned an error
tick uint64 // even values stand for free sender, odd values mean in-use sender
}

// LineSenderPoolOption defines line sender pool config option.
Expand All @@ -61,11 +73,12 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e
}

pool := &LineSenderPool{
maxSenders: 64,
conf: conf,
senders: []LineSender{},
mu: &sync.Mutex{},
maxSenders: 64,
conf: conf,
freeSenders: make([]*pooledSender, 0, 64),
mu: &sync.Mutex{},
}
pool.cond = *sync.NewCond(pool.mu)

for _, opt := range opts {
opt(pool)
Expand All @@ -82,50 +95,71 @@ func WithMaxSenders(count int) LineSenderPoolOption {
}
}

// Acquire obtains a LineSender from the pool. If the pool is empty, a new
// Sender obtains a LineSender from the pool. If the pool is empty, a new
// LineSender will be instantiated using the pool's config string.
func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
// If there is already maximum number of senders obtained from the pool,
// this calls will block until one of the senders is returned back to
// the pool by calling sender.Close().
func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error) {
p.mu.Lock()
defer p.mu.Unlock()

if p.closed {
return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool")
return nil, errors.New("cannot Acquire a LineSender from a closed LineSenderPool")
}

// We may have to wait for a free sender
for len(p.freeSenders) == 0 && p.numSenders == p.maxSenders {
p.cond.Wait()
}

if len(p.senders) > 0 {
if len(p.freeSenders) > 0 {
// Pop sender off the slice and return it
s := p.senders[len(p.senders)-1]
p.senders = p.senders[0 : len(p.senders)-1]
s := p.freeSenders[len(p.freeSenders)-1]
atomic.AddUint64(&s.tick, 1)
p.freeSenders = p.freeSenders[0 : len(p.freeSenders)-1]
return s, nil
}

return LineSenderFromConf(ctx, p.conf)
s, err := LineSenderFromConf(ctx, p.conf)
if err != nil {
return nil, err
}
p.numSenders++
ps := &pooledSender{
pool: p,
wrapped: s,
}
atomic.AddUint64(&ps.tick, 1)
return ps, nil
}

// Release flushes the LineSender and returns it back to the pool. If the pool
// is full, the sender is closed and discarded. In cases where the sender's
// flush fails, it is not added back to the pool.
func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error {
// If there is an error on flush, do not add the sender back to the pool
if err := s.Flush(ctx); err != nil {
return err
func (p *LineSenderPool) free(ctx context.Context, ps *pooledSender) error {
var flushErr error

if ps.dirty {
flushErr = ps.Flush(ctx)
}

p.mu.Lock()
defer p.mu.Unlock()

for i := range p.senders {
if p.senders[i] == s {
return fmt.Errorf("LineSender %p has already been released back to the pool", s)
}
if flushErr != nil {
// Failed to flush, close and call it a day
p.numSenders--
closeErr := ps.wrapped.Close(ctx)
return fmt.Errorf("%s %w", flushErr, closeErr)
}

if p.closed || len(p.senders) >= p.maxSenders {
return s.Close(ctx)
if ps.dirty || p.closed {
// Previous error or closed pool, close and call it a day
p.numSenders--
return ps.wrapped.Close(ctx)
}

p.senders = append(p.senders, s)

p.freeSenders = append(p.freeSenders, ps)
// Notify free sender waiters, if any
p.cond.Broadcast()
return nil
}

Expand All @@ -135,23 +169,28 @@ func (p *LineSenderPool) Close(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()

if p.closed {
// Already closed
return nil
}
p.closed = true

var senderErrors []error

for _, s := range p.senders {
senderErr := s.Close(ctx)
for _, ps := range p.freeSenders {
senderErr := ps.wrapped.Close(ctx)
if senderErr != nil {
senderErrors = append(senderErrors, senderErr)

}
}
p.numSenders -= len(p.freeSenders)
p.freeSenders = []*pooledSender{}

if len(senderErrors) == 0 {
return nil
}

err := fmt.Errorf("error closing one or more LineSenders in the pool")
err := errors.New("error closing one or more LineSenders in the pool")
for _, senderErr := range senderErrors {
err = fmt.Errorf("%s %w", err, senderErr)
}
Expand All @@ -170,10 +209,81 @@ func (p *LineSenderPool) IsClosed() bool {
return p.closed
}

// Len returns the numbers of cached LineSenders in the pool.
// Len returns the number of LineSenders in the pool.
func (p *LineSenderPool) Len() int {
p.mu.Lock()
defer p.mu.Unlock()

return len(p.senders)
return p.numSenders
}

func (ps *pooledSender) Table(name string) LineSender {
ps.wrapped.Table(name)
return ps
}

func (ps *pooledSender) Symbol(name, val string) LineSender {
ps.wrapped.Symbol(name, val)
return ps
}

func (ps *pooledSender) Int64Column(name string, val int64) LineSender {
ps.wrapped.Int64Column(name, val)
return ps
}

func (ps *pooledSender) Long256Column(name string, val *big.Int) LineSender {
ps.wrapped.Long256Column(name, val)
return ps
}

func (ps *pooledSender) TimestampColumn(name string, ts time.Time) LineSender {
ps.wrapped.TimestampColumn(name, ts)
return ps
}

func (ps *pooledSender) Float64Column(name string, val float64) LineSender {
ps.wrapped.Float64Column(name, val)
return ps
}

func (ps *pooledSender) StringColumn(name, val string) LineSender {
ps.wrapped.StringColumn(name, val)
return ps
}

func (ps *pooledSender) BoolColumn(name string, val bool) LineSender {
ps.wrapped.BoolColumn(name, val)
return ps
}

func (ps *pooledSender) AtNow(ctx context.Context) error {
err := ps.wrapped.AtNow(ctx)
if err != nil {
ps.dirty = true
}
return err
}

func (ps *pooledSender) At(ctx context.Context, ts time.Time) error {
err := ps.wrapped.At(ctx, ts)
if err != nil {
ps.dirty = true
}
return err
}

func (ps *pooledSender) Flush(ctx context.Context) error {
err := ps.wrapped.Flush(ctx)
if err != nil {
ps.dirty = true
}
return err
}

func (ps *pooledSender) Close(ctx context.Context) error {
if atomic.AddUint64(&ps.tick, 1)&1 == 1 {
return errors.New("double sender close")
}
return ps.pool.free(ctx, ps)
}
Loading

0 comments on commit 25482c0

Please sign in to comment.