Skip to content

Commit

Permalink
feat: rewrap certain grpc errors (#810)
Browse files Browse the repository at this point in the history
  • Loading branch information
viacheslav-rostovtsev authored Sep 15, 2022
1 parent c464fc5 commit 9842f9c
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 26 deletions.
2 changes: 1 addition & 1 deletion gapic-common/lib/gapic/call_options/retry_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def hash
private

def retry? error
error.is_a?(GRPC::BadStatus) && retry_codes.include?(error.code)
error.is_a?(::GRPC::BadStatus) && retry_codes.include?(error.code)
end

def delay!
Expand Down
1 change: 1 addition & 0 deletions gapic-common/lib/gapic/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
# limitations under the License.

require "grpc"
require "gapic/grpc/errors"
require "gapic/grpc/service_stub"
require "gapic/grpc/status_details"
60 changes: 60 additions & 0 deletions gapic-common/lib/gapic/grpc/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "googleauth"
require "gapic/common/error"

module Gapic
module GRPC
##
# An error class to represent the Authorization Error.
# The GRPC layer wraps auth plugin errors in ::GRPC::Unavailable.
# This class rewraps those GRPC layer errors, presenting a correct status code.
#
class AuthorizationError < ::GRPC::Unauthenticated
end

##
# An error class that represents Deadline Exceeded error with an optional
# retry root cause.
#
# The GRPC layer throws ::GRPC::DeadlineExceeded without any context.
# If the deadline was exceeded while retrying another exception (e.g.
# ::GRPC::Unavailable), that exception could be useful for understanding
# the readon for the timeout.
#
# This exception rewraps ::GRPC::DeadlineExceeded, adding an exception
# that was being retried until the deadline was exceeded (if any) as a
# `root_cause` attribute.
#
# @!attribute [r] root_cause
# @return [Object, nil] The exception that was being retried
# when the DeadlineExceeded error occured.
#
class DeadlineExceededError < ::GRPC::DeadlineExceeded
attr_reader :root_cause

##
# @param message [String] The error message.
#
# @param root_cause [Object, nil] The exception that was being retried
# when the DeadlineExceeded error occured.
#
def initialize message, root_cause: nil
super message
@root_cause = root_cause
end
end
end
end
30 changes: 15 additions & 15 deletions gapic-common/lib/gapic/grpc/service_stub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ class ServiceStub
# @param grpc_stub_class [Class] gRPC stub class to create a new instance of.
# @param endpoint [String] The endpoint of the API.
# @param credentials [Google::Auth::Credentials, Signet::OAuth2::Client, String, Hash, Proc,
# GRPC::Core::Channel, GRPC::Core::ChannelCredentials] Provides the means for authenticating requests made by
# ::GRPC::Core::Channel, ::GRPC::Core::ChannelCredentials] Provides the means for authenticating requests made by
# the client. This parameter can be many types:
#
# * A `Google::Auth::Credentials` uses a the properties of its represented keyfile for authenticating requests
# made by this client.
# * A `Signet::OAuth2::Client` object used to apply the OAuth credentials.
# * A `GRPC::Core::Channel` will be used to make calls through.
# * A `GRPC::Core::ChannelCredentials` for the setting up the RPC client. The channel credentials should
# already be composed with a `GRPC::Core::CallCredentials` object.
# * A `::GRPC::Core::Channel` will be used to make calls through.
# * A `::GRPC::Core::ChannelCredentials` for the setting up the RPC client. The channel credentials should
# already be composed with a `::GRPC::Core::CallCredentials` object.
# * A `Proc` will be used as an updater_proc for the Grpc channel. The proc transforms the metadata for
# requests, generally, to give OAuth credentials.
# @param channel_args [Hash] The channel arguments. (This argument is ignored when `credentials` is
# provided as a `GRPC::Core::Channel`.)
# @param interceptors [Array<GRPC::ClientInterceptor>] An array of {GRPC::ClientInterceptor} objects that will
# provided as a `::GRPC::Core::Channel`.)
# @param interceptors [Array<::GRPC::ClientInterceptor>] An array of {::GRPC::ClientInterceptor} objects that will
# be used for intercepting calls before they are executed Interceptors are an EXPERIMENTAL API.
#
def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, interceptors: nil
Expand All @@ -59,19 +59,19 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
interceptors = Array interceptors

@grpc_stub = case credentials
when GRPC::Core::Channel
when ::GRPC::Core::Channel
grpc_stub_class.new endpoint, nil, channel_override: credentials,
interceptors: interceptors
when GRPC::Core::ChannelCredentials, Symbol
when ::GRPC::Core::ChannelCredentials, Symbol
grpc_stub_class.new endpoint, credentials, channel_args: channel_args,
interceptors: interceptors
else
updater_proc = credentials.updater_proc if credentials.respond_to? :updater_proc
updater_proc ||= credentials if credentials.is_a? Proc
raise ArgumentError, "invalid credentials (#{credentials.class})" if updater_proc.nil?

call_creds = GRPC::Core::CallCredentials.new updater_proc
chan_creds = GRPC::Core::ChannelCredentials.new.compose call_creds
call_creds = ::GRPC::Core::CallCredentials.new updater_proc
chan_creds = ::GRPC::Core::ChannelCredentials.new.compose call_creds
grpc_stub_class.new endpoint, chan_creds, channel_args: channel_args,
interceptors: interceptors
end
Expand All @@ -88,7 +88,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
#
# @yield [response, operation] Access the response along with the RPC operation.
# @yieldparam response [Object] The response object.
# @yieldparam operation [GRPC::ActiveCall::Operation] The RPC operation for the response.
# @yieldparam operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
#
# @return [Object] The response object.
#
Expand All @@ -98,7 +98,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand All @@ -115,7 +115,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand All @@ -126,7 +126,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
# request = Google::Showcase::V1beta1::EchoRequest.new
# options = Gapic::CallOptions.new(
# retry_policy = {
# retry_codes: [GRPC::Core::StatusCodes::UNAVAILABLE]
# retry_codes: [::GRPC::Core::StatusCodes::UNAVAILABLE]
# }
# )
# response = echo_stub.call_rpc :echo, request
Expand All @@ -138,7 +138,7 @@ def initialize grpc_stub_class, endpoint:, credentials:, channel_args: nil, inte
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand Down
23 changes: 17 additions & 6 deletions gapic-common/lib/gapic/grpc/service_stub/rpc_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

require "gapic/call_options"
require "grpc/errors"

module Gapic
class ServiceStub
Expand Down Expand Up @@ -45,7 +46,7 @@ def initialize stub_method
#
# @yield [response, operation] Access the response along with the RPC operation.
# @yieldparam response [Object] The response object.
# @yieldparam operation [GRPC::ActiveCall::Operation] The RPC operation for the response.
# @yieldparam operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
#
# @return [Object] The response object.
#
Expand All @@ -55,7 +56,7 @@ def initialize stub_method
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand All @@ -73,7 +74,7 @@ def initialize stub_method
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand All @@ -85,7 +86,7 @@ def initialize stub_method
# request = Google::Showcase::V1beta1::EchoRequest.new
# options = Gapic::CallOptions.new(
# retry_policy = {
# retry_codes: [GRPC::Core::StatusCodes::UNAVAILABLE]
# retry_codes: [::GRPC::Core::StatusCodes::UNAVAILABLE]
# }
# )
# response = echo_call.call request, options: options
Expand All @@ -96,7 +97,7 @@ def initialize stub_method
# require "gapic"
# require "gapic/grpc"
#
# echo_channel = GRPC::Core::Channel.new(
# echo_channel = ::GRPC::Core::Channel.new(
# "localhost:7469", nil, :this_channel_is_insecure
# )
# echo_stub = Gapic::ServiceStub.new(
Expand All @@ -116,13 +117,23 @@ def call request, options: nil
deadline = calculate_deadline options
metadata = options.metadata

retried_exception = nil
begin
operation = stub_method.call request, deadline: deadline, metadata: metadata, return_op: true
response = operation.execute
yield response, operation if block_given?
response
rescue ::GRPC::DeadlineExceeded => e
raise Gapic::GRPC::DeadlineExceededError.new e.message, root_cause: retried_exception
rescue StandardError => e
retry if check_retry?(deadline) && options.retry_policy.call(e)
if e.is_a?(::GRPC::Unavailable) && /Signet::AuthorizationError/ =~ e.message
e = Gapic::GRPC::AuthorizationError.new e.message.gsub(%r{^\d+:}, "")
end

if check_retry?(deadline) && options.retry_policy.call(e)
retried_exception = e
retry
end

raise e
end
Expand Down
2 changes: 1 addition & 1 deletion gapic-common/lib/gapic/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module Headers
# @param lib_version [String] The client library version.
# @param gax_version [String] The Gapic version. Defaults to `Gapic::Common::VERSION`.
# @param gapic_version [String] The Gapic version.
# @param grpc_version [String] The GRPC version. Defaults to `GRPC::VERSION`.
# @param grpc_version [String] The GRPC version. Defaults to `::GRPC::VERSION`.
# @param rest_version [String] The Rest Library (Faraday) version. Defaults to `Faraday::VERSION`.
# @param transports_version_send [Array] Which transports to send versions for.
# Allowed values to contain are:
Expand Down
6 changes: 3 additions & 3 deletions gapic-common/lib/gapic/paged_enumerable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PagedEnumerable
# @param method_name [Symbol] The RPC method name.
# @param request [Object] The request object.
# @param response [Object] The response object.
# @param operation [GRPC::ActiveCall::Operation] The RPC operation for the response.
# @param operation [::GRPC::ActiveCall::Operation] The RPC operation for the response.
# @param options [Gapic::CallOptions] The options for making the RPC call.
# @param format_resource [Proc] A Proc object to format the resource object. The Proc should accept response as an
# argument, and return a formatted resource object. Optional.
Expand Down Expand Up @@ -195,7 +195,7 @@ def verify_response!
# @attribute [r] response
# @return [Object] the response object for the page.
# @attribute [r] operation
# @return [GRPC::ActiveCall::Operation] the RPC operation for the page.
# @return [::GRPC::ActiveCall::Operation] the RPC operation for the page.
class Page
include Enumerable
attr_reader :response
Expand All @@ -205,7 +205,7 @@ class Page
# @private
# @param response [Object] The response object for the page.
# @param resource_field [String] The name of the field in response which holds the resources.
# @param operation [GRPC::ActiveCall::Operation] the RPC operation for the page.
# @param operation [::GRPC::ActiveCall::Operation] the RPC operation for the page.
# @param format_resource [Proc] A Proc object to format the resource object. The Proc should accept response as an
# argument, and return a formatted resource object. Optional.
#
Expand Down
33 changes: 33 additions & 0 deletions gapic-common/test/gapic/grpc/rpc_call/raise_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,37 @@ def test_wont_wrap_non_grpc_errors
assert_kind_of Time, deadline_arg
assert_equal 1, call_count
end

##
# Tests that if a layer underlying the RpcCall throws a ::GRPC::Unavailable
# that contains a Signet::AuthorizationError in its text,
# it gets extracted and rewrapped into a G
def test_will_rewrap_signet_errors
signet_error_text = <<-SIGNET
<Signet::AuthorizationError: Authorization failed. Server message:
{
"error": "invalid_grant",
"error_description": "Bad Request"
}>
SIGNET

unauth_error_text = "#{::GRPC::Core::StatusCodes::UNAUTHENTICATED}:#{signet_error_text}"

api_meth_stub = proc do |*_args|
raise GRPC::Unavailable.new(signet_error_text)
end

rpc_call = Gapic::ServiceStub::RpcCall.new(
api_meth_stub
)

ex = assert_raises Gapic::GRPC::AuthorizationError do
rpc_call.call Object.new
end

assert_equal ::GRPC::Core::StatusCodes::UNAUTHENTICATED, ex.code
assert_equal unauth_error_text, ex.message
refute_nil ex.cause
assert_kind_of GRPC::Unavailable, ex.cause
end
end
47 changes: 47 additions & 0 deletions gapic-common/test/gapic/grpc/rpc_call/retry/raise_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,53 @@ def test_times_out
sleep_mock.verify
end

##
# Tests that if a layer underlying the RpcCall repeatedly throws a retriable exception
# and then throws a DeadlineExceeded, the two exceptions get wrapped in a Gapic::GRPC::DeadlineExceededError
#
def test_deadline_exceeded
call_count = 0
err_message = "foo"
unavailable_err_message = "#{GRPC::Core::StatusCodes::UNAVAILABLE}:#{err_message}"

api_meth_stub = proc do |deadline: nil, **_kwargs|
call_count += 1
raise GRPC::DeadlineExceeded if call_count == 3
raise GRPC::Unavailable.new(err_message)
end

rpc_call = Gapic::ServiceStub::RpcCall.new api_meth_stub

sleep_mock = Minitest::Mock.new
sleep_mock.expect :sleep, nil, [1]
sleep_mock.expect :sleep, nil, [1 * 1.3]
sleep_proc = ->(count) { sleep_mock.sleep count }

options = Gapic::CallOptions.new(
timeout: 300,
retry_policy: { retry_codes: [GRPC::Core::StatusCodes::UNAVAILABLE] }
)

Kernel.stub :sleep, sleep_proc do
ex = assert_raises Gapic::GRPC::DeadlineExceededError do
rpc_call.call Object.new, options: options
end

assert_equal 3, call_count

assert_equal ::GRPC::Core::StatusCodes::DEADLINE_EXCEEDED, ex.code

refute_nil ex.cause
assert_kind_of ::GRPC::DeadlineExceeded, ex.cause

refute_nil ex.root_cause
assert_kind_of ::GRPC::Unavailable, ex.root_cause
assert_equal unavailable_err_message, ex.root_cause.message
end

sleep_mock.verify
end

def test_aborts_on_unexpected_exception
call_count = 0

Expand Down

0 comments on commit 9842f9c

Please sign in to comment.