Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication stats throwing NPE when shards are unassigned or are in delayed allocation phase #14580

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification ([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([#14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Fix Segment Replication stats throwing NPE when shards are unassigned or are in delayed allocation phase ([#14580](https://github.com/opensearch-project/OpenSearch/pull/14580))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1252,8 +1252,9 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}

private boolean isPrimaryRelocation(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
Optional<ShardRouting> shardRouting = routingTable.assignedShards()
rampreeth marked this conversation as resolved.
Show resolved Hide resolved
.stream()
.filter(routing -> Objects.nonNull(routing.allocationId()))
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() {
assertThat(
leaseIds,
equalTo(
replicationTracker.routingTable.shards()
replicationTracker.routingTable.assignedShards()
.stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
.collect(Collectors.toSet())
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmedia
equalTo(
Stream.concat(
Stream.of(ReplicationTracker.getPeerRecoveryRetentionLeaseId(unknownNodeId)),
replicationTracker.routingTable.shards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
replicationTracker.routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
).collect(Collectors.toSet())
)
);
Expand Down Expand Up @@ -215,7 +215,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually()
assertThat(
leaseIds,
equalTo(
replicationTracker.routingTable.shards()
replicationTracker.routingTable.assignedShards()
.stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
.collect(Collectors.toSet())
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyI
assertThat(
leaseIds,
equalTo(
replicationTracker.routingTable.shards()
replicationTracker.routingTable.assignedShards()
.stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
.collect(Collectors.toSet())
Expand All @@ -271,7 +271,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingT
assertThat(
leaseIds,
equalTo(
replicationTracker.routingTable.shards()
replicationTracker.routingTable.assignedShards()
.stream()
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
.collect(Collectors.toSet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,22 @@ static String nodeIdFromAllocationId(final AllocationId allocationId) {
}

static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {
return routingTable(initializingIds, Collections.singleton(primaryId), primaryId);
return routingTable(initializingIds, Collections.singleton(primaryId), primaryId, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to test this without adding this flag and updating these base methods. We can have an individual test create the routing table and add the routing with null aId. That should avoid the need to change unrelated tests like PeerRecoveryRetentionLease as well. Please see

final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
from testSegmentReplicationCheckpointForRelocatingPrimary.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make that change

}

/**
 * Convenience overload for a routing table whose only active copy is the primary.
 * Wraps {@code primaryId} in a singleton active-id set and forwards to the
 * four-argument {@code routingTable} overload.
 *
 * @param initializingIds          allocation ids forwarded as the initializing set
 * @param primaryId                allocation id of the primary; also the sole active id
 * @param shouldAddUnassignedShard forwarded unchanged to the delegate overload
 * @return the routing table produced by the delegate overload
 */
static IndexShardRoutingTable routingTable(
    final Set<AllocationId> initializingIds,
    final AllocationId primaryId,
    final boolean shouldAddUnassignedShard
) {
    // The primary is the one and only active allocation for this overload.
    final Set<AllocationId> primaryOnlyActiveIds = Collections.singleton(primaryId);
    return routingTable(initializingIds, primaryOnlyActiveIds, primaryId, shouldAddUnassignedShard);
}

static IndexShardRoutingTable routingTable(
final Set<AllocationId> initializingIds,
final Set<AllocationId> activeIds,
final AllocationId primaryId
final AllocationId primaryId,
final boolean shouldAddUnassignedShard
) {
final ShardId shardId = new ShardId("test", "_na_", 0);
final ShardRouting primaryShard = TestShardRouting.newShardRouting(
Expand All @@ -121,19 +130,28 @@ static IndexShardRoutingTable routingTable(
ShardRoutingState.STARTED,
primaryId
);
return routingTable(initializingIds, activeIds, primaryShard);
return routingTable(initializingIds, activeIds, primaryShard, shouldAddUnassignedShard);
}

static IndexShardRoutingTable routingTable(
final Set<AllocationId> initializingIds,
final Set<AllocationId> activeIds,
final ShardRouting primaryShard
final ShardRouting primaryShard,
final boolean shouldAddUnassignedShard
) {
assert initializingIds != null && activeIds != null;
assert !initializingIds.contains(primaryShard.allocationId());
assert activeIds.contains(primaryShard.allocationId());
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);

// Add a shard that is unassigned to simulate #11945
if (shouldAddUnassignedShard) {
builder.addShard(
TestShardRouting.newShardRoutingWithNullAllocationId(shardId, null, null, false, ShardRoutingState.UNASSIGNED)
);
}

for (final AllocationId initializingId : initializingIds) {
builder.addShard(
TestShardRouting.newShardRouting(
Expand All @@ -146,6 +164,7 @@ static IndexShardRoutingTable routingTable(
)
);
}

for (final AllocationId activeId : activeIds) {
if (activeId.equals(primaryShard.allocationId())) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ private static FakeClusterState initialState() {
return new FakeClusterState(
initialClusterStateVersion,
activeAllocationIds,
routingTable(initializingAllocationIds, Collections.singleton(primaryShard.allocationId()), primaryShard)
routingTable(initializingAllocationIds, Collections.singleton(primaryShard.allocationId()), primaryShard, true)
);
}

Expand Down Expand Up @@ -1031,7 +1031,8 @@ private static FakeClusterState randomUpdateClusterState(Set<String> allocationI
routingTable(
Sets.difference(Sets.union(initializingIdsExceptRelocationTargets, initializingIdsToAdd), initializingIdsToRemove),
Collections.singleton(clusterState.routingTable.primaryShard().allocationId()),
clusterState.routingTable.primaryShard()
clusterState.routingTable.primaryShard(),
true
)
);
}
Expand Down Expand Up @@ -1214,7 +1215,9 @@ public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
for (ShardRouting replicaShard : routingTable.replicaShards()) {
routingTableBuilder.removeShard(replicaShard);
routingTableBuilder.addShard(replicaShard.moveToStarted());
if (replicaShard.assignedToNode()) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
routingTableBuilder.addShard(replicaShard.moveToStarted());
}
}
routingTable = routingTableBuilder.build();
activeAllocationIds.addAll(initializingAllocationIds);
Expand Down Expand Up @@ -1402,7 +1405,7 @@ public void testUpdateFromClusterManagerWithRemoteTranslogEnabled() {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});

tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId));
tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId, true));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertEquals(tracker.getReplicationGroup().getReplicationTargets().size(), active.size());
initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED, false));
Expand Down Expand Up @@ -1613,7 +1616,7 @@ public void testInSyncIdsAreRemovedIfNotValidatedByClusterManagerWithRemoteTrans
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.build();
final ReplicationTracker tracker = newTracker(primaryId, settings, true);
tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId));
tracker.updateFromClusterManager(initialClusterStateVersion, ids(active), routingTable(initializing, active, primaryId, true));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ public static ShardRouting newShardRouting(
);
}

/**
 * Test helper that builds a {@link ShardRouting} directly through its constructor,
 * passing {@code null} where the other factory methods supply an allocation id.
 * Per the method name this yields a routing without an allocation id.
 *
 * @param shardId          shard the routing entry belongs to
 * @param currentNodeId    passed through as the second constructor argument; may be {@code null}
 * @param relocatingNodeId passed through as the third constructor argument; may be {@code null}
 * @param primary          whether the routing represents the primary copy
 * @param state            routing state; also fed to {@code buildRecoveryTarget} and {@code buildUnassignedInfo}
 * @return the newly constructed routing
 */
public static ShardRouting newShardRoutingWithNullAllocationId(
ShardId shardId,
String currentNodeId,
String relocatingNodeId,
boolean primary,
ShardRoutingState state
) {
return new ShardRouting(
shardId,
currentNodeId,
relocatingNodeId,
primary,
state,
buildRecoveryTarget(primary, state),
buildUnassignedInfo(state),
// Deliberately null: per the method name this is the allocation id slot.
// NOTE(review): confirm the position against the ShardRouting constructor.
null,
// NOTE(review): presumably the "size unknown" sentinel for the last constructor argument; confirm.
-1
);
}

public static ShardRouting newShardRouting(
String index,
int shardId,
Expand Down
Loading