Skip to content

Commit

Permalink
Only track subscribed resources in Delta subscription state (#18295)
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Zhuang <[email protected]>
  • Loading branch information
stevenzzzz authored Oct 1, 2021
1 parent 7e8ac45 commit b2db7a9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
6 changes: 5 additions & 1 deletion source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ void DeltaSubscriptionState::handleGoodResponse(
{
const auto scoped_update = ttl_.scopedTtlUpdate();
for (const auto& resource : message.resources()) {
addResourceState(resource);
if (wildcard_ || resource_state_.contains(resource.name())) {
// Only consider tracked resources.
// NOTE: This is not gonna work for xdstp resources with glob resource matching.
addResourceState(resource);
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion source/common/config/xds_mux/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ void DeltaSubscriptionState::handleGoodResponse(
{
const auto scoped_update = ttl_.scopedTtlUpdate();
for (const auto& resource : message.resources()) {
addResourceState(resource);
if (wildcard_ || resource_state_.contains(resource.name())) {
// Only consider tracked resources.
// NOTE: This is not gonna work for xdstp resources with glob resource matching.
addResourceState(resource);
}
}
}

Expand Down
88 changes: 88 additions & 0 deletions test/common/config/delta_subscription_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,50 @@ TEST_P(DeltaSubscriptionStateTest, SubscribeAndUnsubscribe) {
}
}

// Resources has no subscriptions should not be tracked.
TEST_P(DeltaSubscriptionStateTest, NewPushDoesntAddUntrackedResources) {
{ // Add "name4", "name5", "name6" and remove "name1", "name2", "name3".
updateSubscriptionInterest({"name4", "name5", "name6"}, {"name1", "name2", "name3"});
auto cur_request = getNextRequestAckless();
EXPECT_THAT(cur_request->resource_names_subscribe(),
UnorderedElementsAre("name4", "name5", "name6"));
EXPECT_THAT(cur_request->resource_names_unsubscribe(),
UnorderedElementsAre("name1", "name2", "name3"));
}
{
// On Reconnection, only "name4", "name5", "name6" are sent.
markStreamFresh();
auto cur_request = getNextRequestAckless();
EXPECT_THAT(cur_request->resource_names_subscribe(),
UnorderedElementsAre("name4", "name5", "name6"));
EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty());
EXPECT_TRUE(cur_request->initial_resource_versions().empty());
}
// The xDS server's first response includes removed items name1 and 2, and a
// completely unrelated resource "bluhbluh".
{
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource({{"name1", "version1A"},
{"bluhbluh", "bluh"},
{"name6", "version6A"},
{"name2", "version2A"}});
EXPECT_CALL(*ttl_timer_, disableTimer());
UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug1", "nonce1");
EXPECT_EQ("nonce1", ack.nonce_);
EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code());
}
{ // Simulate a stream reconnection, just to see the current resource_state_.
markStreamFresh();
auto cur_request = getNextRequestAckless();
EXPECT_THAT(cur_request->resource_names_subscribe(),
UnorderedElementsAre("name4", "name5", "name6"));
EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty());
ASSERT_EQ(cur_request->initial_resource_versions().size(), 1);
EXPECT_TRUE(cur_request->initial_resource_versions().contains("name6"));
EXPECT_EQ(cur_request->initial_resource_versions().at("name6"), "version6A");
}
}

// Delta xDS reliably queues up and sends all discovery requests, even in situations where it isn't
// strictly necessary. E.g.: if you subscribe but then unsubscribe to a given resource, all before a
// request was able to be sent, two requests will be sent. The following tests demonstrate this.
Expand Down Expand Up @@ -425,6 +469,50 @@ TEST_P(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect
EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty());
}

// All resources from the server should be tracked.
TEST_P(WildcardDeltaSubscriptionStateTest, AllResourcesFromServerAreTrackedInWildcardXDS) {
{ // Add "name4", "name5", "name6" and remove "name1", "name2", "name3".
updateSubscriptionInterest({"name4", "name5", "name6"}, {"name1", "name2", "name3"});
auto cur_request = getNextRequestAckless();
EXPECT_THAT(cur_request->resource_names_subscribe(),
UnorderedElementsAre("name4", "name5", "name6"));
EXPECT_THAT(cur_request->resource_names_unsubscribe(),
UnorderedElementsAre("name1", "name2", "name3"));
}
{
// On Reconnection, only "name4", "name5", "name6" are sent.
markStreamFresh();
auto cur_request = getNextRequestAckless();
EXPECT_TRUE(cur_request->resource_names_subscribe().empty());
EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty());
EXPECT_TRUE(cur_request->initial_resource_versions().empty());
}
// The xDS server's first response includes removed items name1 and 2, and a
// completely unrelated resource "bluhbluh".
{
Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> added_resources =
populateRepeatedResource({{"name1", "version1A"},
{"bluhbluh", "bluh"},
{"name6", "version6A"},
{"name2", "version2A"}});
EXPECT_CALL(*ttl_timer_, disableTimer());
UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug1", "nonce1");
EXPECT_EQ("nonce1", ack.nonce_);
EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code());
}
{ // Simulate a stream reconnection, just to see the current resource_state_.
markStreamFresh();
auto cur_request = getNextRequestAckless();
EXPECT_TRUE(cur_request->resource_names_subscribe().empty());
EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty());
ASSERT_EQ(cur_request->initial_resource_versions().size(), 4);
EXPECT_EQ(cur_request->initial_resource_versions().at("name1"), "version1A");
EXPECT_EQ(cur_request->initial_resource_versions().at("bluhbluh"), "bluh");
EXPECT_EQ(cur_request->initial_resource_versions().at("name6"), "version6A");
EXPECT_EQ(cur_request->initial_resource_versions().at("name2"), "version2A");
}
}

// initial_resource_versions should not be present on messages after the first in a stream.
TEST_P(DeltaSubscriptionStateTest, InitialVersionMapFirstMessageOnly) {
// First, verify that the first message of a new stream sends initial versions.
Expand Down

0 comments on commit b2db7a9

Please sign in to comment.