Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-7843: Do not remove resources when a remote has a transient error. #4268

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client

import (
"context"
"errors"
"flag"
"fmt"
"io"
Expand All @@ -15,7 +16,6 @@ import (
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -131,7 +131,7 @@ func isClosedPipeError(err error) bool {
}

func (rc *RobotClient) notConnectedToRemoteError() error {
return errors.Errorf("not connected to remote robot at %s", rc.address)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This usage adds a stack trace to the error. I found them to be an eyesore and not at all helpful.

return fmt.Errorf("not connected to remote robot at %s", rc.address)
}

func (rc *RobotClient) handleUnaryDisconnect(
Expand Down Expand Up @@ -346,6 +346,7 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error {
if err := rc.conn.Close(); err != nil {
return err
}

conn, err := grpc.Dial(ctx, rc.address, rc.logger, rc.dialOptions...)
if err != nil {
return err
Expand Down Expand Up @@ -616,7 +617,7 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
}
svcDesc, ok := symDesc.(*desc.ServiceDescriptor)
if !ok {
return nil, nil, errors.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
return nil, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
}
resTypes = append(resTypes, resource.RPCAPI{
API: rprotoutils.ResourceNameFromProto(resAPI.Subtype).API,
Expand Down Expand Up @@ -709,9 +710,8 @@ func (rc *RobotClient) PackageManager() packages.Manager {
return nil
}

// ResourceNames returns a list of all known resource names connected to this machine.
//
// resource_names := machine.ResourceNames()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't know if this served a purpose -- happy to reintroduce if there's some magic documentation thing going on here

// ResourceNames returns a list of all known resource names connected to this machine. If we do not
// have a connection to this robot, return nil.
func (rc *RobotClient) ResourceNames() []resource.Name {
if err := rc.checkConnected(); err != nil {
rc.Logger().Errorw("failed to get remote resource names", "error", err.Error())
Expand Down
46 changes: 29 additions & 17 deletions robot/impl/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,29 +185,22 @@ func (manager *resourceManager) updateRemoteResourceNames(
manager.logger.CDebugw(ctx, "updating remote resource names", "remote", remoteName)
activeResourceNames := map[resource.Name]bool{}
newResources := rr.ResourceNames()
oldResources := manager.remoteResourceNames(remoteName)
for _, res := range oldResources {
if newResources == nil {
// nil implies our connection to the remote is currently broken. Return without changing any
// state for this remote.
return false
}

// Initialize a map with all existing resources as a map item. We will iterate through all of
// the `newResources` and set the map value for that item to true. Anything left with `false` at
// the end has been removed and will be marked for removal.
for _, res := range manager.remoteResourceNames(remoteName) {
activeResourceNames[res] = false
}

anythingChanged := false

for _, resName := range newResources {
remoteResName := resName
res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client
Copy link
Member Author

@dgottlieb dgottlieb Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing changed here. Just code movement. What's important is this is now below oldline 215/newline 208. I hope the comment I added to the new code outlines the significance.

There is no test coverage of the error case here.

if err != nil {
if errors.Is(err, client.ErrMissingClientRegistration) {
manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
"reason", err)
} else {
manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
"reason", err)
}
continue
}

resName = resName.PrependRemote(remoteName.Name)
gNode, ok := manager.resources.Node(resName)

Expand Down Expand Up @@ -239,6 +232,25 @@ func (manager *resourceManager) updateRemoteResourceNames(
}
}

// ResourceByName on the robot client will construct and return the client resource that
// should be added into the resource graph. This method can return an error if the
// connection to the remote has been lost. In this case, the resource has already been
// deemed "active" and will not be removed in the "mark for update" stage of updating remote
// resources.
res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client
if err != nil {
if errors.Is(err, client.ErrMissingClientRegistration) {
manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
"reason", err)
} else {
manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
"reason", err)
}
continue
}

if ok {
gNode.SwapResource(res, unknownModel)
} else {
Expand Down
134 changes: 134 additions & 0 deletions robot/impl/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/jhump/protoreflect/desc"
Expand Down Expand Up @@ -59,6 +60,7 @@ import (
"go.viam.com/rdk/robot/client"
"go.viam.com/rdk/robot/framesystem"
"go.viam.com/rdk/robot/packages"
weboptions "go.viam.com/rdk/robot/web/options"
"go.viam.com/rdk/services/motion"
"go.viam.com/rdk/services/shell"
"go.viam.com/rdk/services/vision"
Expand Down Expand Up @@ -2071,3 +2073,135 @@ func TestReconfigureParity(t *testing.T) {
testReconfigureParity(t, files[i], files[j])
}
}

// Consider a case where a main part viam-server is configured with a remote part viam-server. We
// want to ensure that once calling `ResourceNames` on the main part returns some remote resource --
// that remote resource will always be returned until it is configured away. When either the remote
// robot removes it from its config. Or when the main part removes the remote.
func TestOfflineRemoteResources(t *testing.T) {
logger, _ := logging.NewObservedTestLogger(t)
ctx := context.Background()

motorName := "remoteMotorFoo"
motorResourceName := resource.NewName(motor.API, motorName)

remoteCfg := &config.Config{
Components: []resource.Config{
resource.Config{
Name: motorName,
Model: resource.DefaultModelFamily.WithModel("fake"),
API: motor.API,
ConvertedAttributes: &fakemotor.Config{},
},
},
}

remoteRobot := setupLocalRobot(t, ctx, remoteCfg, logger.Sublogger("remote"))
remoteOptions, _, remoteAddr := robottestutils.CreateBaseOptionsAndListener(t)
err := remoteRobot.StartWeb(ctx, remoteOptions)
test.That(t, err, test.ShouldBeNil)

// Set up a local main robot which is connected to the remote.
mainRobotCfg := &config.Config{
Remotes: []config.Remote{
config.Remote{
Name: "remote",
Address: remoteAddr,
// These values dictate how quickly we'll observe the remote going offline. And how
// quickly we'll observe it coming back online.
ConnectionCheckInterval: 10 * time.Millisecond,
ReconnectInterval: 10 * time.Millisecond,
},
},
}
mainRobotI := setupLocalRobot(t, ctx, mainRobotCfg, logger.Sublogger("main"))
// We'll manually access the resource manager to move the test foward.
mainRobot := mainRobotI.(*localRobot)
mainOptions, _, mainAddr := robottestutils.CreateBaseOptionsAndListener(t)
mainRobot.StartWeb(ctx, mainOptions)

// Create an "application" client to the robot.
mainClient, err := client.New(ctx, mainAddr, logger.Sublogger("client"))
test.That(t, err, test.ShouldBeNil)
defer mainClient.Close(ctx)
resourceNames := mainClient.ResourceNames()

// When the `mainClient` requests `ResourceNames`, the motor will be annotated to include its
// remote.
motorResourceNameFromMain := motorResourceName.PrependRemote("remote")
// Search the list of "main" resources for the remote motor. Sanity check that we find it.
test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain)

// Grab the RobotClient resource graph node from the main robot that is connected to the
// remote. We'll use this to know when the main robot observes the remote has gone offline.
mainToRemoteClientRes, _ := mainRobot.RemoteByName("remote")
test.That(t, mainToRemoteClientRes, test.ShouldNotBeNil)
mainToRemoteClient := mainToRemoteClientRes.(*client.RobotClient)
test.That(t, mainToRemoteClient.Connected(), test.ShouldBeTrue)

// Stop the remote's web server. Wait for the main robot to observe there's a connection problem.
logger.Info("Stopping web")
remoteRobot.StopWeb()
testutils.WaitForAssertion(t, func(tb testing.TB) {
tb.Helper()
test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeFalse)
})

// Manually kick the resource manager to update remote resource names.
logger.Info("Updating remote resource names")
mainRobot.manager.updateRemotesResourceNames(logging.EnableDebugModeWithKey(ctx, "testOfflineRemoteResources.nodeOffline"))

// The robot client keeps a cache of resource names. Manually refresh before re-asking the main
// robot what resources it hsa.
mainClient.Refresh(ctx)
resourceNames = mainClient.ResourceNames()

// Scan again for the remote motor. Assert it still exists.
test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain)

// Restart the remote web server. We closed the old listener, so just pass in the web address as
// part of the web options.
logger.Info("Restarting web server")
err = remoteRobot.StartWeb(ctx, weboptions.Options{
Network: config.NetworkConfig{
NetworkConfigData: config.NetworkConfigData{
BindAddress: remoteAddr,
},
},
})
test.That(t, err, test.ShouldBeNil)

// Wait until the main robot sees the remote is online. This gets stuck behind a 10 second dial
// timeout. So we must manually increase the time we're willing to wait.
testutils.WaitForAssertionWithSleep(t, 50*time.Millisecond, 1000, func(tb testing.TB) {
tb.Helper()
test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeTrue)
})

// Again, manually refresh the list of resources to clear the cache. Assert the remote motor
// still exists.
mainToRemoteClient.Refresh(logging.EnableDebugModeWithKey(ctx, "refresh"))
test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain)

// Reconfigure away the motor on the remote robot.
remoteCfg.Components = []resource.Config{}
remoteRobot.Reconfigure(ctx, remoteCfg)

// Assert the main robot's client object eventually observes that the motor is no longer a
// component.
testutils.WaitForAssertion(t, func(tb testing.TB) {
tb.Helper()
mainToRemoteClient.Refresh(ctx)
resourceNames := mainToRemoteClient.ResourceNames()
test.That(t, resourceNames, test.ShouldNotContain, motorResourceNameFromMain)
})

// Manually update remote resource names. Knowing the robot client servicing the information has
// the updated view.
mainRobot.manager.updateRemotesResourceNames(ctx)

// Force a refresh of resource names on the application client connection. Assert the motor no
// longer appears.
mainClient.Refresh(ctx)
test.That(t, resourceNames, test.ShouldNotContain, mainClient.ResourceNames())
}
Loading