Skip to content

Commit

Permalink
flight: Expose SerializeToBuffer() together with SerializeToString()
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed Jul 15, 2024
1 parent 407e035 commit e55b931
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 35 deletions.
20 changes: 10 additions & 10 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ arrow::Result<std::unique_ptr<ResultStream>> FlightClient::DoAction(

arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kCancelFlightInfo.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto cancel_result, CancelFlightInfoResult::Deserialize(
Expand All @@ -596,8 +596,8 @@ arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(

arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kRenewFlightEndpoint.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kRenewFlightEndpoint.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
Expand Down Expand Up @@ -716,8 +716,8 @@ arrow::Result<FlightClient::DoExchangeResult> FlightClient::DoExchange(
::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kSetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kSetSessionOptions.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
Expand All @@ -730,8 +730,8 @@ ::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
const FlightCallOptions& options, const GetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kGetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kGetSessionOptions.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
Expand All @@ -744,8 +744,8 @@ ::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
::arrow::Result<CloseSessionResult> FlightClient::CloseSession(
const FlightCallOptions& options, const CloseSessionRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCloseSession.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kCloseSession.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto close_session_result,
Expand Down
33 changes: 8 additions & 25 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,11 @@ arrow::Result<Result> PackActionResult(ActionBeginTransactionResult result) {
}

arrow::Result<Result> PackActionResult(CancelFlightInfoResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
return result.SerializeToBuffer();
}

arrow::Result<Result> PackActionResult(const FlightEndpoint& endpoint) {
ARROW_ASSIGN_OR_RAISE(auto serialized, endpoint.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
return endpoint.SerializeToBuffer();
}

arrow::Result<Result> PackActionResult(CancelResult result) {
Expand Down Expand Up @@ -525,21 +523,6 @@ arrow::Result<Result> PackActionResult(ActionCreatePreparedStatementResult resul
return PackActionResult(pb_result);
}

arrow::Result<Result> PackActionResult(SetSessionOptionsResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

arrow::Result<Result> PackActionResult(GetSessionOptionsResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

arrow::Result<Result> PackActionResult(CloseSessionResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

} // namespace

arrow::Result<StatementQueryTicket> StatementQueryTicket::Deserialize(
Expand Down Expand Up @@ -908,23 +891,23 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, SetSessionOptionsRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, SetSessionOptions(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else if (action.type == ActionType::kGetSessionOptions.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, GetSessionOptionsRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, GetSessionOptions(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else if (action.type == ActionType::kCloseSession.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, CloseSessionRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, CloseSession(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else {
google::protobuf::Any any;
if (!any.ParseFromArray(action.body->data(), static_cast<int>(action.body->size()))) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <variant>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/flight/type_fwd.h"
#include "arrow/flight/visibility.h"
#include "arrow/ipc/options.h"
Expand Down Expand Up @@ -180,6 +181,12 @@ struct BaseType {
ARROW_RETURN_NOT_OK(self().DoSerializeToString(&out));
return out;
}

inline arrow::Result<std::shared_ptr<Buffer>> SerializeToBuffer() const {
std::string out;
ARROW_RETURN_NOT_OK(self().DoSerializeToString(&out));
return Buffer::FromString(std::move(out));
}
};

} // namespace internal
Expand Down

0 comments on commit e55b931

Please sign in to comment.