Skip to content

Commit

Permalink
connect timeout as config to the pool
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Feb 19, 2025
1 parent 344f309 commit ba2324d
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 35 deletions.
34 changes: 19 additions & 15 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ var (

// PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close
PoolCloseTimeout = 10 * time.Second

// ConnectTimeout is the timeout for the connection to be established
ConnectTimeout = 1 * time.Second
)

type Metrics struct {
Expand Down Expand Up @@ -94,6 +91,7 @@ type RefreshCheck func() (bool, error)

type Config[C Connection] struct {
Capacity int64
ConnectTimeout time.Duration
MaxIdleCount int64
IdleTimeout time.Duration
MaxLifetime time.Duration
Expand Down Expand Up @@ -137,9 +135,10 @@ type ConnPool[C Connection] struct {
config struct {
// connect is the callback to create a new connection for the pool
connect Connector[C]
// connectTimeout is the timeout for the connection to be established
connectTimeout time.Duration
// refresh is the callback to check whether the pool needs to be refreshed
refresh RefreshCheck

// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
Expand All @@ -163,6 +162,7 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.config.connectTimeout = config.ConnectTimeout
pool.config.maxCapacity = config.Capacity
pool.config.maxIdleCount = config.MaxIdleCount
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
Expand Down Expand Up @@ -403,10 +403,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
pool.borrowed.Add(-1)

if conn == nil {
ctx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
var err error
conn, err = pool.connNew(ctx)
conn, err = pool.connNew(context.Background())
if err != nil {
pool.closedConn()
return
Expand All @@ -418,9 +416,7 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
if err := pool.connReopen(ctx, conn, conn.timeUsed.get()); err != nil {
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
}
Expand Down Expand Up @@ -503,8 +499,13 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
var err error
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
if pool.config.connectTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, pool.config.connectTimeout)
defer cancel()
}

dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
Expand All @@ -524,6 +525,11 @@ func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now
}

func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
if pool.config.connectTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, pool.config.connectTimeout)
defer cancel()
}
conn, err := pool.config.connect(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -779,11 +785,9 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
if err := pool.connReopen(ctx, conn, mono); err != nil {
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
pool.closedConn()
}
cancel()
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,10 +593,11 @@ func TestConnReopen(t *testing.T) {
var state TestState

p := NewPool(&Config[*TestConn]{
Capacity: 1,
IdleTimeout: 200 * time.Millisecond,
MaxLifetime: 10 * time.Millisecond,
LogWait: state.LogWait,
Capacity: 1,
ConnectTimeout: 5 * time.Millisecond,
IdleTimeout: 200 * time.Millisecond,
MaxLifetime: 10 * time.Millisecond,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

defer p.Close()
Expand All @@ -620,9 +621,10 @@ func TestConnReopen(t *testing.T) {
assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(1))

// adding connection delay
state.chaos.delayConnect = 1500 * time.Millisecond
state.chaos.delayConnect = 10 * time.Millisecond
// wait enough to reach idle timeout and connect timeout.
time.Sleep(3 * time.Second)
time.Sleep(500 * time.Millisecond)
// no active connection should be left.
assert.Zero(t, p.Active())

}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ type ConnectionPool struct {

// NewConnectionPool creates a new ConnectionPool. The name is used
// to publish stats only.
func NewConnectionPool(name string, stats *servenv.Exporter, capacity int, idleTimeout time.Duration, maxLifetime time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool {
func NewConnectionPool(name string, stats *servenv.Exporter, capacity int, connectTimeout time.Duration, idleTimeout time.Duration, maxLifetime time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool {
config := smartconnpool.Config[*DBConnection]{
ConnectTimeout: connectTimeout,
Capacity: int64(capacity),
IdleTimeout: idleTimeout,
MaxLifetime: maxLifetime,
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type FakeMysqlDaemon struct {
// Replicating is updated when calling StartReplication /
// StopReplication (it is not used at all when calling
// ReplicationStatus, it is the test owner responsibility
//to have these two match)
// to have these two match)
Replicating bool

// IOThreadRunning is always true except in one testcase where
Expand Down Expand Up @@ -211,7 +211,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
Version: "8.0.32",
}
if db != nil {
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", nil, 5, time.Minute, 0, 0)
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", nil, 5, time.Minute, time.Minute, 0, 0)
result.appPool.Open(dbconfigs.New(db.ConnParams()))
}
return result
Expand Down
5 changes: 3 additions & 2 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ func NewMysqld(dbcfgs *dbconfigs.DBConfigs) *Mysqld {
dbcfgs: dbcfgs,
}

connectTimeout := time.Duration(dbcfgs.ConnectTimeoutMilliseconds) * time.Millisecond
// Create and open the connection pool for dba access.
result.dbaPool = dbconnpool.NewConnectionPool("DbaConnPool", nil, dbaPoolSize, DbaIdleTimeout, 0, PoolDynamicHostnameResolution)
result.dbaPool = dbconnpool.NewConnectionPool("DbaConnPool", nil, dbaPoolSize, connectTimeout, DbaIdleTimeout, 0, PoolDynamicHostnameResolution)
result.dbaPool.Open(dbcfgs.DbaWithDB())

// Create and open the connection pool for app access.
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", nil, appPoolSize, appIdleTimeout, 0, PoolDynamicHostnameResolution)
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", nil, appPoolSize, connectTimeout, appIdleTimeout, 0, PoolDynamicHostnameResolution)
result.appPool.Open(dbcfgs.AppWithDB())

/*
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewPool(env tabletenv.Env, name string, cfg tabletenv.ConnPoolConfig) *Pool

config := smartconnpool.Config[*Conn]{
Capacity: int64(cfg.Size),
ConnectTimeout: cfg.ConnectTimeout,
IdleTimeout: cfg.IdleTimeout,
MaxIdleCount: int64(cfg.MaxIdleCount),
MaxLifetime: cfg.MaxLifetime,
Expand All @@ -85,7 +86,7 @@ func NewPool(env tabletenv.Env, name string, cfg tabletenv.ConnPoolConfig) *Pool
cp.ConnPool = smartconnpool.NewPool(&config)
cp.ConnPool.RegisterStats(env.Exporter(), name)

cp.dbaPool = dbconnpool.NewConnectionPool("", env.Exporter(), 1, config.IdleTimeout, config.MaxLifetime, 0)
cp.dbaPool = dbconnpool.NewConnectionPool("", env.Exporter(), 1, config.ConnectTimeout, config.IdleTimeout, config.MaxLifetime, 0)

return cp
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func newHeartbeatWriter(env tabletenv.Env, alias *topodatapb.TabletAlias) *heart
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
// We make this pool size 2; to prevent pool exhausted
// stats from incrementing continually, and causing concern
appPool: dbconnpool.NewConnectionPool("HeartbeatWriteAppPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
allPrivsPool: dbconnpool.NewConnectionPool("HeartbeatWriteAllPrivsPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
appPool: dbconnpool.NewConnectionPool("HeartbeatWriteAppPool", env.Exporter(), 2, 0, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
allPrivsPool: dbconnpool.NewConnectionPool("HeartbeatWriteAllPrivsPool", env.Exporter(), 2, 0, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
}

w.writeConnID.Store(-1)
Expand Down
31 changes: 25 additions & 6 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,18 @@ var (

// Init must be called after flag.Parse, and before doing any other operations.
func Init() {
// IdleTimeout is only initialized for OltpReadPool , but the other pools need to inherit the value.
// TODO(sougou): Make a decision on whether this should be global or per-pool.
// IdleTimeout, MaxLifetime is only initialized for OltpReadPool. Other pools inherit the value.
currentConfig.OlapReadPool.IdleTimeout = currentConfig.OltpReadPool.IdleTimeout
currentConfig.TxPool.IdleTimeout = currentConfig.OltpReadPool.IdleTimeout
currentConfig.OlapReadPool.MaxLifetime = currentConfig.OltpReadPool.MaxLifetime
currentConfig.TxPool.MaxLifetime = currentConfig.OltpReadPool.MaxLifetime

// ConnectTimeout is global at DBConfigs level. It is inherited by all pools.
connectTimeout := time.Duration(currentConfig.DB.ConnectTimeoutMilliseconds) * time.Millisecond
currentConfig.OltpReadPool.ConnectTimeout = connectTimeout
currentConfig.OlapReadPool.ConnectTimeout = connectTimeout
currentConfig.TxPool.ConnectTimeout = connectTimeout

if enableHotRowProtection {
if enableHotRowProtectionDryRun {
currentConfig.HotRowProtection.Mode = Dryrun
Expand Down Expand Up @@ -428,6 +433,7 @@ func (cfg *TabletConfig) UnmarshalJSON(data []byte) (err error) {
// ConnPoolConfig contains the config for a conn pool.
type ConnPoolConfig struct {
Size int `json:"size,omitempty"`
ConnectTimeout time.Duration `json:"connectTimeoutSeconds,omitempty"`
Timeout time.Duration `json:"timeoutSeconds,omitempty"`
IdleTimeout time.Duration `json:"idleTimeoutSeconds,omitempty"`
MaxIdleCount int `json:"maxIdleCount,omitempty"`
Expand All @@ -440,14 +446,19 @@ func (cfg *ConnPoolConfig) MarshalJSON() ([]byte, error) {

tmp := struct {
Proxy
Timeout string `json:"timeoutSeconds,omitempty"`
IdleTimeout string `json:"idleTimeoutSeconds,omitempty"`
MaxIdleCount int `json:"maxIdleCount,omitempty"`
MaxLifetime string `json:"maxLifetimeSeconds,omitempty"`
ConnectTimeout string `json:"connectTimeoutSeconds,omitempty"`
Timeout string `json:"timeoutSeconds,omitempty"`
IdleTimeout string `json:"idleTimeoutSeconds,omitempty"`
MaxIdleCount int `json:"maxIdleCount,omitempty"`
MaxLifetime string `json:"maxLifetimeSeconds,omitempty"`
}{
Proxy: Proxy(*cfg),
}

if d := cfg.ConnectTimeout; d != 0 {
tmp.ConnectTimeout = d.String()
}

if d := cfg.Timeout; d != 0 {
tmp.Timeout = d.String()
}
Expand All @@ -466,6 +477,7 @@ func (cfg *ConnPoolConfig) MarshalJSON() ([]byte, error) {
func (cfg *ConnPoolConfig) UnmarshalJSON(data []byte) (err error) {
var tmp struct {
Size int `json:"size,omitempty"`
ConnectTimeout string `json:"connectTimeoutSeconds,omitempty"`
Timeout string `json:"timeoutSeconds,omitempty"`
IdleTimeout string `json:"idleTimeoutSeconds,omitempty"`
MaxIdleCount int `json:"maxIdleCount,omitempty"`
Expand All @@ -477,6 +489,13 @@ func (cfg *ConnPoolConfig) UnmarshalJSON(data []byte) (err error) {
return err
}

if tmp.ConnectTimeout != "" {
cfg.ConnectTimeout, err = time.ParseDuration(tmp.ConnectTimeout)
if err != nil {
return err
}
}

if tmp.Timeout != "" {
cfg.Timeout, err = time.ParseDuration(tmp.Timeout)
if err != nil {
Expand Down

0 comments on commit ba2324d

Please sign in to comment.