diff --git a/host_source.go b/host_source.go index 31132e38f..ea5c5980a 100644 --- a/host_source.go +++ b/host_source.go @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +func (h *HostInfo) IsBusy(s *Session) bool { + pool, ok := s.pool.getPool(h) + return ok && h != nil && pool.Size() >= MAX_IN_FLIGHT_THRESHOLD +} + func (h *HostInfo) HostnameAndPort() string { h.mu.Lock() defer h.mu.Unlock() diff --git a/policies.go b/policies.go index 70ea00164..f679d7ae5 100644 --- a/policies.go +++ b/policies.go @@ -391,6 +391,16 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) { } } +// AvoidSlowReplicas enabled avoiding slow replicas +// +// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas +// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD connections in flight +func AvoidSlowReplicas() func(policy *tokenAwareHostPolicy) { + return func(t *tokenAwareHostPolicy) { + t.avoidSlowReplicas = true + } +} + // NonLocalReplicasFallback enables fallback to replicas that are not considered local. // // TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then @@ -424,6 +434,8 @@ type clusterMeta struct { tokenRing *tokenRing } +const MAX_IN_FLIGHT_THRESHOLD int = 10 + type tokenAwareHostPolicy struct { fallback HostSelectionPolicy getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error) @@ -443,6 +455,8 @@ type tokenAwareHostPolicy struct { // Experimental, this interface and use may change tablets cowTabletList + + avoidSlowReplicas bool } func (t *tokenAwareHostPolicy) Init(s *Session) { @@ -687,6 +701,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { } } + if s := qry.GetSession(); s != nil && t.avoidSlowReplicas { + healthyReplicas := make([]*HostInfo, 0, len(replicas)) + unhealthyReplicas := make([]*HostInfo, 0, len(replicas)) + + for _, h := range replicas { + if h.IsBusy(s) { + unhealthyReplicas = append(unhealthyReplicas, h) + } else { + healthyReplicas = append(healthyReplicas, h) + } + } + + replicas = append(healthyReplicas, unhealthyReplicas...) + } + var ( fallbackIter NextHost i, j, k int