Skip to content

Commit

Permalink
[GCS]Fix TestActorTableResubscribe bug (#11830)
Browse files Browse the repository at this point in the history
* fix compile bug

* [GCS]Fix TestActorTableResubscribe bug

* rm unused code

* fix lint error

* fix review comment

* fix ut bug

Co-authored-by: 灵洵 <[email protected]>
  • Loading branch information
ffbin and 灵洵 authored Nov 9, 2020
1 parent 64ca30c commit 407a212
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,18 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
.AsyncRegisterActor(task_spec, [](Status status) {})
.ok();
}
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Actors().AsyncRegisterActor(
task_spec, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);

// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again and promise may be set twice.
auto promise = std::make_shared<std::promise<bool>>();
RAY_CHECK_OK(
gcs_client_->Actors().AsyncRegisterActor(task_spec, [promise](Status status) {
try {
promise->set_value(status.ok());
} catch (...) {
}
}));
return WaitReady(promise->get_future(), timeout_ms_);
}

rpc::ActorTableData GetActor(const ActorID &actor_id) {
Expand All @@ -208,14 +216,22 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
return actor_table_data;
}

std::vector<rpc::ActorTableData> GetAllActors() {
std::vector<rpc::ActorTableData> GetAllActors(bool filter_non_dead_actor = false) {
std::promise<bool> promise;
std::vector<rpc::ActorTableData> actors;
RAY_CHECK_OK(gcs_client_->Actors().AsyncGetAll(
[&actors, &promise](Status status,
const std::vector<rpc::ActorTableData> &result) {
[filter_non_dead_actor, &actors, &promise](
Status status, const std::vector<rpc::ActorTableData> &result) {
if (!result.empty()) {
actors.assign(result.begin(), result.end());
if (filter_non_dead_actor) {
for (auto &iter : result) {
if (iter.state() == gcs::ActorTableData::DEAD) {
actors.emplace_back(iter);
}
}
} else {
actors.assign(result.begin(), result.end());
}
}
promise.set_value(true);
}));
Expand Down Expand Up @@ -1041,15 +1057,22 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {
// didn't restart, it will fetch data again from the GCS server. The GCS will destroy
// the actor because it finds that the actor is out of scope, so we'll receive another
// notification of DEAD state.
WaitForExpectedCount(num_subscribe_all_notifications, 3);
WaitForExpectedCount(num_subscribe_one_notifications, 3);
/// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
/// client will register the actor again. When an actor is registered, the status in GCS
/// is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will
/// destroy the actor and the status of the actor will change to `DEAD`. The GCS client
/// fetch actor info from the GCS server, and the status of the actor may be
/// `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any
/// more.

// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again. When an actor is registered, the status in GCS
// is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will
// destroy the actor and the status of the actor will change to `DEAD`. The GCS client
// fetch actor info from the GCS server, and the status of the actor may be
// `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any
// more.
// If the status of the actor is `DEPENDENCIES_UNREADY`, we will fetch two records, so
// `num_subscribe_all_notifications` will be 4. If the status of the actor is `DEAD`, we
// will fetch one record, so `num_subscribe_all_notifications` will be 3.
auto condition = [&num_subscribe_all_notifications]() {
return num_subscribe_all_notifications == 3 || num_subscribe_all_notifications == 4;
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}

TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) {
Expand Down Expand Up @@ -1346,14 +1369,16 @@ TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) {
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// Get all actors.
// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again and the status of the actor may be
// `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
auto condition = [this]() {
return GetAllActors().size() ==
return GetAllActors(true).size() ==
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));

auto actors = GetAllActors();
auto actors = GetAllActors(true);
for (const auto &actor : actors) {
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
}
Expand Down

0 comments on commit 407a212

Please sign in to comment.