Skip to content

Commit

Permalink
RSDK-6925 load resources in parallel (viamrobotics#3707)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Gottlieb <[email protected]>
  • Loading branch information
maximpertsov and dgottlieb authored May 21, 2024
1 parent a1f5c08 commit c2cc3ff
Show file tree
Hide file tree
Showing 11 changed files with 470 additions and 175 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
goji.io v2.0.2+incompatible
golang.org/x/image v0.15.0
golang.org/x/mobile v0.0.0-20240112133503-c713f31d574b
golang.org/x/sync v0.6.0
golang.org/x/sys v0.20.0
golang.org/x/term v0.20.0
golang.org/x/time v0.3.0
Expand Down Expand Up @@ -372,7 +373,6 @@ require (
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.126.0 // indirect
Expand Down
158 changes: 158 additions & 0 deletions module/concurrent_reconfigure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package module_test

import (
"context"
"fmt"
"testing"
"time"

"go.viam.com/test"

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/config"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
robotimpl "go.viam.com/rdk/robot/impl"
)

// setupTestRobotWithModules builds a local robot from a config that contains a
// single module named "TestModule" (executable path "testmodule/run.sh") and
// no resources. It returns both the config and the robot so that callers can
// add resources to the config and pass it back to the robot's Reconfigure.
//
// The helper asserts that robot construction succeeds, and registers a
// t.Cleanup callback that closes the robot with ctx and asserts that closing
// returns no error.
func setupTestRobotWithModules(
	t *testing.T,
	ctx context.Context,
	logger logging.Logger,
) (*config.Config, robot.LocalRobot) {
	// Mark this function as a test helper so that failures reported through t
	// from here point at the calling (sub)test rather than at this helper.
	t.Helper()

	cfg := &config.Config{
		Modules: []config.Module{
			{
				Name:    "TestModule",
				ExePath: "testmodule/run.sh",
			},
		},
	}

	rob, err := robotimpl.New(ctx, cfg, logger)
	test.That(t, err, test.ShouldBeNil)

	// Close the robot when the calling (sub)test finishes.
	t.Cleanup(func() {
		test.That(t, rob.Close(ctx), test.ShouldBeNil)
	})
	return cfg, rob
}

// TestConcurrentReconfiguration uses wall-clock time to check that modular
// resources are built concurrently when a robot is reconfigured. Every
// resource uses the test module's "slow" model, which (per the expectations
// encoded in the thresholds below) takes roughly 1s to construct.
func TestConcurrentReconfiguration(t *testing.T) {
	ctx := context.Background()
	logger := logging.NewTestLogger(t)

	// slowComponent returns the config of a generic component that uses the
	// "slow" model, optionally depending on the named resources.
	slowComponent := func(name string, dependsOn ...string) resource.Config {
		return resource.Config{
			Name:      name,
			Model:     resource.NewModel("rdk", "test", "slow"),
			API:       generic.API,
			DependsOn: dependsOn,
		}
	}

	// lookupErr reports the error (if any) from looking up a generic
	// component by name on the given robot.
	lookupErr := func(rob robot.LocalRobot, name string) error {
		_, err := rob.ResourceByName(generic.Named(name))
		return err
	}

	t.Run("no dependencies", func(t *testing.T) {
		const (
			numResources = 10
			// softLimit is somewhat arbitrary and can be adjusted upward if
			// this test starts to flake.
			softLimit = 3 * time.Second
			// hardLimit is absolute and should not be adjusted: it is the
			// time a fully sequential build of all resources would need.
			hardLimit = 10 * time.Second
		)

		cfg, rob := setupTestRobotWithModules(t, ctx, logger)

		// Phase 1: add ten independent slow resources. Built concurrently,
		// this should take much less than 10s.
		for n := 1; n <= numResources; n++ {
			cfg.Components = append(cfg.Components, slowComponent(fmt.Sprintf("slow%d", n)))
		}

		began := time.Now()
		rob.Reconfigure(ctx, cfg)
		elapsed := time.Since(began)

		test.That(t, elapsed, test.ShouldBeLessThan, softLimit)
		test.That(t, elapsed, test.ShouldBeLessThan, hardLimit)

		// Every resource must now be present on the robot.
		for n := 1; n <= numResources; n++ {
			test.That(t, lookupErr(rob, fmt.Sprintf("slow%d", n)), test.ShouldBeNil)
		}

		// Phase 2: replace the component list with renamed copies
		// ("slowN" -> "slow-N"). This too should be handled concurrently and
		// take much less than 10s.
		var renamed []resource.Config
		for n := 1; n <= numResources; n++ {
			renamed = append(renamed, slowComponent(fmt.Sprintf("slow-%d", n)))
		}
		cfg.Components = renamed

		began = time.Now()
		rob.Reconfigure(ctx, cfg)
		elapsed = time.Since(began)

		test.That(t, elapsed, test.ShouldBeLessThan, softLimit)
		test.That(t, elapsed, test.ShouldBeLessThan, hardLimit)

		// The old names must be gone and the new names must resolve.
		for n := 1; n <= numResources; n++ {
			test.That(t, lookupErr(rob, fmt.Sprintf("slow%d", n)), test.ShouldNotBeNil)
			test.That(t, lookupErr(rob, fmt.Sprintf("slow-%d", n)), test.ShouldBeNil)
		}
	})

	t.Run("with dependencies", func(t *testing.T) {
		const numResources = 10

		cfg, rob := setupTestRobotWithModules(t, ctx, logger)

		for n := 1; n <= numResources; n++ {
			name := fmt.Sprintf("slow%d", n)
			if n%2 == 0 {
				// Even-numbered resources depend on the odd-numbered
				// resource immediately before them.
				cfg.Components = append(cfg.Components, slowComponent(name, fmt.Sprintf("slow%d", n-1)))
				continue
			}
			cfg.Components = append(cfg.Components, slowComponent(name))
		}

		// The config has two dependency levels, and resources are built
		// concurrently within a level. Reconfiguration should therefore take
		// at least 2s, but still much less than 10s.
		began := time.Now()
		rob.Reconfigure(ctx, cfg)
		elapsed := time.Since(began)

		test.That(t, elapsed, test.ShouldBeGreaterThanOrEqualTo, 2*time.Second)
		// This upper bound is somewhat arbitrary and can be adjusted upward
		// if this test starts to flake.
		test.That(t, elapsed, test.ShouldBeLessThan, 5*time.Second)
		// This upper bound is absolute and should not be adjusted.
		test.That(t, elapsed, test.ShouldBeLessThan, 10*time.Second)

		// Every resource must now be present on the robot.
		for n := 1; n <= numResources; n++ {
			test.That(t, lookupErr(rob, fmt.Sprintf("slow%d", n)), test.ShouldBeNil)
		}
	})
}
23 changes: 22 additions & 1 deletion module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type module struct {
client pb.ModuleServiceClient
addr string
resources map[resource.Name]*addedResource
// resourcesMu must be held if the `resources` field is accessed without
// write-locking the module manager.
resourcesMu sync.Mutex

// pendingRemoval allows delaying module close until after resources within it are closed
pendingRemoval bool
Expand Down Expand Up @@ -459,6 +462,12 @@ func (mgr *Manager) closeModule(mod *module, reconfigure bool) error {

// AddResource tells a component module to configure a new component.
//
// It acquires the manager's read lock (mgr.mu.RLock), releases it via defer
// when the call returns, and delegates to addResource with the same ctx, conf
// and deps, returning addResource's results unchanged.
//
// NOTE(review): presumably the read lock (rather than the write lock) is what
// allows several AddResource calls to run at the same time; confirm against
// the declared type of mgr.mu.
func (mgr *Manager) AddResource(ctx context.Context, conf resource.Config, deps []string) (resource.Resource, error) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
return mgr.addResource(ctx, conf, deps)
}

func (mgr *Manager) addResourceWithWriteLock(ctx context.Context, conf resource.Config, deps []string) (resource.Resource, error) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
return mgr.addResource(ctx, conf, deps)
Expand All @@ -482,6 +491,9 @@ func (mgr *Manager) addResource(ctx context.Context, conf resource.Config, deps
return nil, err
}
mgr.rMap.Store(conf.ResourceName(), mod)

mod.resourcesMu.Lock()
defer mod.resourcesMu.Unlock()
mod.resources[conf.ResourceName()] = &addedResource{conf, deps}

apiInfo, ok := resource.LookupGenericAPIRegistration(conf.API)
Expand Down Expand Up @@ -512,6 +524,8 @@ func (mgr *Manager) ReconfigureResource(ctx context.Context, conf resource.Confi
if err != nil {
return err
}
mod.resourcesMu.Lock()
defer mod.resourcesMu.Unlock()
mod.resources[conf.ResourceName()] = &addedResource{conf, deps}

return nil
Expand Down Expand Up @@ -820,11 +834,18 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b
// Finally, handle orphaned resources.
var orphanedResourceNames []resource.Name
for name, res := range mod.resources {
if _, err := mgr.AddResource(mgr.restartCtx, res.conf, res.deps); err != nil {
// The `addResource` method might still be executing for this resource with a
// read lock, so we execute it here with a write lock to make sure it doesn't
// run concurrently.
if _, err := mgr.addResourceWithWriteLock(mgr.restartCtx, res.conf, res.deps); err != nil {
mgr.logger.Warnw("Error while re-adding resource to module",
"resource", name, "module", mod.cfg.Name, "error", err)
mgr.rMap.Delete(name)

mod.resourcesMu.Lock()
delete(mod.resources, name)
mod.resourcesMu.Unlock()

orphanedResourceNames = append(orphanedResourceNames, name)
}
}
Expand Down
Loading

0 comments on commit c2cc3ff

Please sign in to comment.