Skip to content

Commit

Permalink
GH-37635: [Format][C++][Go] Add app_metadata to FlightInfo and Flight…
Browse files Browse the repository at this point in the history
…Endpoint (#37679)

### Rationale for this change
As suggested here: #37635 (comment) this just adds an `app_metadata` field to FlightInfo and FlightEndpoint

### What changes are included in this PR?
Just the updated proto file and the generated Go code from the proto

### Are there any user-facing changes?

Yes.

**This PR includes breaking changes to public APIs.**

* Closes: #37635

Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade and kou authored Oct 3, 2023
1 parent f6afc33 commit 92de9a3
Show file tree
Hide file tree
Showing 18 changed files with 803 additions and 450 deletions.
69 changes: 37 additions & 32 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,33 +191,33 @@ TEST(FlightTypes, FlightEndpoint) {
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<FlightEndpoint> values = {
{{""}, {}, std::nullopt},
{{"foo"}, {}, std::nullopt},
{{"bar"}, {}, std::nullopt},
{{"foo"}, {}, expiration_time},
{{"foo"}, {location1}, std::nullopt},
{{"bar"}, {location1}, std::nullopt},
{{"foo"}, {location2}, std::nullopt},
{{"foo"}, {location1, location2}, std::nullopt},
{{""}, {}, std::nullopt, {}},
{{"foo"}, {}, std::nullopt, {}},
{{"bar"}, {}, std::nullopt, {"\xDE\xAD\xBE\xEF"}},
{{"foo"}, {}, expiration_time, {}},
{{"foo"}, {location1}, std::nullopt, {}},
{{"bar"}, {location1}, std::nullopt, {}},
{{"foo"}, {location2}, std::nullopt, {}},
{{"foo"}, {location1, location2}, std::nullopt, {"\xba\xdd\xca\xfe"}},
};
std::vector<std::string> reprs = {
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[] "
"expiration_time=null>",
"expiration_time=null app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>",
"expiration_time=null app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations=[] "
"expiration_time=null>",
"expiration_time=null app_metadata='DEADBEEF'>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=2023-06-19 03:14:06.004339000>",
"expiration_time=2023-06-19 03:14:06.004339000 app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024] expiration_time=null>",
"[grpc+tcp://localhost:1024] expiration_time=null app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='bar'> locations="
"[grpc+tcp://localhost:1024] expiration_time=null>",
"[grpc+tcp://localhost:1024] expiration_time=null app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tls://localhost:1024] expiration_time=null>",
"[grpc+tls://localhost:1024] expiration_time=null app_metadata=''>",
"<FlightEndpoint ticket=<Ticket ticket='foo'> locations="
"[grpc+tcp://localhost:1024, grpc+tls://localhost:1024] "
"expiration_time=null>",
"expiration_time=null app_metadata='BADDCAFE'>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightEndpoint>(values, reprs));
Expand All @@ -229,30 +229,35 @@ TEST(FlightTypes, FlightInfo) {
Schema schema2({});
auto desc1 = FlightDescriptor::Command("foo");
auto desc2 = FlightDescriptor::Command("bar");
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
auto endpoint2 = FlightEndpoint{Ticket{"foo"}, {location}, std::nullopt};
auto endpoint1 = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt, ""};
auto endpoint2 =
FlightEndpoint{Ticket{"foo"}, {location}, std::nullopt, "\xCA\xFE\xD0\x0D"};
std::vector<FlightInfo> values = {
MakeFlightInfo(schema1, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true),
MakeFlightInfo(schema2, desc1, {}, -1, -1, false),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false),
MakeFlightInfo(schema1, desc1, {}, -1, -1, false, ""),
MakeFlightInfo(schema1, desc2, {}, -1, -1, true, ""),
MakeFlightInfo(schema2, desc1, {}, -1, -1, false, ""),
MakeFlightInfo(schema1, desc1, {endpoint1}, -1, 42, true, ""),
MakeFlightInfo(schema1, desc2, {endpoint1, endpoint2}, 64, -1, false,
"\xDE\xAD\xC0\xDE"),
};
std::vector<std::string> reprs = {
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false app_metadata=''>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=true>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=true app_metadata=''>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false>",
"endpoints=[] total_records=-1 total_bytes=-1 ordered=false app_metadata=''>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='foo'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>] total_records=-1 total_bytes=42 ordered=true>",
"expiration_time=null app_metadata=''>] total_records=-1 total_bytes=42 "
"ordered=true app_metadata=''>",
"<FlightInfo schema=(serialized) descriptor=<FlightDescriptor cmd='bar'> "
"endpoints=[<FlightEndpoint ticket=<Ticket ticket='foo'> locations=[] "
"expiration_time=null>, <FlightEndpoint ticket=<Ticket ticket='foo'> "
"locations=[grpc+tcp://localhost:1234] expiration_time=null>] "
"total_records=64 total_bytes=-1 ordered=false>",
"expiration_time=null app_metadata=''>, <FlightEndpoint ticket=<Ticket "
"ticket='foo'> "
"locations=[grpc+tcp://localhost:1234] expiration_time=null "
"app_metadata='CAFED00D'>] "
"total_records=64 total_bytes=-1 ordered=false app_metadata='DEADC0DE'>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
Expand All @@ -262,8 +267,8 @@ TEST(FlightTypes, PollInfo) {
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 1234));
Schema schema({field("ints", int64())});
auto desc = FlightDescriptor::Command("foo");
auto endpoint = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
auto info = MakeFlightInfo(schema, desc, {endpoint}, -1, 42, true);
auto endpoint = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt, ""};
auto info = MakeFlightInfo(schema, desc, {endpoint}, -1, 42, true, "");
// 2023-06-19 03:14:06.004330100
// We must use microsecond resolution here for portability.
// std::chrono::system_clock::time_point may not provide nanosecond
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {

TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }

TEST(FlightIntegration, AppMetadataFlightInfoEndpoint) {
ASSERT_OK(RunScenario("app_metadata_flight_info_endpoint"));
}

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
Expand Down
61 changes: 61 additions & 0 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,64 @@ class PollFlightInfoScenario : public Scenario {
}
};

