diff --git a/robot/client/client.go b/robot/client/client.go index f4d9496cf6b..a684ea2e4da 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" + "errors" "flag" "fmt" "io" @@ -15,7 +16,6 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/grpcreflect" - "github.com/pkg/errors" "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -131,7 +131,7 @@ func isClosedPipeError(err error) bool { } func (rc *RobotClient) notConnectedToRemoteError() error { - return errors.Errorf("not connected to remote robot at %s", rc.address) + return fmt.Errorf("not connected to remote robot at %s", rc.address) } func (rc *RobotClient) handleUnaryDisconnect( @@ -346,6 +346,7 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error { if err := rc.conn.Close(); err != nil { return err } + conn, err := grpc.Dial(ctx, rc.address, rc.logger, rc.dialOptions...) if err != nil { return err @@ -616,7 +617,7 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour } svcDesc, ok := symDesc.(*desc.ServiceDescriptor) if !ok { - return nil, nil, errors.Errorf("expected descriptor to be service descriptor but got %T", symDesc) + return nil, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc) } resTypes = append(resTypes, resource.RPCAPI{ API: rprotoutils.ResourceNameFromProto(resAPI.Subtype).API, @@ -709,9 +710,8 @@ func (rc *RobotClient) PackageManager() packages.Manager { return nil } -// ResourceNames returns a list of all known resource names connected to this machine. -// -// resource_names := machine.ResourceNames() +// ResourceNames returns a list of all known resource names connected to this machine. If we do not +// have a connection to this robot, return nil. func (rc *RobotClient) ResourceNames() []resource.Name { if err := rc.checkConnected(); err != nil { rc.Logger().Errorw("failed to get remote resource names", "error", err.Error()) diff --git a/robot/impl/resource_manager.go b/robot/impl/resource_manager.go index d63201bbcf9..d626a03123b 100644 --- a/robot/impl/resource_manager.go +++ b/robot/impl/resource_manager.go @@ -185,29 +185,22 @@ func (manager *resourceManager) updateRemoteResourceNames( manager.logger.CDebugw(ctx, "updating remote resource names", "remote", remoteName) activeResourceNames := map[resource.Name]bool{} newResources := rr.ResourceNames() - oldResources := manager.remoteResourceNames(remoteName) - for _, res := range oldResources { + if newResources == nil { + // nil implies our connection to the remote is currently broken. Return without changing any + // state for this remote. + return false + } + + // Initialize a map with all existing resources as a map item. We will iterate through all of + // the `newResources` and set the map value for that item to true. Anything left with `false` at + // the end has been removed and will be marked for removal. + for _, res := range manager.remoteResourceNames(remoteName) { activeResourceNames[res] = false } anythingChanged := false - for _, resName := range newResources { remoteResName := resName - res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client - if err != nil { - if errors.Is(err, client.ErrMissingClientRegistration) { - manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface", - "name", remoteResName, - "reason", err) - } else { - manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface", - "name", remoteResName, - "reason", err) - } - continue - } - resName = resName.PrependRemote(remoteName.Name) gNode, ok := manager.resources.Node(resName) @@ -239,6 +232,25 @@ func (manager *resourceManager) updateRemoteResourceNames( } } + // ResourceByName on the robot client will construct and return the client resource that + // should be added into the resource graph. This method can return an error if the + // connection to the remote has been lost. In this case, the resource has already been + // deemed "active" and will not be removed in the "mark for update" stage of updating remote + // resources. + res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client + if err != nil { + if errors.Is(err, client.ErrMissingClientRegistration) { + manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface", + "name", remoteResName, + "reason", err) + } else { + manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface", + "name", remoteResName, + "reason", err) + } + continue + } + if ok { gNode.SwapResource(res, unknownModel) } else { diff --git a/robot/impl/resource_manager_test.go b/robot/impl/resource_manager_test.go index 7b521ab4122..250b4b1fc30 100644 --- a/robot/impl/resource_manager_test.go +++ b/robot/impl/resource_manager_test.go @@ -9,6 +9,7 @@ import ( "os" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/jhump/protoreflect/desc" @@ -59,6 +60,7 @@ import ( "go.viam.com/rdk/robot/client" "go.viam.com/rdk/robot/framesystem" "go.viam.com/rdk/robot/packages" + weboptions "go.viam.com/rdk/robot/web/options" "go.viam.com/rdk/services/motion" "go.viam.com/rdk/services/shell" "go.viam.com/rdk/services/vision" @@ -2071,3 +2073,135 @@ func TestReconfigureParity(t *testing.T) { testReconfigureParity(t, files[i], files[j]) } } + +// Consider a case where a main part viam-server is configured with a remote part viam-server. We +// want to ensure that once calling `ResourceNames` on the main part returns some remote resource -- +// that remote resource will always be returned until it is configured away. When either the remote +// robot removes it from its config. Or when the main part removes the remote. +func TestOfflineRemoteResources(t *testing.T) { + logger, _ := logging.NewObservedTestLogger(t) + ctx := context.Background() + + motorName := "remoteMotorFoo" + motorResourceName := resource.NewName(motor.API, motorName) + + remoteCfg := &config.Config{ + Components: []resource.Config{ + resource.Config{ + Name: motorName, + Model: resource.DefaultModelFamily.WithModel("fake"), + API: motor.API, + ConvertedAttributes: &fakemotor.Config{}, + }, + }, + } + + remoteRobot := setupLocalRobot(t, ctx, remoteCfg, logger.Sublogger("remote")) + remoteOptions, _, remoteAddr := robottestutils.CreateBaseOptionsAndListener(t) + err := remoteRobot.StartWeb(ctx, remoteOptions) + test.That(t, err, test.ShouldBeNil) + + // Set up a local main robot which is connected to the remote. + mainRobotCfg := &config.Config{ + Remotes: []config.Remote{ + config.Remote{ + Name: "remote", + Address: remoteAddr, + // These values dictate how quickly we'll observe the remote going offline. And how + // quickly we'll observe it coming back online. + ConnectionCheckInterval: 10 * time.Millisecond, + ReconnectInterval: 10 * time.Millisecond, + }, + }, + } + mainRobotI := setupLocalRobot(t, ctx, mainRobotCfg, logger.Sublogger("main")) + // We'll manually access the resource manager to move the test foward. + mainRobot := mainRobotI.(*localRobot) + mainOptions, _, mainAddr := robottestutils.CreateBaseOptionsAndListener(t) + mainRobot.StartWeb(ctx, mainOptions) + + // Create an "application" client to the robot. + mainClient, err := client.New(ctx, mainAddr, logger.Sublogger("client")) + test.That(t, err, test.ShouldBeNil) + defer mainClient.Close(ctx) + resourceNames := mainClient.ResourceNames() + + // When the `mainClient` requests `ResourceNames`, the motor will be annotated to include its + // remote. + motorResourceNameFromMain := motorResourceName.PrependRemote("remote") + // Search the list of "main" resources for the remote motor. Sanity check that we find it. + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Grab the RobotClient resource graph node from the main robot that is connected to the + // remote. We'll use this to know when the main robot observes the remote has gone offline. + mainToRemoteClientRes, _ := mainRobot.RemoteByName("remote") + test.That(t, mainToRemoteClientRes, test.ShouldNotBeNil) + mainToRemoteClient := mainToRemoteClientRes.(*client.RobotClient) + test.That(t, mainToRemoteClient.Connected(), test.ShouldBeTrue) + + // Stop the remote's web server. Wait for the main robot to observe there's a connection problem. + logger.Info("Stopping web") + remoteRobot.StopWeb() + testutils.WaitForAssertion(t, func(tb testing.TB) { + tb.Helper() + test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeFalse) + }) + + // Manually kick the resource manager to update remote resource names. + logger.Info("Updating remote resource names") + mainRobot.manager.updateRemotesResourceNames(logging.EnableDebugModeWithKey(ctx, "testOfflineRemoteResources.nodeOffline")) + + // The robot client keeps a cache of resource names. Manually refresh before re-asking the main + // robot what resources it hsa. + mainClient.Refresh(ctx) + resourceNames = mainClient.ResourceNames() + + // Scan again for the remote motor. Assert it still exists. + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Restart the remote web server. We closed the old listener, so just pass in the web address as + // part of the web options. + logger.Info("Restarting web server") + err = remoteRobot.StartWeb(ctx, weboptions.Options{ + Network: config.NetworkConfig{ + NetworkConfigData: config.NetworkConfigData{ + BindAddress: remoteAddr, + }, + }, + }) + test.That(t, err, test.ShouldBeNil) + + // Wait until the main robot sees the remote is online. This gets stuck behind a 10 second dial + // timeout. So we must manually increase the time we're willing to wait. + testutils.WaitForAssertionWithSleep(t, 50*time.Millisecond, 1000, func(tb testing.TB) { + tb.Helper() + test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeTrue) + }) + + // Again, manually refresh the list of resources to clear the cache. Assert the remote motor + // still exists. + mainToRemoteClient.Refresh(logging.EnableDebugModeWithKey(ctx, "refresh")) + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Reconfigure away the motor on the remote robot. + remoteCfg.Components = []resource.Config{} + remoteRobot.Reconfigure(ctx, remoteCfg) + + // Assert the main robot's client object eventually observes that the motor is no longer a + // component. + testutils.WaitForAssertion(t, func(tb testing.TB) { + tb.Helper() + mainToRemoteClient.Refresh(ctx) + resourceNames := mainToRemoteClient.ResourceNames() + test.That(t, resourceNames, test.ShouldNotContain, motorResourceNameFromMain) + }) + + // Manually update remote resource names. Knowing the robot client servicing the information has + // the updated view. + mainRobot.manager.updateRemotesResourceNames(ctx) + + // Force a refresh of resource names on the application client connection. Assert the motor no + // longer appears. + mainClient.Refresh(ctx) + test.That(t, resourceNames, test.ShouldNotContain, mainClient.ResourceNames()) +}