Skip to content

Commit

Permalink
RSDK-8081 Fix max message size limit (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
lia-viam committed Jul 9, 2024
1 parent a6b4ac1 commit 45477ae
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/viam/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ target_sources(viamsdk
robot/service.cpp
rpc/dial.cpp
rpc/server.cpp
rpc/private/viam_grpc_channel.cpp
services/generic.cpp
services/mlmodel.cpp
services/motion.cpp
Expand Down Expand Up @@ -161,6 +162,7 @@ target_sources(viamsdk
../../viam/sdk/robot/client.hpp
../../viam/sdk/robot/service.hpp
../../viam/sdk/rpc/dial.hpp
../../viam/sdk/rpc/message_sizes.hpp
../../viam/sdk/rpc/server.hpp
../../viam/sdk/services/generic.hpp
../../viam/sdk/services/mlmodel.hpp
Expand Down
4 changes: 2 additions & 2 deletions src/viam/sdk/robot/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <boost/log/trivial.hpp>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/status.h>

Expand All @@ -26,6 +25,7 @@
#include <viam/sdk/registry/registry.hpp>
#include <viam/sdk/resource/resource.hpp>
#include <viam/sdk/rpc/dial.hpp>
#include <viam/sdk/rpc/private/viam_grpc_channel.hpp>
#include <viam/sdk/services/service.hpp>

namespace viam {
Expand Down Expand Up @@ -354,7 +354,7 @@ std::shared_ptr<RobotClient> RobotClient::at_local_socket(const std::string& add
const std::string addr = "unix://" + address;
const char* uri = addr.c_str();
const std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(uri, grpc::InsecureChannelCredentials());
sdk::impl::create_viam_channel(uri, grpc::InsecureChannelCredentials());
auto viam_channel = std::make_shared<ViamChannel>(channel, address.c_str(), nullptr);
std::shared_ptr<RobotClient> robot = RobotClient::with_channel(viam_channel, options);
robot->should_close_channel_ = true;
Expand Down
8 changes: 4 additions & 4 deletions src/viam/sdk/rpc/dial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>

#include <viam/api/robot/v1/robot.grpc.pb.h>
#include <viam/api/robot/v1/robot.pb.h>
#include <viam/sdk/common/exception.hpp>
#include <viam/sdk/rpc/private/viam_grpc_channel.hpp>

extern "C" void* init_rust_runtime();
extern "C" int free_rust_runtime(void* ptr);
Expand Down Expand Up @@ -110,7 +110,7 @@ std::shared_ptr<ViamChannel> ViamChannel::dial(const char* uri,
std::string address("unix://");
address += socket_path;
const std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
impl::create_viam_channel(address, grpc::InsecureChannelCredentials());
const std::unique_ptr<viam::robot::v1::RobotService::Stub> st =
viam::robot::v1::RobotService::NewStub(channel);
return std::make_shared<ViamChannel>(channel, socket_path, ptr);
Expand All @@ -125,10 +125,10 @@ const boost::optional<DialOptions>& Options::dial_options() const {
}

Credentials::Credentials(std::string payload)
: type_("robot-location-secret"), payload_(std::move(payload)){};
: type_("robot-location-secret"), payload_(std::move(payload)) {}

Credentials::Credentials(std::string type, std::string payload)
: type_(std::move(type)), payload_(std::move(payload)){};
: type_(std::move(type)), payload_(std::move(payload)) {}

} // namespace sdk
} // namespace viam
10 changes: 10 additions & 0 deletions src/viam/sdk/rpc/message_sizes.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

namespace viam {
namespace sdk {

/// Max message size used for server builder and channel arguments.
constexpr int kMaxMessageSize = 1 << 25;

} // namespace sdk
} // namespace viam
23 changes: 23 additions & 0 deletions src/viam/sdk/rpc/private/viam_grpc_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <viam/sdk/rpc/private/viam_grpc_channel.hpp>

#include <grpcpp/create_channel.h>
#include <grpcpp/support/channel_arguments.h>

#include <viam/sdk/rpc/message_sizes.hpp>

namespace viam {
namespace sdk {
namespace impl {

std::shared_ptr<grpc::Channel> create_viam_channel(
const grpc::string& target, const std::shared_ptr<grpc::ChannelCredentials>& credentials) {
grpc::ChannelArguments args;
args.SetMaxSendMessageSize(kMaxMessageSize);
args.SetMaxReceiveMessageSize(kMaxMessageSize);

return grpc::CreateCustomChannel(target, credentials, args);
}

} // namespace impl
} // namespace sdk
} // namespace viam
19 changes: 19 additions & 0 deletions src/viam/sdk/rpc/private/viam_grpc_channel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <memory>

#include <grpcpp/channel.h>
#include <grpcpp/security/credentials.h>

namespace viam {
namespace sdk {
namespace impl {

/// @brief Like grpc::CreateChannel, but returns a channel suitable for transmitting messages of
/// size kMaxMessageSize.
std::shared_ptr<grpc::Channel> create_viam_channel(
const grpc::string& target, const std::shared_ptr<grpc::ChannelCredentials>& credentials);

} // namespace impl
} // namespace sdk
} // namespace viam
7 changes: 4 additions & 3 deletions src/viam/sdk/rpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@

#include <viam/sdk/common/exception.hpp>
#include <viam/sdk/registry/registry.hpp>
#include <viam/sdk/rpc/message_sizes.hpp>

namespace viam {
namespace sdk {

Server::Server() : builder_(std::make_unique<grpc::ServerBuilder>()) {
builder_->SetMaxReceiveMessageSize(1 << 25);
builder_->SetMaxSendMessageSize(1 << 25);
builder_->SetMaxMessageSize(1 << 25);
builder_->SetMaxReceiveMessageSize(kMaxMessageSize);
builder_->SetMaxSendMessageSize(kMaxMessageSize);
builder_->SetMaxMessageSize(kMaxMessageSize);
Registry::initialize();
for (const auto& rr : Registry::registered_resource_servers()) {
auto new_manager = std::make_shared<ResourceManager>();
Expand Down
2 changes: 1 addition & 1 deletion src/viam/sdk/tests/mocks/camera_mocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Camera::image_collection fake_raw_images() {
Camera::point_cloud fake_point_cloud() {
Camera::point_cloud point_cloud;
point_cloud.mime_type = "pointcloud/pcd";
std::vector<unsigned char> bytes = {'a', 'b', 'c'};
std::vector<unsigned char> bytes(1 << 24, 'a');
point_cloud.pc = bytes;
return point_cloud;
}
Expand Down
3 changes: 1 addition & 2 deletions src/viam/sdk/tests/test_robot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ void robot_client_to_mocks_pipeline(F&& test_case) {

// Create a RobotClient to the MockRobotService over an established
// in-process gRPC channel.
grpc::ChannelArguments args;
auto test_server = TestServer(server);
auto grpc_channel = test_server.grpc_in_process_channel(args);
auto grpc_channel = test_server.grpc_in_process_channel();
auto viam_channel = std::make_shared<ViamChannel>(grpc_channel, "", nullptr);
auto client = RobotClient::with_channel(viam_channel, Options(0, boost::none));

Expand Down
9 changes: 6 additions & 3 deletions src/viam/sdk/tests/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <viam/sdk/common/proto_type.hpp>
#include <viam/sdk/config/resource.hpp>
#include <viam/sdk/rpc/message_sizes.hpp>
#include <viam/sdk/spatialmath/geometry.hpp>
#include <viam/sdk/spatialmath/orientation.hpp>
#include <viam/sdk/spatialmath/orientation_types.hpp>
Expand Down Expand Up @@ -60,12 +61,14 @@ std::vector<GeometryConfig> fake_geometries() {
std::move(capsule_config)};
}

TestServer::TestServer(std::shared_ptr<Server> sdk_server) : sdk_server_(sdk_server){};
TestServer::TestServer(std::shared_ptr<Server> sdk_server) : sdk_server_(sdk_server) {}

TestServer::~TestServer() = default;

std::shared_ptr<grpc::Channel> TestServer::grpc_in_process_channel(
const grpc::ChannelArguments& args) {
std::shared_ptr<grpc::Channel> TestServer::grpc_in_process_channel() {
grpc::ChannelArguments args;
args.SetMaxSendMessageSize(kMaxMessageSize);
args.SetMaxReceiveMessageSize(kMaxMessageSize);
return sdk_server_->server_->InProcessChannel(args);
}

Expand Down
5 changes: 2 additions & 3 deletions src/viam/sdk/tests/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TestServer {
TestServer(std::shared_ptr<Server> sdk_server);
~TestServer();

std::shared_ptr<grpc::Channel> grpc_in_process_channel(const grpc::ChannelArguments& args);
std::shared_ptr<grpc::Channel> grpc_in_process_channel();

private:
std::shared_ptr<Server> sdk_server_;
Expand Down Expand Up @@ -58,9 +58,8 @@ void client_to_mock_pipeline(std::shared_ptr<Resource> mock, F&& test_case) {

// Create a resource-specific client to the mock over an established
// in-process gRPC channel.
grpc::ChannelArguments args;
auto test_server = TestServer(server);
auto grpc_channel = test_server.grpc_in_process_channel(args);
auto grpc_channel = test_server.grpc_in_process_channel();

auto resource_client = Registry::lookup_resource_client(API::get<ResourceType>())
->create_rpc_client(mock->name(), std::move(grpc_channel));
Expand Down

0 comments on commit 45477ae

Please sign in to comment.