/// \brief The server used for testing app_metadata in FlightInfo and FlightEndpoint
class AppMetadataFlightInfoEndpointServer : public FlightServerBase {
public:
AppMetadataFlightInfoEndpointServer() : FlightServerBase() {}

Status GetFlightInfo(const ServerCallContext& context, const FlightDescriptor& request,
std::unique_ptr<FlightInfo>* info) override {
if (request.type != FlightDescriptor::CMD) {
return Status::Invalid("request descriptor should be of type CMD");
}

auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
std::vector<FlightEndpoint> endpoints = {
FlightEndpoint{{}, {}, std::nullopt, request.cmd}};
ARROW_ASSIGN_OR_RAISE(auto result, FlightInfo::Make(*schema, request, endpoints, -1,
-1, false, request.cmd));
*info = std::make_unique<FlightInfo>(std::move(result));
return Status::OK();
}
};

/// \brief The AppMetadataFlightInfoEndpoint scenario.
///
/// This tests that the client can receive and use the `app_metadata` field in
/// the FlightInfo and FlightEndpoint messages.
///
/// The server only implements GetFlightInfo and will return a FlightInfo with a non-
/// empty app_metadata value that should match the app_metadata field in the
/// included FlightEndpoint. The value should be the same as the cmd bytes passed
/// in the call to GetFlightInfo by the client.
class AppMetadataFlightInfoEndpointScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<AppMetadataFlightInfoEndpointServer>();
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(auto info,
client->GetFlightInfo(FlightDescriptor::Command("foobar")));
if (info->app_metadata() != "foobar") {
return Status::Invalid("app_metadata should have been 'foobar', got: ",
info->app_metadata());
}
if (info->endpoints().size() != 1) {
return Status::Invalid("should have gotten exactly one FlightEndpoint back, got: ",
info->endpoints().size());
}
if (info->endpoints()[0].app_metadata != "foobar") {
return Status::Invalid("FlightEndpoint app_metadata should be 'foobar', got: ",
info->endpoints()[0].app_metadata);
}
return Status::OK();
}
};

