Skip to content

Commit

Permalink
Add a retry throttle (#1689)
Browse files Browse the repository at this point in the history
Motivation:

To implement retries and hedging, transports need to be able to throttle
attempts to svoid overloading servers. The uses a token based system
where successful requests, ones which end with non-retryable status
code, add tokens and those which fail remove tokens from the system.
Successful requests add 1 token, failed requests remove `tokenRatio`
tokens (typically less than 1).

Modification:

- Implement a retry throttle
- Add a requirement to the `ClientTransport`

Result:

Retries can be throttled.
  • Loading branch information
glbrntt authored Oct 26, 2023
1 parent e595df4 commit f37ce30
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Sources/GRPCCore/Transport/ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public protocol ClientTransport: Sendable {
associatedtype Inbound: (AsyncSequence & Sendable) where Inbound.Element == RPCResponsePart
associatedtype Outbound: ClosableRPCWriterProtocol<RPCRequestPart>

/// Returns a throttle which gRPC uses to determine whether retries can be executed.
///
/// Client transports don't need to implement the throttle or interact with it beyond its
/// creation. gRPC will record the results of requests to determine whether retries can be
/// performed.
var retryThrottle: RetryThrottle { get }

/// Establish and maintain a connection to the remote destination.
///
/// Maintains a long-lived connection, or set of connections, to a remote destination.
Expand Down
124 changes: 124 additions & 0 deletions Sources/GRPCCore/Transport/RetryThrottle.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// A throttle used to rate-limit retries and hedging attempts.
///
/// gRPC prevents servers from being overloaded by retries and hedging by using a token-based
/// throttling mechanism at the transport level.
///
/// Each client transport maintains a throttle for the server it is connected to and gRPC records
/// successful and failed RPC attempts. Successful attempts increment the number of tokens
/// by ``tokenRatio`` and failed attempts decrement the available tokens by one. In the context
/// of throttling, a failed attempt is one where the server terminates the RPC with a status code
/// which is retryable or non fatal (as defined by ``RetryPolicy/retryableStatusCodes`` and
/// ``HedgingPolicy/nonFatalStatusCodes``) or when the client receives a pushback response from
/// the server.
///
/// See also [gRFC A6: client retries](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
public struct RetryThrottle: Sendable {
// Note: only three figures after the decimal point from the original token ratio are used so
// all computation is done a scaled number of tokens (tokens * 1000). This allows us to do all
// computation in integer space.

/// The number of tokens available, multiplied by 1000.
private let scaledTokensAvailable: LockedValueBox<Int>
/// The number of tokens, multiplied by 1000.
private let scaledTokenRatio: Int
/// The maximum number of tokens, multiplied by 1000.
private let scaledMaximumTokens: Int
/// The retry threshold, multiplied by 1000. If ``scaledTokensAvailable`` is above this then
/// retries are permitted.
private let scaledRetryThreshold: Int

/// Returns the throttling token ratio.
///
/// The number of tokens held by the throttle is incremented by this value for each successful
/// response. In the context of throttling, a successful response is one which:
/// - receives metadata from the server, or
/// - is terminated with a non-retryable or fatal status code.
///
/// If the response is a pushback response then it is not considered to be successful, even if
/// either of the preceding conditions are met.
public var tokenRatio: Double {
Double(self.scaledTokenRatio) / 1000
}

/// The maximum number of tokens the throttle may hold.
public var maximumTokens: Int {
self.scaledMaximumTokens / 1000
}

/// The number of tokens the throttle currently has.
///
/// If this value is less than or equal to the retry threshold (defined as `maximumTokens / 2`)
/// then RPCs will not be retried and hedging will be disabled.
public var tokens: Double {
self.scaledTokensAvailable.withLockedValue {
Double($0) / 1000
}
}

/// Returns whether retries and hedging are permitted at this time.
public var isRetryPermitted: Bool {
self.scaledTokensAvailable.withLockedValue {
$0 > self.scaledRetryThreshold
}
}

/// Create a new throttle.
///
/// - Parameters:
/// - maximumTokens: The maximum number of tokens available. Must be in the range `1...1000`.
/// - tokenRatio: The number of tokens to increment the available tokens by for successful
/// responses. See the documentation on this type for a description of what counts as a
/// successful response. Note that only three decimal places are used from this value.
/// - Precondition: `maximumTokens` must be in the range `1...1000`.
/// - Precondition: `tokenRatio` must be `>= 0.001`.
public init(maximumTokens: Int, tokenRatio: Double) {
precondition(
(1 ... 1000).contains(maximumTokens),
"maximumTokens must be in the range 1...1000 (is \(maximumTokens))"
)

let scaledTokenRatio = Int(tokenRatio * 1000)
precondition(scaledTokenRatio > 0, "tokenRatio must be >= 0.001 (is \(tokenRatio))")

let scaledTokens = maximumTokens * 1000
self.scaledMaximumTokens = scaledTokens
self.scaledRetryThreshold = scaledTokens / 2
self.scaledTokenRatio = scaledTokenRatio
self.scaledTokensAvailable = LockedValueBox(scaledTokens)
}

/// Records a success, adding a token to the throttle.
@usableFromInline
func recordSuccess() {
self.scaledTokensAvailable.withLockedValue { value in
value = min(self.scaledMaximumTokens, value &+ self.scaledTokenRatio)
}
}

/// Records a failure, removing tokens from the throttle.
/// - Returns: Whether retries will now be throttled.
@usableFromInline
@discardableResult
func recordFailure() -> Bool {
self.scaledTokensAvailable.withLockedValue { value in
value = max(0, value &- 1000)
return value <= self.scaledRetryThreshold
}
}
}
98 changes: 98 additions & 0 deletions Tests/GRPCCoreTests/Transport/RetryThrottleTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import XCTest

@testable import GRPCCore

final class RetryThrottleTests: XCTestCase {
func testThrottleOnInit() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
// Start with max tokens, so permitted.
XCTAssertTrue(throttle.isRetryPermitted)
XCTAssertEqual(throttle.maximumTokens, 10)
XCTAssertEqual(throttle.tokens, 10)
XCTAssertEqual(throttle.tokenRatio, 0.1)
}

func testThrottleIgnoresMoreThanThreeDecimals() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1239)
XCTAssertEqual(throttle.tokenRatio, 0.123)
}

