Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc. Improvements and Customizations #10

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
SHELL = /bin/bash
GO = /usr/local/go/bin/go

build:
$(GO) build ./...

setup:
$(GO) get -d ./...

test:
$(GO) generate ./...
$(GO) test ./...

install:
$(GO) install -v ./...
2 changes: 1 addition & 1 deletion ae_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

ROOT="."
RPCGEN=$(dirname "$0")
PREFIX="github.com/kylelemons/go-rpcgen"
PREFIX="github.com/bradhe/go-rpcgen"

set -e

Expand Down
170 changes: 170 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package client

import (
"errors"
"github.com/bradhe/go-rpcgen/codec"
"math"
"net"
"net/rpc"
"sync"
"sync/atomic"
"time"
)

const (
DefaultRetryCount = 6

// Default number of seconds to timeout for all connections
DefaultTimeout = 200 * time.Millisecond
)

var (
ErrConnectionFailure = errors.New("failed to connect")
ErrClosed = errors.New("closed")
ErrInvalidPoolObject = errors.New("invalid pool object")
ErrPermanentlyShutdown = errors.New("permenantly shutdown")

// Number of seconds to use when timing out.
ConnectionTimeout = DefaultTimeout

// Number of clients to open for this connection pool.
PoolSize = 100
)

type Client struct {
wg sync.WaitGroup

addr string
pool *ConnectionPool
shutdown int32
}

func backoff(i int) time.Duration {
if i < 1 {
i = 1
}

ms := int(math.Exp2(float64(i)))
return time.Duration(ms) * time.Millisecond
}

func (c *Client) Close() {
logMessage("[go-rpcgen/client] Closing RPC client.")

// If someone else called this, we'll just wait a bit for it all to close down.
if atomic.LoadInt32(&c.shutdown) > 0 {
c.wg.Wait()
return
}

atomic.SwapInt32(&c.shutdown, 1)
c.wg.Wait()

// By adding a second number here that means that we're completely shut down.
atomic.AddInt32(&c.shutdown, 1)

// Now let's close down all of the connections in the poo in the pool.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"in the poo"? ;-)

for {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this never terminates?

c.pool.Close()
}
}

func (c *Client) create() (*rpc.Client, error) {
// If the service is shutdown, let's wait to kill it all.
if atomic.LoadInt32(&c.shutdown) > 1 {
return nil, nil
}

conn, err := net.DialTimeout("tcp", c.addr, ConnectionTimeout)

if err != nil {
return nil, err
}

co := rpc.NewClientWithCodec(codec.NewClientCodec(conn))
return co, nil
}

func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) I would shorten that to args, reply interface{}

// If we're shut down, let's tell the user.
if c.shutdown > 0 {
return ErrClosed
}

// Signal that something is goig on.
c.wg.Add(1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaitGroups are not designed to be incremented from within the sections they're counting; the intended use is to increment before spawning a goroutine and decrementing inside the goroutine. Otherwise there's a race condition where the goroutine could be ready to go but hasn't incremented the counter yet, and Wait could return prematurely. It may be enough to increment the waitgroup before checking the shutdown atomic, but I'm not wholly sold on this mechanism for tracking operations. I don't typically find atomics to be necessary; channels are my preferred means of cleanup.

defer c.wg.Done()

// Number of times we've retried
var retry int

for {
client := c.pool.Get()

if client == nil {
return ErrConnectionFailure
}

err := client.Call(serviceMethod, args, reply)

// No error, so let's relinquish this back to the pool and get outta here.
if err == nil {
c.pool.Put(client)
break
}

// If we got here, let's see what type of error it is.
if err == rpc.ErrShutdown {
client.Close()

retry += 1
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like you'd want to just do for try := 0; ; try++ { ... } to make it easier to eyeball that the retry count does indeed increment each time the loop body is executed.


if retry > DefaultRetryCount {
return ErrPermanentlyShutdown
}

// Let's try again!
time.Sleep(backoff(retry))
} else {
client.Close()

// This means err != nil, so we just report the error.
return err
}
}

// We win the day!
return nil
}

func (c *Client) doCall(call *rpc.Call, serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) {
err := c.Call(serviceMethod, args, reply)
call.Error = err
done <- call
}

func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) *rpc.Call {
call := new(rpc.Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = done

// If we're shut down, let's tell the user.
if c.shutdown > 0 {
call.Error = ErrClosed
return call
}

// If we made it here, we're good.
go c.doCall(call, serviceMethod, args, reply, done)
return call
}

func NewClient(addr string) *Client {
c := new(Client)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to prefer

c := &client{
  addr: addr,
  pool: NewConnectionPool(PoolSize),
}

c.addr = addr
c.pool = NewConnectionPool(PoolSize)
c.pool.New = c.create
return c
}
63 changes: 63 additions & 0 deletions client/connection_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"net/rpc"
)

type ConnectionPool struct {
conns chan *rpc.Client

New func() (*rpc.Client, error)
}

func (p *ConnectionPool) open() *rpc.Client {
for {
c, err := p.New()

if IsTimeoutError(err) {
continue
}

if err != nil {
logMessage("[go-rpcgen/connection_pool] Error opening connection. %v", err)
return nil
}

return c
}

panic("unreachable")
}

func (p *ConnectionPool) Get() *rpc.Client {
select {
case c := <-p.conns:
return c
default:
return p.open()
}
}

func (p *ConnectionPool) Put(c *rpc.Client) {
select {
case p.conns <- c:
// Do nothing.
return
default:
c.Close()
}
}

func (p *ConnectionPool) Close() {
close(p.conns)

for c := range p.conns {
c.Close()
}
}

func NewConnectionPool(capacity int) *ConnectionPool {
p := new(ConnectionPool)
p.conns = make(chan *rpc.Client, capacity)
return p
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return &ConnectionPool{
  conns: make(chan *rpc.Client, capacity),
}

}
18 changes: 18 additions & 0 deletions client/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package client

import (
"log"
)

var (
// Set this to something non-null if you want log messages to flow.
Logger *log.Logger
)

func logMessage(format string, args ...interface{}) {
if Logger == nil {
return
}

Logger.Printf(format, args...)
}
23 changes: 23 additions & 0 deletions client/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"strings"
)

// Given an error, makes a crappy attempt to determine if the error is a
// timeout error.
func IsTimeoutError(err error) bool {
if err == nil {
return false
}

if strings.HasSuffix(err.Error(), "timed out") {
return true
}

if strings.HasSuffix(err.Error(), "i/o timeout") {
return true
}

return false
}
Loading