/// \brief Schema to be returned for mocking the statement/prepared statement results.
///
/// Must be the same across all languages.
Expand Down Expand Up @@ -1897,6 +1955,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "poll_flight_info") {
*out = std::make_shared<PollFlightInfoScenario>();
return Status::OK();
} else if (scenario_name == "app_metadata_flight_info_endpoint") {
*out = std::make_shared<AppMetadataFlightInfoEndpointScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class FlightPerfServer : public FlightServerBase {
perf_request.stream_count() * perf_request.records_per_stream();

*info = std::make_unique<FlightInfo>(
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false));
MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, false, ""));
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
RETURN_NOT_OK(FromProto(pb_endpoint.expiration_time(), &expiration_time));
endpoint->expiration_time = std::move(expiration_time);
}
endpoint->app_metadata = pb_endpoint.app_metadata();
return Status::OK();
}

Expand All @@ -190,6 +191,7 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint)
RETURN_NOT_OK(ToProto(endpoint.expiration_time.value(),
pb_endpoint->mutable_expiration_time()));
}
pb_endpoint->set_app_metadata(endpoint.app_metadata);
return Status::OK();
}

Expand Down Expand Up @@ -255,6 +257,7 @@ arrow::Result<FlightInfo> FromProto(const pb::FlightInfo& pb_info) {
info.total_records = pb_info.total_records();
info.total_bytes = pb_info.total_bytes();
info.ordered = pb_info.ordered();
info.app_metadata = pb_info.app_metadata();
return FlightInfo(std::move(info));
}

Expand Down Expand Up @@ -296,6 +299,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
pb_info->set_total_records(info.total_records());
pb_info->set_total_bytes(info.total_bytes());
pb_info->set_ordered(info.ordered());
pb_info->set_app_metadata(info.app_metadata());
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlServerBase::GetFlightInfoSql
}

std::vector<FlightEndpoint> endpoints{
FlightEndpoint{{descriptor.cmd}, {}, std::nullopt}};
FlightEndpoint{{descriptor.cmd}, {}, std::nullopt, {}}};
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints,
-1, -1, false))
Expand Down
26 changes: 14 additions & 12 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,11 @@ std::unique_ptr<FlightServerBase> ExampleTestServer() {

FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes, bool ordered) {
EXPECT_OK_AND_ASSIGN(auto info, FlightInfo::Make(schema, descriptor, endpoints,
total_records, total_bytes, ordered));
int64_t total_records, int64_t total_bytes, bool ordered,
std::string app_metadata) {
EXPECT_OK_AND_ASSIGN(auto info,
FlightInfo::Make(schema, descriptor, endpoints, total_records,
total_bytes, ordered, std::move(app_metadata)));
return info;
}

Expand Down Expand Up @@ -602,11 +604,11 @@ std::vector<FlightInfo> ExampleFlightInfo() {
Location location4 = *Location::ForGrpcTcp("foo4.bar.com", 12345);
Location location5 = *Location::ForGrpcTcp("foo5.bar.com", 12345);

FlightEndpoint endpoint1({{"ticket-ints-1"}, {location1}, std::nullopt});
FlightEndpoint endpoint2({{"ticket-ints-2"}, {location2}, std::nullopt});
FlightEndpoint endpoint3({{"ticket-cmd"}, {location3}, std::nullopt});
FlightEndpoint endpoint4({{"ticket-dicts-1"}, {location4}, std::nullopt});
FlightEndpoint endpoint5({{"ticket-floats-1"}, {location5}, std::nullopt});
FlightEndpoint endpoint1({{"ticket-ints-1"}, {location1}, std::nullopt, {}});
FlightEndpoint endpoint2({{"ticket-ints-2"}, {location2}, std::nullopt, {}});
FlightEndpoint endpoint3({{"ticket-cmd"}, {location3}, std::nullopt, {}});
FlightEndpoint endpoint4({{"ticket-dicts-1"}, {location4}, std::nullopt, {}});
FlightEndpoint endpoint5({{"ticket-floats-1"}, {location5}, std::nullopt, {}});

FlightDescriptor descr1{FlightDescriptor::PATH, "", {"examples", "ints"}};
FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
Expand All @@ -619,10 +621,10 @@ std::vector<FlightInfo> ExampleFlightInfo() {
auto schema4 = ExampleFloatSchema();

return {
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false),
MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, false, ""),
MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, false, ""),
MakeFlightInfo(*schema3, descr3, {endpoint4}, -1, -1, false, ""),
MakeFlightInfo(*schema4, descr4, {endpoint5}, 1000, 100000, false, ""),
};
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ std::vector<ActionType> ExampleActionTypes();
ARROW_FLIGHT_EXPORT
FlightInfo MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes, bool ordered);
int64_t total_records, int64_t total_bytes, bool ordered,
std::string app_metadata);

