-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
⚠️ RSDK-7903 Include unhealthy remotes in resource list with status #4273
Changes from all commits
968b479
282e057
d837e20
07d9a72
83cdc93
cf85b6b
f274e56
4754282
b232589
05a7329
eed4bed
c93b936
a2d658d
d5e9116
8cb9b03
fd5da19
58f555f
6480d49
673ea72
2d18da7
49cb0db
278ad51
3e190ef
c5cc1fd
af977e5
5506d55
426e873
0c778b0
c64fe9d
b5c4898
c32f367
32d33fd
40e0559
d79ebc4
d2465ff
e401dc0
c4c0b89
864e1d2
c5cf943
875332c
022194a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,9 @@ const ( | |
|
||
// NodeStateUnhealthy denotes a resource is unhealthy. | ||
NodeStateUnhealthy | ||
|
||
// NodeStateDisconnected denotes a resource is disconnected. | ||
NodeStateDisconnected | ||
Comment on lines
+39
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An alternative to defining a new state would be to mark disconnected resource nodes as "unhealthy" with an appropriate error message. See "Open questions" in the description of this PR for more details and revert this commit to see how this alternative approach would look. |
||
) | ||
|
||
// A GraphNode contains the current state of a resource. | ||
|
@@ -361,6 +364,13 @@ func (w *GraphNode) SetNeedsUpdate() { | |
w.setNeedsReconfigure(w.Config(), false, w.UnresolvedDependencies()) | ||
} | ||
|
||
// SetDisconnected is used to mark a remote node as disconnected. | ||
func (w *GraphNode) SetDisconnected() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, in the resource graph we have:
Does this Second: I think the answers would work well in the documentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
it applies to the remote resource objects. also note that resources representing full remote machine clients are excluded from the MachineStatus endpoint.
thanks for drawing attention to this. the flow I want is for disconnected nodes to go directly back into the ready state (unless they are unhealthy for some other reason). however, I don't think the current logic actually transitions disconnected nodes back to ready, so let me address that.
will do |
||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
w.transitionTo(NodeStateDisconnected) | ||
} | ||
|
||
// setUnresolvedDependencies sets names that are yet to be resolved as | ||
// dependencies for the node. Note that even an empty list will still | ||
// set needsDependencyResolution to true. If no resolution is needed, | ||
|
@@ -457,6 +467,7 @@ func (w *GraphNode) replace(other *GraphNode) error { | |
} | ||
|
||
func (w *GraphNode) canTransitionTo(state NodeState) bool { | ||
//nolint // TODO add NodeStateDisconnected if agreed to | ||
switch w.state { | ||
case NodeStateUnknown: | ||
case NodeStateUnconfigured: | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the high-level change here is that a robot client now caches the results of |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,7 @@ type RobotClient struct { | |
dialOptions []rpc.DialOption | ||
|
||
mu sync.RWMutex | ||
resourceNames []resource.Name | ||
cachedMachineStatus robot.MachineStatus | ||
resourceRPCAPIs []resource.RPCAPI | ||
resourceClients map[resource.Name]resource.Resource | ||
remoteNameMap map[resource.Name]resource.Name | ||
|
@@ -415,8 +415,8 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error { | |
func (rc *RobotClient) updateResourceClients(ctx context.Context) error { | ||
activeResources := make(map[resource.Name]bool) | ||
|
||
for _, name := range rc.resourceNames { | ||
activeResources[name] = true | ||
for _, rs := range rc.cachedMachineStatus.Resources { | ||
activeResources[rs.Name] = true | ||
} | ||
|
||
for resourceName, client := range rc.resourceClients { | ||
|
@@ -464,7 +464,7 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec | |
return err | ||
} | ||
} else { | ||
if _, _, err := rc.resources(ctx); err != nil { | ||
if _, _, err := rc.machineStatusAndRPCAPIs(ctx); err != nil { | ||
return err | ||
} | ||
} | ||
|
@@ -514,7 +514,8 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec | |
} | ||
} | ||
|
||
// Close closes the underlying client connections to the machine and stops any periodic tasks running in the client. | ||
// Close closes the underlying client connections to the machine and stops any | ||
// periodic tasks running in the client. | ||
// | ||
// err := machine.Close(ctx.Background()) | ||
func (rc *RobotClient) Close(ctx context.Context) error { | ||
|
@@ -586,8 +587,8 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er | |
} | ||
|
||
// finally, before adding a new resource, make sure this name exists and is known | ||
for _, knownName := range rc.resourceNames { | ||
if name == knownName { | ||
for _, rs := range rc.cachedMachineStatus.Resources { | ||
if name == rs.Name { | ||
resourceClient, err := rc.createClient(name) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -608,7 +609,7 @@ func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, erro | |
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger) | ||
} | ||
|
||
func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) { | ||
func (rc *RobotClient) machineStatusAndRPCAPIs(ctx context.Context) (robot.MachineStatus, []resource.RPCAPI, error) { | ||
// RSDK-5356 If we are in a testing environment, never apply | ||
// defaultResourcesTimeout. Tests run in parallel, and if execution of a test | ||
// pauses for longer than 5s, below calls to ResourceNames or | ||
|
@@ -620,22 +621,29 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour | |
defer cancel() | ||
} | ||
|
||
resp, err := rc.client.ResourceNames(ctx, &pb.ResourceNamesRequest{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, are you changing robots to use machine status instead of resource names to populate remote resources? |
||
resp, err := rc.machineStatus(ctx) | ||
if err != nil { | ||
return nil, nil, err | ||
return robot.MachineStatus{}, nil, err | ||
} | ||
|
||
var resTypes []resource.RPCAPI | ||
|
||
resources := make([]resource.Name, 0, len(resp.Resources)) | ||
for _, name := range resp.Resources { | ||
newName := rprotoutils.ResourceNameFromProto(name) | ||
resources = append(resources, newName) | ||
mStatus := resp | ||
resources := make([]resource.Status, 0, len(resp.Resources)) | ||
for _, rs := range resp.Resources { | ||
if rs.Name.API == RemoteAPI { | ||
continue | ||
} | ||
if rs.Name.API.Type.Namespace == resource.APINamespaceRDKInternal { | ||
continue | ||
} | ||
resources = append(resources, rs) | ||
} | ||
mStatus.Resources = resources | ||
|
||
// resource has previously returned an unimplemented response, skip rpc call | ||
if rc.rpcSubtypesUnimplemented { | ||
return resources, resTypes, nil | ||
return mStatus, resTypes, nil | ||
} | ||
|
||
typesResp, err := rc.client.ResourceRPCSubtypes(ctx, &pb.ResourceRPCSubtypesRequest{}) | ||
|
@@ -656,7 +664,7 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour | |
} | ||
svcDesc, ok := symDesc.(*desc.ServiceDescriptor) | ||
if !ok { | ||
return nil, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc) | ||
return robot.MachineStatus{}, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc) | ||
} | ||
resTypes = append(resTypes, resource.RPCAPI{ | ||
API: rprotoutils.ResourceNameFromProto(resAPI.Subtype).API, | ||
|
@@ -665,13 +673,13 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour | |
} | ||
} else { | ||
if s, ok := status.FromError(err); !(ok && (s.Code() == codes.Unimplemented)) { | ||
return nil, nil, err | ||
return robot.MachineStatus{}, nil, err | ||
} | ||
// prevent future calls to ResourceRPCSubtypes | ||
rc.rpcSubtypesUnimplemented = true | ||
} | ||
|
||
return resources, resTypes, nil | ||
return mStatus, resTypes, nil | ||
} | ||
|
||
// Refresh manually updates the underlying parts of this machine. | ||
|
@@ -686,13 +694,12 @@ func (rc *RobotClient) Refresh(ctx context.Context) (err error) { | |
func (rc *RobotClient) updateResources(ctx context.Context) error { | ||
// call metadata service. | ||
|
||
names, rpcAPIs, err := rc.resources(ctx) | ||
mStatus, rpcAPIs, err := rc.machineStatusAndRPCAPIs(ctx) | ||
if err != nil && status.Code(err) != codes.Unimplemented { | ||
return fmt.Errorf("error updating resources: %w", err) | ||
} | ||
|
||
rc.resourceNames = make([]resource.Name, 0, len(names)) | ||
rc.resourceNames = append(rc.resourceNames, names...) | ||
rc.cachedMachineStatus = mStatus | ||
rc.resourceRPCAPIs = rpcAPIs | ||
|
||
rc.updateRemoteNameMap() | ||
|
@@ -703,17 +710,17 @@ func (rc *RobotClient) updateResources(ctx context.Context) error { | |
func (rc *RobotClient) updateRemoteNameMap() { | ||
tempMap := make(map[resource.Name]resource.Name) | ||
dupMap := make(map[resource.Name]bool) | ||
for _, n := range rc.resourceNames { | ||
if err := n.Validate(); err != nil { | ||
for _, rs := range rc.cachedMachineStatus.Resources { | ||
if err := rs.Name.Validate(); err != nil { | ||
rc.Logger().Error(err) | ||
continue | ||
} | ||
tempName := resource.RemoveRemoteName(n) | ||
tempName := resource.RemoveRemoteName(rs.Name) | ||
// If the short name already exists in the map then there is a collision and we make the long name empty. | ||
if _, ok := tempMap[tempName]; ok { | ||
dupMap[tempName] = true | ||
} else { | ||
tempMap[tempName] = n | ||
tempMap[tempName] = rs.Name | ||
} | ||
} | ||
for key := range dupMap { | ||
|
@@ -759,8 +766,10 @@ func (rc *RobotClient) ResourceNames() []resource.Name { | |
} | ||
rc.mu.RLock() | ||
defer rc.mu.RUnlock() | ||
names := make([]resource.Name, 0, len(rc.resourceNames)) | ||
names = append(names, rc.resourceNames...) | ||
names := make([]resource.Name, 0, len(rc.cachedMachineStatus.Resources)) | ||
for _, rs := range rc.cachedMachineStatus.Resources { | ||
names = append(names, rs.Name) | ||
} | ||
return names | ||
} | ||
|
||
|
@@ -1058,8 +1067,21 @@ func (rc *RobotClient) Shutdown(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// ErrDisconnected that a robot is disconnected. | ||
var ErrDisconnected = errors.New("disconnected") | ||
|
||
// MachineStatus returns the current status of the robot. | ||
func (rc *RobotClient) MachineStatus(ctx context.Context) (robot.MachineStatus, error) { | ||
if rc.checkConnected() != nil { | ||
return robot.MachineStatus{}, ErrDisconnected | ||
} | ||
|
||
rc.mu.Lock() | ||
defer rc.mu.Unlock() | ||
return rc.cachedMachineStatus, nil | ||
} | ||
|
||
func (rc *RobotClient) machineStatus(ctx context.Context) (robot.MachineStatus, error) { | ||
mStatus := robot.MachineStatus{} | ||
|
||
req := &pb.GetMachineStatusRequest{} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative approach to the changes in this file is to define a new state (e.g.
NodeStateDisconnected
) - see the "Open questions" section in the PR description for more details and see the revert of this commit for a general idea of how this would look.