func testFailureReducesTokens() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
XCTAssertEqual(throttle.tokens, 10)
XCTAssert(throttle.isRetryPermitted)

throttle.recordFailure()
XCTAssertEqual(throttle.tokens, 9)
XCTAssert(throttle.isRetryPermitted)

throttle.recordFailure()
XCTAssertEqual(throttle.tokens, 8)
XCTAssert(throttle.isRetryPermitted)

throttle.recordFailure()
XCTAssertEqual(throttle.tokens, 7)
XCTAssert(throttle.isRetryPermitted)

throttle.recordFailure()
XCTAssertEqual(throttle.tokens, 6)
XCTAssert(throttle.isRetryPermitted)

// Drop to threshold, retries no longer allowed.
throttle.recordFailure()
XCTAssertEqual(throttle.tokens, 5)
XCTAssertFalse(throttle.isRetryPermitted)
}

func testTokensCantDropBelowZero() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
for _ in 0 ..< 1000 {
throttle.recordFailure()
XCTAssertGreaterThanOrEqual(throttle.tokens, 0)
}
XCTAssertEqual(throttle.tokens, 0)
}

func testSuccessIncreasesTokens() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)

// Drop to zero.
for _ in 0 ..< 10 {
throttle.recordFailure()
}
XCTAssertEqual(throttle.tokens, 0)

// Start recording successes.
throttle.recordSuccess()
XCTAssertEqual(throttle.tokens, 0.1)

throttle.recordSuccess()
XCTAssertEqual(throttle.tokens, 0.2)

throttle.recordSuccess()
XCTAssertEqual(throttle.tokens, 0.3)
}

func testTokensCantRiseAboveMax() {
let throttle = RetryThrottle(maximumTokens: 10, tokenRatio: 0.1)
XCTAssertEqual(throttle.tokens, 10)
throttle.recordSuccess()
XCTAssertEqual(throttle.tokens, 10)
}
}

0 comments on commit f37ce30

Please sign in to comment.