// ----------------------------------------------------------------------
// A pair of authentication handlers that check for a predefined password
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,14 @@ arrow::Result<FlightInfo> FlightInfo::Make(const Schema& schema,
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
bool ordered) {
bool ordered, std::string app_metadata) {
FlightInfo::Data data;
data.descriptor = descriptor;
data.endpoints = endpoints;
data.total_records = total_records;
data.total_bytes = total_bytes;
data.ordered = ordered;
data.app_metadata = std::move(app_metadata);
RETURN_NOT_OK(internal::SchemaToString(schema, &data.schema));
return FlightInfo(data);
}
Expand Down Expand Up @@ -328,6 +329,7 @@ std::string FlightInfo::ToString() const {
ss << "] total_records=" << data_.total_records;
ss << " total_bytes=" << data_.total_bytes;
ss << " ordered=" << (data_.ordered ? "true" : "false");
ss << " app_metadata='" << HexEncode(data_.app_metadata) << "'";
ss << '>';
return ss.str();
}
Expand All @@ -338,7 +340,8 @@ bool FlightInfo::Equals(const FlightInfo& other) const {
data_.endpoints == other.data_.endpoints &&
data_.total_records == other.data_.total_records &&
data_.total_bytes == other.data_.total_bytes &&
data_.ordered == other.data_.ordered;
data_.ordered == other.data_.ordered &&
data_.app_metadata == other.data_.app_metadata;
}

arrow::Result<std::string> PollInfo::SerializeToString() const {
Expand Down Expand Up @@ -535,6 +538,7 @@ std::string FlightEndpoint::ToString() const {
} else {
ss << "null";
}
ss << " app_metadata='" << HexEncode(app_metadata) << "'";
ss << ">";
return ss.str();
}
Expand All @@ -554,6 +558,9 @@ bool FlightEndpoint::Equals(const FlightEndpoint& other) const {
return false;
}
}
if (app_metadata != other.app_metadata) {
return false;
}
return true;
}

Expand Down
10 changes: 9 additions & 1 deletion cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ struct ARROW_FLIGHT_EXPORT FlightEndpoint {
/// retrying DoGet requests.
std::optional<Timestamp> expiration_time;

/// Opaque Application-defined metadata
std::string app_metadata;

std::string ToString() const;
bool Equals(const FlightEndpoint& other) const;

Expand Down Expand Up @@ -583,6 +586,7 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
int64_t total_records = -1;
int64_t total_bytes = -1;
bool ordered = false;
std::string app_metadata;
};

explicit FlightInfo(Data data) : data_(std::move(data)), reconstructed_schema_(false) {}
Expand All @@ -592,7 +596,8 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
const FlightDescriptor& descriptor,
const std::vector<FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
bool ordered = false);
bool ordered = false,
std::string app_metadata = "");

/// \brief Deserialize the Arrow schema of the dataset. Populate any
/// dictionary encoded fields into a DictionaryMemo for
Expand Down Expand Up @@ -621,6 +626,9 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
/// Whether endpoints are in the same order as the data.
bool ordered() const { return data_.ordered; }

/// Application-defined opaque metadata
const std::string& app_metadata() const { return data_.app_metadata; }

/// \brief Get the wire-format representation of this type.
///
/// Useful when interoperating with non-Flight systems (e.g. REST
Expand Down
Loading

0 comments on commit 92de9a3

Please sign in to comment.