Skip to content

Commit

Permalink
Unary client supports timeout (cartographer-project#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaschler authored Aug 9, 2018
1 parent 4bbe45e commit 771af45
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 28 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(ALL_LIBRARY_SRCS
async_grpc/token_file_credentials.cc)

set(ALL_TESTS
async_grpc/client_test.cc
async_grpc/server_test.cc
async_grpc/type_traits_test.cc)

Expand Down
1 change: 1 addition & 0 deletions async_grpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ cc_library(

[cc_test(
name = src.replace("/", "_").replace(".cc", ""),
size = "small",
srcs = [src],
deps = [
":async_grpc",
Expand Down
56 changes: 30 additions & 26 deletions async_grpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef CPP_GRPC_CLIENT_H
#define CPP_GRPC_CLIENT_H

#include "async_grpc/common/optional.h"
#include "async_grpc/retry.h"
#include "async_grpc/rpc_handler_interface.h"
#include "async_grpc/rpc_service_method_traits.h"
Expand Down Expand Up @@ -47,30 +48,39 @@ class Client<RpcServiceMethodConcept, ::grpc::internal::RpcMethod::NORMAL_RPC> {
using ResponseType = typename RpcServiceMethod::ResponseType;

public:
Client(std::shared_ptr<::grpc::Channel> channel, RetryStrategy retry_strategy)
Client(std::shared_ptr<::grpc::Channel> channel)
: channel_(channel),
client_context_(common::make_unique<::grpc::ClientContext>()),
rpc_method_name_(RpcServiceMethod::MethodName()),
rpc_method_(rpc_method_name_.c_str(), RpcServiceMethod::StreamType,
channel_),
retry_strategy_(retry_strategy) {}
channel_) {}

Client(std::shared_ptr<::grpc::Channel> channel)
// 'timeout' is used for every 'Write' separately, but multiple retries count
// towards a single timeout.
Client(std::shared_ptr<::grpc::Channel> channel, common::Duration timeout,
RetryStrategy retry_strategy = nullptr)
: channel_(channel),
client_context_(common::make_unique<::grpc::ClientContext>()),
rpc_method_name_(RpcServiceMethod::MethodName()),
rpc_method_(rpc_method_name_.c_str(), RpcServiceMethod::StreamType,
channel_) {}
channel_),
timeout_(timeout),
retry_strategy_(retry_strategy) {}

bool Write(const RequestType& request, ::grpc::Status* status = nullptr) {
::grpc::Status internal_status;
bool result = RetryWithStrategy(retry_strategy_,
[this, &request, &internal_status] {
WriteImpl(request, &internal_status);
return internal_status;
},
[this] { Reset(); });

common::optional<std::chrono::system_clock::time_point> deadline;
if (timeout_.has_value()) {
deadline = std::chrono::system_clock::now() + timeout_.value();
}
client_context_ = ResetContext(deadline);
bool result = RetryWithStrategy(
retry_strategy_,
[this, &request, &internal_status] {
WriteImpl(request, &internal_status);
return internal_status;
},
[this, deadline] { client_context_ = ResetContext(deadline); });
if (status != nullptr) {
*status = internal_status;
}
Expand All @@ -80,8 +90,13 @@ class Client<RpcServiceMethodConcept, ::grpc::internal::RpcMethod::NORMAL_RPC> {
const ResponseType& response() { return response_; }

private:
void Reset() {
client_context_ = common::make_unique<::grpc::ClientContext>();
static std::unique_ptr<::grpc::ClientContext> ResetContext(
common::optional<std::chrono::system_clock::time_point> deadline) {
auto context = common::make_unique<::grpc::ClientContext>();
if (deadline.has_value()) {
context->set_deadline(deadline.value());
}
return context;
}

bool WriteImpl(const RequestType& request, ::grpc::Status* status) {
Expand All @@ -101,6 +116,7 @@ class Client<RpcServiceMethodConcept, ::grpc::internal::RpcMethod::NORMAL_RPC> {
std::unique_ptr<::grpc::ClientContext> client_context_;
const std::string rpc_method_name_;
const ::grpc::internal::RpcMethod rpc_method_;
common::optional<common::Duration> timeout_;

ResponseType response_;
RetryStrategy retry_strategy_;
Expand Down Expand Up @@ -143,10 +159,6 @@ class Client<RpcServiceMethodConcept,
const ResponseType& response() { return response_; }

private:
void Reset() {
client_context_ = common::make_unique<::grpc::ClientContext>();
}

bool WriteImpl(const RequestType& request, ::grpc::Status* status) {
InstantiateClientWriterIfNeeded();
return client_writer_->Write(request);
Expand Down Expand Up @@ -204,10 +216,6 @@ class Client<RpcServiceMethodConcept,
}

private:
void Reset() {
client_context_ = common::make_unique<::grpc::ClientContext>();
}

bool WriteImpl(const RequestType& request, ::grpc::Status* status) {
InstantiateClientReader(request);
return true;
Expand Down Expand Up @@ -267,10 +275,6 @@ class Client<RpcServiceMethodConcept,
}

private:
void Reset() {
client_context_ = common::make_unique<::grpc::ClientContext>();
}

bool WriteImpl(const RequestType& request, ::grpc::Status* status) {
InstantiateClientReaderWriterIfNeeded();
return client_reader_writer_->Write(request);
Expand Down
61 changes: 61 additions & 0 deletions async_grpc/client_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "async_grpc/client.h"

#include "async_grpc/proto/math_service.pb.h"
#include "async_grpc/retry.h"
#include "glog/logging.h"
#include "grpc++/grpc++.h"
#include "gtest/gtest.h"

namespace async_grpc {
namespace {

struct GetEchoMethod {
static constexpr const char* MethodName() {
return "/async_grpc.proto.Math/GetEcho";
}
using IncomingType = proto::GetEchoRequest;
using OutgoingType = proto::GetEchoResponse;
};

const char* kWrongAddress = "wrong-domain-does-not-exist:50051";

TEST(ClientTest, TimesOut) {
auto client_channel = ::grpc::CreateChannel(
kWrongAddress, ::grpc::InsecureChannelCredentials());
Client<GetEchoMethod> client(client_channel, common::FromSeconds(0.1));
proto::GetEchoRequest request;
grpc::Status status;
EXPECT_FALSE(client.Write(request, &status));
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
}

TEST(ClientTest, TimesOutWithRetries) {
auto client_channel = ::grpc::CreateChannel(
kWrongAddress, ::grpc::InsecureChannelCredentials());
Client<GetEchoMethod> client(
client_channel, common::FromSeconds(0.5),
CreateLimitedBackoffStrategy(common::FromSeconds(0.1), 1, 3));
proto::GetEchoRequest request;
grpc::Status status;
EXPECT_FALSE(client.Write(request, &status));
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
}

} // namespace
} // namespace async_grpc
5 changes: 3 additions & 2 deletions async_grpc/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,9 @@ TEST_F(ServerTest, RetryWithUnrecoverableError) {
server_->Start();

Client<GetSquareMethod> client(
client_channel_, CreateUnlimitedConstantDelayStrategy(
common::FromSeconds(1), {::grpc::INTERNAL}));
client_channel_, common::FromSeconds(5),
CreateUnlimitedConstantDelayStrategy(common::FromSeconds(1),
{::grpc::INTERNAL}));
proto::GetSquareRequest request;
request.set_input(-11);
EXPECT_FALSE(client.Write(request));
Expand Down

0 comments on commit 771af45

Please sign in to comment.