From 45477aecb490e00db36001e865ceeb225d53ecc7 Mon Sep 17 00:00:00 2001 From: lia <167905060+lia-viam@users.noreply.github.com> Date: Tue, 9 Jul 2024 16:42:37 -0400 Subject: [PATCH] RSDK-8081 Fix max message size limit (#264) --- src/viam/sdk/CMakeLists.txt | 2 ++ src/viam/sdk/robot/client.cpp | 4 ++-- src/viam/sdk/rpc/dial.cpp | 8 +++---- src/viam/sdk/rpc/message_sizes.hpp | 10 ++++++++ .../sdk/rpc/private/viam_grpc_channel.cpp | 23 +++++++++++++++++++ .../sdk/rpc/private/viam_grpc_channel.hpp | 19 +++++++++++++++ src/viam/sdk/rpc/server.cpp | 7 +++--- src/viam/sdk/tests/mocks/camera_mocks.cpp | 2 +- src/viam/sdk/tests/test_robot.cpp | 3 +-- src/viam/sdk/tests/test_utils.cpp | 9 +++++--- src/viam/sdk/tests/test_utils.hpp | 5 ++-- 11 files changed, 74 insertions(+), 18 deletions(-) create mode 100644 src/viam/sdk/rpc/message_sizes.hpp create mode 100644 src/viam/sdk/rpc/private/viam_grpc_channel.cpp create mode 100644 src/viam/sdk/rpc/private/viam_grpc_channel.hpp diff --git a/src/viam/sdk/CMakeLists.txt b/src/viam/sdk/CMakeLists.txt index 5136d59e5..7d44e638e 100644 --- a/src/viam/sdk/CMakeLists.txt +++ b/src/viam/sdk/CMakeLists.txt @@ -104,6 +104,7 @@ target_sources(viamsdk robot/service.cpp rpc/dial.cpp rpc/server.cpp + rpc/private/viam_grpc_channel.cpp services/generic.cpp services/mlmodel.cpp services/motion.cpp @@ -161,6 +162,7 @@ target_sources(viamsdk ../../viam/sdk/robot/client.hpp ../../viam/sdk/robot/service.hpp ../../viam/sdk/rpc/dial.hpp + ../../viam/sdk/rpc/message_sizes.hpp ../../viam/sdk/rpc/server.hpp ../../viam/sdk/services/generic.hpp ../../viam/sdk/services/mlmodel.hpp diff --git a/src/viam/sdk/robot/client.cpp b/src/viam/sdk/robot/client.cpp index 2f2038f6f..00f094453 100644 --- a/src/viam/sdk/robot/client.cpp +++ b/src/viam/sdk/robot/client.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include @@ -26,6 +25,7 @@ #include #include #include +#include #include namespace viam { @@ -354,7 +354,7 @@ std::shared_ptr RobotClient::at_local_socket(const std::string& add const std::string addr = "unix://" + address; const char* uri = addr.c_str(); const std::shared_ptr channel = - grpc::CreateChannel(uri, grpc::InsecureChannelCredentials()); + sdk::impl::create_viam_channel(uri, grpc::InsecureChannelCredentials()); auto viam_channel = std::make_shared(channel, address.c_str(), nullptr); std::shared_ptr robot = RobotClient::with_channel(viam_channel, options); robot->should_close_channel_ = true; diff --git a/src/viam/sdk/rpc/dial.cpp b/src/viam/sdk/rpc/dial.cpp index 56cdd44e2..2f4cbaa8d 100644 --- a/src/viam/sdk/rpc/dial.cpp +++ b/src/viam/sdk/rpc/dial.cpp @@ -6,12 +6,12 @@ #include #include #include -#include #include #include #include #include +#include extern "C" void* init_rust_runtime(); extern "C" int free_rust_runtime(void* ptr); @@ -110,7 +110,7 @@ std::shared_ptr ViamChannel::dial(const char* uri, std::string address("unix://"); address += socket_path; const std::shared_ptr channel = - grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + impl::create_viam_channel(address, grpc::InsecureChannelCredentials()); const std::unique_ptr st = viam::robot::v1::RobotService::NewStub(channel); return std::make_shared(channel, socket_path, ptr); @@ -125,10 +125,10 @@ const boost::optional& Options::dial_options() const { } Credentials::Credentials(std::string payload) - : type_("robot-location-secret"), payload_(std::move(payload)){}; + : type_("robot-location-secret"), payload_(std::move(payload)) {} Credentials::Credentials(std::string type, std::string payload) - : type_(std::move(type)), payload_(std::move(payload)){}; + : type_(std::move(type)), payload_(std::move(payload)) {} } // namespace sdk } // namespace viam diff --git a/src/viam/sdk/rpc/message_sizes.hpp b/src/viam/sdk/rpc/message_sizes.hpp new file mode 100644 index 000000000..11aec6380 --- /dev/null +++ b/src/viam/sdk/rpc/message_sizes.hpp @@ -0,0 +1,10 @@ +#pragma once + +namespace viam { +namespace sdk { + +/// Max message size used for server builder and channel arguments. +constexpr int kMaxMessageSize = 1 << 25; + +} // namespace sdk +} // namespace viam diff --git a/src/viam/sdk/rpc/private/viam_grpc_channel.cpp b/src/viam/sdk/rpc/private/viam_grpc_channel.cpp new file mode 100644 index 000000000..909ec6a55 --- /dev/null +++ b/src/viam/sdk/rpc/private/viam_grpc_channel.cpp @@ -0,0 +1,23 @@ +#include + +#include +#include + +#include + +namespace viam { +namespace sdk { +namespace impl { + +std::shared_ptr create_viam_channel( + const grpc::string& target, const std::shared_ptr& credentials) { + grpc::ChannelArguments args; + args.SetMaxSendMessageSize(kMaxMessageSize); + args.SetMaxReceiveMessageSize(kMaxMessageSize); + + return grpc::CreateCustomChannel(target, credentials, args); +} + +} // namespace impl +} // namespace sdk +} // namespace viam diff --git a/src/viam/sdk/rpc/private/viam_grpc_channel.hpp b/src/viam/sdk/rpc/private/viam_grpc_channel.hpp new file mode 100644 index 000000000..1f6989317 --- /dev/null +++ b/src/viam/sdk/rpc/private/viam_grpc_channel.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include +#include + +namespace viam { +namespace sdk { +namespace impl { + +/// @brief Like grpc::CreateChannel, but returns a channel suitable for transmitting messages of +/// size kMaxMessageSize. +std::shared_ptr create_viam_channel( + const grpc::string& target, const std::shared_ptr& credentials); + +} // namespace impl +} // namespace sdk +} // namespace viam diff --git a/src/viam/sdk/rpc/server.cpp b/src/viam/sdk/rpc/server.cpp index 35346e24a..94bb72542 100644 --- a/src/viam/sdk/rpc/server.cpp +++ b/src/viam/sdk/rpc/server.cpp @@ -7,14 +7,15 @@ #include #include +#include namespace viam { namespace sdk { Server::Server() : builder_(std::make_unique()) { - builder_->SetMaxReceiveMessageSize(1 << 25); - builder_->SetMaxSendMessageSize(1 << 25); - builder_->SetMaxMessageSize(1 << 25); + builder_->SetMaxReceiveMessageSize(kMaxMessageSize); + builder_->SetMaxSendMessageSize(kMaxMessageSize); + builder_->SetMaxMessageSize(kMaxMessageSize); Registry::initialize(); for (const auto& rr : Registry::registered_resource_servers()) { auto new_manager = std::make_shared(); diff --git a/src/viam/sdk/tests/mocks/camera_mocks.cpp b/src/viam/sdk/tests/mocks/camera_mocks.cpp index 9fe9b5443..857936721 100644 --- a/src/viam/sdk/tests/mocks/camera_mocks.cpp +++ b/src/viam/sdk/tests/mocks/camera_mocks.cpp @@ -64,7 +64,7 @@ Camera::image_collection fake_raw_images() { Camera::point_cloud fake_point_cloud() { Camera::point_cloud point_cloud; point_cloud.mime_type = "pointcloud/pcd"; - std::vector bytes = {'a', 'b', 'c'}; + std::vector bytes(1 << 24, 'a'); point_cloud.pc = bytes; return point_cloud; } diff --git a/src/viam/sdk/tests/test_robot.cpp b/src/viam/sdk/tests/test_robot.cpp index 5122bb8e0..f454389d6 100644 --- a/src/viam/sdk/tests/test_robot.cpp +++ b/src/viam/sdk/tests/test_robot.cpp @@ -52,9 +52,8 @@ void robot_client_to_mocks_pipeline(F&& test_case) { // Create a RobotClient to the MockRobotService over an established // in-process gRPC channel. - grpc::ChannelArguments args; auto test_server = TestServer(server); - auto grpc_channel = test_server.grpc_in_process_channel(args); + auto grpc_channel = test_server.grpc_in_process_channel(); auto viam_channel = std::make_shared(grpc_channel, "", nullptr); auto client = RobotClient::with_channel(viam_channel, Options(0, boost::none)); diff --git a/src/viam/sdk/tests/test_utils.cpp b/src/viam/sdk/tests/test_utils.cpp index 84d727226..e61062fba 100644 --- a/src/viam/sdk/tests/test_utils.cpp +++ b/src/viam/sdk/tests/test_utils.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -60,12 +61,14 @@ std::vector fake_geometries() { std::move(capsule_config)}; } -TestServer::TestServer(std::shared_ptr sdk_server) : sdk_server_(sdk_server){}; +TestServer::TestServer(std::shared_ptr sdk_server) : sdk_server_(sdk_server) {} TestServer::~TestServer() = default; -std::shared_ptr TestServer::grpc_in_process_channel( - const grpc::ChannelArguments& args) { +std::shared_ptr TestServer::grpc_in_process_channel() { + grpc::ChannelArguments args; + args.SetMaxSendMessageSize(kMaxMessageSize); + args.SetMaxReceiveMessageSize(kMaxMessageSize); return sdk_server_->server_->InProcessChannel(args); } diff --git a/src/viam/sdk/tests/test_utils.hpp b/src/viam/sdk/tests/test_utils.hpp index 6b3b8eabf..0b0e31b7a 100644 --- a/src/viam/sdk/tests/test_utils.hpp +++ b/src/viam/sdk/tests/test_utils.hpp @@ -22,7 +22,7 @@ class TestServer { TestServer(std::shared_ptr sdk_server); ~TestServer(); - std::shared_ptr grpc_in_process_channel(const grpc::ChannelArguments& args); + std::shared_ptr grpc_in_process_channel(); private: std::shared_ptr sdk_server_; @@ -58,9 +58,8 @@ void client_to_mock_pipeline(std::shared_ptr mock, F&& test_case) { // Create a resource-specific client to the mock over an established // in-process gRPC channel. - grpc::ChannelArguments args; auto test_server = TestServer(server); - auto grpc_channel = test_server.grpc_in_process_channel(args); + auto grpc_channel = test_server.grpc_in_process_channel(); auto resource_client = Registry::lookup_resource_client(API::get()) ->create_rpc_client(mock->name(), std::move(grpc_channel));