From d705548c89c088c5725ecb94b15ce492b3038381 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Mon, 5 Aug 2024 13:31:18 -0400 Subject: [PATCH 1/2] RSDK-7843: Do not remove resources when a remote has a transient error. --- robot/client/client.go | 17 ++-- robot/impl/resource_manager.go | 46 ++++++---- robot/impl/resource_manager_test.go | 133 ++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 24 deletions(-) diff --git a/robot/client/client.go b/robot/client/client.go index f4d9496cf6b..c73468e2b94 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" + "errors" "flag" "fmt" "io" @@ -15,7 +16,6 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/grpcreflect" - "github.com/pkg/errors" "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -131,7 +131,7 @@ func isClosedPipeError(err error) bool { } func (rc *RobotClient) notConnectedToRemoteError() error { - return errors.Errorf("not connected to remote robot at %s", rc.address) + return fmt.Errorf("not connected to remote robot at %s", rc.address) } func (rc *RobotClient) handleUnaryDisconnect( @@ -346,7 +346,10 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error { if err := rc.conn.Close(); err != nil { return err } - conn, err := grpc.Dial(ctx, rc.address, rc.logger, rc.dialOptions...) + + cancelCtx, stop := context.WithTimeout(ctx, time.Second) + defer stop() + conn, err := grpc.Dial(cancelCtx, rc.address, rc.logger, rc.dialOptions...) if err != nil { return err } @@ -406,6 +409,7 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec return } } + rc.logger.Info("Waiting to recheck. WaitTime:", waitTime) if !utils.SelectContextOrWait(ctx, waitTime) { return } @@ -616,7 +620,7 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour } svcDesc, ok := symDesc.(*desc.ServiceDescriptor) if !ok { - return nil, nil, errors.Errorf("expected descriptor to be service descriptor but got %T", symDesc) + return nil, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc) } resTypes = append(resTypes, resource.RPCAPI{ API: rprotoutils.ResourceNameFromProto(resAPI.Subtype).API, @@ -709,9 +713,8 @@ func (rc *RobotClient) PackageManager() packages.Manager { return nil } -// ResourceNames returns a list of all known resource names connected to this machine. -// -// resource_names := machine.ResourceNames() +// ResourceNames returns a list of all known resource names connected to this machine. If we do not +// have a connection to this robot, return nil. func (rc *RobotClient) ResourceNames() []resource.Name { if err := rc.checkConnected(); err != nil { rc.Logger().Errorw("failed to get remote resource names", "error", err.Error()) diff --git a/robot/impl/resource_manager.go b/robot/impl/resource_manager.go index d63201bbcf9..d626a03123b 100644 --- a/robot/impl/resource_manager.go +++ b/robot/impl/resource_manager.go @@ -185,29 +185,22 @@ func (manager *resourceManager) updateRemoteResourceNames( manager.logger.CDebugw(ctx, "updating remote resource names", "remote", remoteName) activeResourceNames := map[resource.Name]bool{} newResources := rr.ResourceNames() - oldResources := manager.remoteResourceNames(remoteName) - for _, res := range oldResources { + if newResources == nil { + // nil implies our connection to the remote is currently broken. Return without changing any + // state for this remote. + return false + } + + // Initialize a map with all existing resources as a map item. We will iterate through all of + // the `newResources` and set the map value for that item to true. Anything left with `false` at + // the end has been removed and will be marked for removal. + for _, res := range manager.remoteResourceNames(remoteName) { activeResourceNames[res] = false } anythingChanged := false - for _, resName := range newResources { remoteResName := resName - res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client - if err != nil { - if errors.Is(err, client.ErrMissingClientRegistration) { - manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface", - "name", remoteResName, - "reason", err) - } else { - manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface", - "name", remoteResName, - "reason", err) - } - continue - } - resName = resName.PrependRemote(remoteName.Name) gNode, ok := manager.resources.Node(resName) @@ -239,6 +232,25 @@ func (manager *resourceManager) updateRemoteResourceNames( } } + // ResourceByName on the robot client will construct and return the client resource that + // should be added into the resource graph. This method can return an error if the + // connection to the remote has been lost. In this case, the resource has already been + // deemed "active" and will not be removed in the "mark for update" stage of updating remote + // resources. + res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client + if err != nil { + if errors.Is(err, client.ErrMissingClientRegistration) { + manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface", + "name", remoteResName, + "reason", err) + } else { + manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface", + "name", remoteResName, + "reason", err) + } + continue + } + if ok { gNode.SwapResource(res, unknownModel) } else { diff --git a/robot/impl/resource_manager_test.go b/robot/impl/resource_manager_test.go index 7b521ab4122..a060a03458a 100644 --- a/robot/impl/resource_manager_test.go +++ b/robot/impl/resource_manager_test.go @@ -9,6 +9,7 @@ import ( "os" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/jhump/protoreflect/desc" @@ -59,6 +60,7 @@ import ( "go.viam.com/rdk/robot/client" "go.viam.com/rdk/robot/framesystem" "go.viam.com/rdk/robot/packages" + weboptions "go.viam.com/rdk/robot/web/options" "go.viam.com/rdk/services/motion" "go.viam.com/rdk/services/shell" "go.viam.com/rdk/services/vision" @@ -2071,3 +2073,134 @@ func TestReconfigureParity(t *testing.T) { testReconfigureParity(t, files[i], files[j]) } } + +// Consider a case where a main part viam-server is configured with a remote part viam-server. We +// want to ensure that once calling `ResourceNames` on the main part returns some remote resource -- +// that remote resource will always be returned until it is configured away. When either the remote +// robot removes it from its config. Or when the main part removes the remote. +func TestOfflineRemoteResources(t *testing.T) { + logger, _ := logging.NewObservedTestLogger(t) + ctx := context.Background() + + motorName := "remoteMotorFoo" + motorResourceName := resource.NewName(motor.API, motorName) + + remoteCfg := &config.Config{ + Components: []resource.Config{ + resource.Config{ + Name: motorName, + Model: resource.DefaultModelFamily.WithModel("fake"), + API: motor.API, + ConvertedAttributes: &fakemotor.Config{}, + }, + }, + } + + remoteRobot := setupLocalRobot(t, ctx, remoteCfg, logger.Sublogger("remote")) + remoteOptions, _, remoteAddr := robottestutils.CreateBaseOptionsAndListener(t) + err := remoteRobot.StartWeb(ctx, remoteOptions) + test.That(t, err, test.ShouldBeNil) + + // Set up a local main robot which is connected to the remote. + mainRobotCfg := &config.Config{ + Remotes: []config.Remote{ + config.Remote{ + Name: "remote", + Address: remoteAddr, + // These values dictate how quickly we'll observe the remote going offline. And how + // quickly we'll observe it coming back online. + ConnectionCheckInterval: 10 * time.Millisecond, + ReconnectInterval: 10 * time.Millisecond, + }, + }, + } + mainRobotI := setupLocalRobot(t, ctx, mainRobotCfg, logger.Sublogger("main")) + // We'll manually access the resource manager to move the test foward. + mainRobot := mainRobotI.(*localRobot) + mainOptions, _, mainAddr := robottestutils.CreateBaseOptionsAndListener(t) + mainRobot.StartWeb(ctx, mainOptions) + + // Create an "application" client to the robot. + mainClient, err := client.New(ctx, mainAddr, logger.Sublogger("client")) + test.That(t, err, test.ShouldBeNil) + defer mainClient.Close(ctx) + resourceNames := mainClient.ResourceNames() + + // When the `mainClient` requests `ResourceNames`, the motor will be annotated to include its + // remote. + motorResourceNameFromMain := motorResourceName.PrependRemote("remote") + // Search the list of "main" resources for the remote motor. Sanity check that we find it. + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Grab the RobotClient resource graph node from the main robot that is connected to the + // remote. We'll use this to know when the main robot observes the remote has gone offline. + mainToRemoteClientRes, _ := mainRobot.RemoteByName("remote") + test.That(t, mainToRemoteClientRes, test.ShouldNotBeNil) + mainToRemoteClient := mainToRemoteClientRes.(*client.RobotClient) + test.That(t, mainToRemoteClient.Connected(), test.ShouldBeTrue) + + // Stop the remote's web server. Wait for the main robot to observe there's a connection problem. + logger.Info("Stopping web") + remoteRobot.StopWeb() + testutils.WaitForAssertion(t, func(tb testing.TB) { + tb.Helper() + test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeFalse) + }) + + // Manually kick the resource manager to update remote resource names. + logger.Info("Updating remote resource names") + mainRobot.manager.updateRemotesResourceNames(logging.EnableDebugModeWithKey(ctx, "testOfflineRemoteResources.nodeOffline")) + + // The robot client keeps a cache of resource names. Manually refresh before re-asking the main + // robot what resources it hsa. + mainClient.Refresh(ctx) + resourceNames = mainClient.ResourceNames() + + // Scan again for the remote motor. Assert it still exists. + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Restart the remote web server. We closed the old listener, so just pass in the web address as + // part of the web options. + logger.Info("Restarting web server") + err = remoteRobot.StartWeb(ctx, weboptions.Options{ + Network: config.NetworkConfig{ + NetworkConfigData: config.NetworkConfigData{ + BindAddress: remoteAddr, + }, + }, + }) + test.That(t, err, test.ShouldBeNil) + + // Wait until the main robot sees the remote is online. + testutils.WaitForAssertion(t, func(tb testing.TB) { + tb.Helper() + test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeTrue) + }) + + // Again, manually refresh the list of resources to clear the cache. Assert the remote motor + // still exists. + mainToRemoteClient.Refresh(logging.EnableDebugModeWithKey(ctx, "refresh")) + test.That(t, resourceNames, test.ShouldContain, motorResourceNameFromMain) + + // Reconfigure away the motor on the remote robot. + remoteCfg.Components = []resource.Config{} + remoteRobot.Reconfigure(ctx, remoteCfg) + + // Assert the main robot's client object eventually observes that the motor is no longer a + // component. + testutils.WaitForAssertion(t, func(tb testing.TB) { + tb.Helper() + mainToRemoteClient.Refresh(ctx) + resourceNames := mainToRemoteClient.ResourceNames() + test.That(t, resourceNames, test.ShouldNotContain, motorResourceNameFromMain) + }) + + // Manually update remote resource names. Knowing the robot client servicing the information has + // the updated view. + mainRobot.manager.updateRemotesResourceNames(ctx) + + // Force a refresh of resource names on the application client connection. Assert the motor no + // longer appears. + mainClient.Refresh(ctx) + test.That(t, resourceNames, test.ShouldNotContain, mainClient.ResourceNames()) +} From 677f829e97d6b8386cd6ef07d205ded3ac3f6811 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Tue, 6 Aug 2024 15:37:31 -0400 Subject: [PATCH 2/2] Revert dev code --- robot/client/client.go | 5 +---- robot/impl/resource_manager_test.go | 5 +++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/robot/client/client.go b/robot/client/client.go index c73468e2b94..a684ea2e4da 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -347,9 +347,7 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error { return err } - cancelCtx, stop := context.WithTimeout(ctx, time.Second) - defer stop() - conn, err := grpc.Dial(cancelCtx, rc.address, rc.logger, rc.dialOptions...) + conn, err := grpc.Dial(ctx, rc.address, rc.logger, rc.dialOptions...) if err != nil { return err } @@ -409,7 +407,6 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec return } } - rc.logger.Info("Waiting to recheck. WaitTime:", waitTime) if !utils.SelectContextOrWait(ctx, waitTime) { return } diff --git a/robot/impl/resource_manager_test.go b/robot/impl/resource_manager_test.go index a060a03458a..250b4b1fc30 100644 --- a/robot/impl/resource_manager_test.go +++ b/robot/impl/resource_manager_test.go @@ -2171,8 +2171,9 @@ func TestOfflineRemoteResources(t *testing.T) { }) test.That(t, err, test.ShouldBeNil) - // Wait until the main robot sees the remote is online. - testutils.WaitForAssertion(t, func(tb testing.TB) { + // Wait until the main robot sees the remote is online. This gets stuck behind a 10 second dial + // timeout. So we must manually increase the time we're willing to wait. + testutils.WaitForAssertionWithSleep(t, 50*time.Millisecond, 1000, func(tb testing.TB) { tb.Helper() test.That(tb, mainToRemoteClient.Connected(), test.ShouldBeTrue) })