Merge pull request #913 from filecoin-project/899-concurrent-gpus
899 test for concurrent gpus
binocarlos authored Oct 19, 2022
2 parents acbdc3f + a18dab9 commit 0043888
Showing 6 changed files with 294 additions and 23 deletions.
21 changes: 14 additions & 7 deletions pkg/capacitymanager/utils.go
@@ -17,6 +17,16 @@ import (
"github.com/ricochet2200/go-disk-usage/du"
)

// this is used mainly for tests to be deterministic,
// or for tests to say "I know I don't have GPUs, I am pretending I do"
func SetIgnorePhysicalResources(value string) {
os.Setenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT", value)
}

func shouldIgnorePhysicalResources() bool {
return os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""
}

// NvidiaCLI is the path to the Nvidia helper binary
const NvidiaCLI = "nvidia-container-cli"

@@ -138,9 +148,6 @@ func numSystemGPUs() (uint64, error) {

// what resources does this compute node actually have?
func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUsageData, error) {
// this is used mainly for tests to be deterministic
allowOverCommit := os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""

diskSpace, err := getFreeDiskSpace(config.GetStoragePath())
if err != nil {
return model.ResourceUsageData{}, err
@@ -161,7 +168,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
parsedLimitConfig := ParseResourceUsageConfig(limitConfig)

if parsedLimitConfig.CPU > 0 {
if parsedLimitConfig.CPU > physcialResources.CPU && !allowOverCommit {
if parsedLimitConfig.CPU > physcialResources.CPU && !shouldIgnorePhysicalResources() {
return physcialResources, fmt.Errorf(
"you cannot configure more CPU than you have on this node: configured %f, have %f",
parsedLimitConfig.CPU, physcialResources.CPU,
@@ -171,7 +178,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
}

if parsedLimitConfig.Memory > 0 {
if parsedLimitConfig.Memory > physcialResources.Memory && !allowOverCommit {
if parsedLimitConfig.Memory > physcialResources.Memory && !shouldIgnorePhysicalResources() {
return physcialResources, fmt.Errorf(
"you cannot configure more Memory than you have on this node: configured %d, have %d",
parsedLimitConfig.Memory, physcialResources.Memory,
@@ -181,7 +188,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
}

if parsedLimitConfig.Disk > 0 {
if parsedLimitConfig.Disk > physcialResources.Disk && !allowOverCommit {
if parsedLimitConfig.Disk > physcialResources.Disk && !shouldIgnorePhysicalResources() {
return physcialResources, fmt.Errorf(
"you cannot configure more disk than you have on this node: configured %d, have %d",
parsedLimitConfig.Disk, physcialResources.Disk,
@@ -191,7 +198,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
}

if parsedLimitConfig.GPU > 0 {
if parsedLimitConfig.GPU > physcialResources.GPU {
if parsedLimitConfig.GPU > physcialResources.GPU && !shouldIgnorePhysicalResources() {
return physcialResources, fmt.Errorf(
"you cannot configure more GPU than you have on this node: configured %d, have %d",
parsedLimitConfig.GPU, physcialResources.GPU,
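
For context, a minimal sketch of how a test can use the over-commit toggle added above. SetIgnorePhysicalResources and the BACALHAU_CAPACITY_MANAGER_OVER_COMMIT variable come from this diff; the test body itself is illustrative only, not part of the commit.

package capacitymanager_test

import (
	"os"
	"testing"

	"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
)

// Illustrative only: pretend the node has GPUs so the capacity checks above
// pass deterministically in CI, then restore the previous value afterwards.
func TestPretendGPUs(t *testing.T) {
	previous := os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT")
	capacitymanager.SetIgnorePhysicalResources("1")
	defer os.Setenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT", previous)

	// ... exercise code that calls getSystemResources with limits larger than
	// the machine really has; with the flag set, those checks are skipped ...
}
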
18 changes: 14 additions & 4 deletions pkg/computenode/computenode.go
@@ -285,14 +285,14 @@ func (n *ComputeNode) subscriptionEventCreated(ctx context.Context, jobEvent mod

// TODO XXX: don't hardcode networkSize, calculate this dynamically from
// libp2p instead somehow. https://github.com/filecoin-project/bacalhau/issues/512
jobNodeDistanceDelayMs := CalculateJobNodeDistanceDelay( //nolint:gomnd //nolint:gomnd
jobNodeDistanceDelayMs, shouldRunJob := CalculateJobNodeDistanceDelay( //nolint:gomnd //nolint:gomnd
// if the user isn't going to bid unless there are minBids many bids,
// we'd better make sure there are minBids many bids!
ctx, 1, n.ID, jobEvent.JobID, Max(jobEvent.Deal.Concurrency, jobEvent.Deal.MinBids),
)

// if delay is too high, just exit immediately.
if jobNodeDistanceDelayMs > 1000 { //nolint:gomnd
if !shouldRunJob { //nolint:gomnd
// drop the job on the floor, :-O
return nil
}
@@ -348,7 +348,7 @@ func diff(a, b int) int {
return a - b
}

func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) int {
func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) (int, bool) {
// Calculate how long to wait to bid on the job by using a circular hashing
// style approach: Invent a metric for distance between node ID and job ID.
// If the node and job ID happen to be close to each other, such that we'd
@@ -378,7 +378,17 @@ func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID,
"node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
)
return delay
shouldRun := true
// if delay is too high, just exit immediately.
if delay > 1000 { //nolint:gomnd
// drop the job on the floor, :-O
shouldRun = false
log.Ctx(ctx).Warn().Msgf(
"dropped job: node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
)
}
return delay, shouldRun
}

func (n *ComputeNode) triggerStateTransition(ctx context.Context, event model.JobEvent, shard model.JobShard) error {
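
The comments above describe the circular-hashing delay only in prose, so here is a self-contained sketch of the shape of the calculation. The hash function, the chunk scaling, and the 1000 ms cutoff are illustrative assumptions, not the exact helpers or constants used in computenode.go.

package main

import (
	"fmt"
	"hash/fnv"
)

// hashToInt is a stand-in for whatever hashing the real code uses.
func hashToInt(s string) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(s))
	return int(h.Sum32() % 1000)
}

// delayAndDecision mirrors the shape of CalculateJobNodeDistanceDelay: nodes
// whose ID hashes close to the job ID bid almost immediately, distant nodes
// wait longer, and anything past the cutoff is dropped instead of delayed.
func delayAndDecision(nodeID, jobID string, networkSize, concurrency int) (int, bool) {
	nodeHash, jobHash := hashToInt(nodeID), hashToInt(jobID)
	distance := nodeHash - jobHash
	if distance < 0 {
		distance = -distance
	}
	// scale so roughly `concurrency` of `networkSize` nodes fall under the
	// cutoff (illustrative scaling only)
	chunk := 1000 * concurrency / networkSize
	if chunk == 0 {
		chunk = 1
	}
	delayMs := distance * 1000 / chunk
	return delayMs, delayMs <= 1000
}

func main() {
	delayMs, shouldRun := delayAndDecision("node-abc", "job-123", 10, 2)
	fmt.Printf("delay=%dms shouldRun=%v\n", delayMs, shouldRun)
}
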
122 changes: 122 additions & 0 deletions pkg/test/computenode/resourcelimits_test.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/filecoin-project/bacalhau/pkg/devstack"
"github.com/filecoin-project/bacalhau/pkg/transport/inprocess"

"github.com/davecgh/go-spew/spew"
"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
@@ -401,6 +402,127 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() {

}

// test that with nodeCount GPU nodes, nodeCount jobs end up being allocated one per node
// this is a check of the bidding & capacity manager system
func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
// we need to pretend that we have GPUs on each node
capacitymanager.SetIgnorePhysicalResources("1")
nodeCount := 2
seenJobs := 0
jobIds := []string{}

ctx := context.Background()

// the job needs to hang for a period of time so the other job will
// run on another node
jobHandler := func(ctx context.Context, shard model.JobShard, resultsDir string) (*model.RunCommandResult, error) {
time.Sleep(time.Millisecond * 1000)
seenJobs++
return &model.RunCommandResult{}, nil
}

stack := testutils.NewNoopStackMultinode(
ctx,
suite.T(),
nodeCount,
computenode.ComputeNodeConfig{
CapacityManagerConfig: capacitymanager.Config{
ResourceLimitTotal: model.ResourceUsageConfig{
CPU: "1",
Memory: "1Gb",
Disk: "1Gb",
GPU: "1",
},
},
},
noop_executor.ExecutorConfig{
ExternalHooks: noop_executor.ExecutorConfigExternalHooks{
JobHandler: jobHandler,
},
},
inprocess.InProcessTransportClusterConfig{
GetMessageDelay: func(fromIndex, toIndex int) time.Duration {
if fromIndex == toIndex {
// a node speaking to itself is quick
return time.Millisecond * 10
} else {
// otherwise there is a delay
return time.Millisecond * 100
}
},
},
)

cm := stack.CleanupManager
defer cm.Cleanup()

jobConfig := &model.Job{
Spec: model.Spec{
Engine: model.EngineNoop,
Verifier: model.VerifierNoop,
Publisher: model.PublisherNoop,
Docker: model.JobSpecDocker{
Image: "ubuntu",
Entrypoint: []string{"/bin/bash", "-c", "echo hello"},
},
Resources: model.ResourceUsageConfig{
GPU: "1",
},
},
Deal: model.Deal{
Concurrency: 1,
},
}

resolver := job.NewStateResolver(
func(ctx context.Context, id string) (*model.Job, error) {
return stack.Nodes[0].LocalDB.GetJob(ctx, id)
},
func(ctx context.Context, id string) (model.JobState, error) {
return stack.Nodes[0].LocalDB.GetJobState(ctx, id)
},
)

for i := 0; i < nodeCount; i++ {
submittedJob, err := stack.Nodes[0].RequesterNode.SubmitJob(ctx, model.JobCreatePayload{
ClientID: "123",
Job: jobConfig,
})
require.NoError(suite.T(), err)
jobIds = append(jobIds, submittedJob.ID)
// this needs to be less than the time the job lasts
// so we are running jobs in parallel
time.Sleep(time.Millisecond * 500)
}

for _, jobId := range jobIds {
err := resolver.WaitUntilComplete(ctx, jobId)
require.NoError(suite.T(), err)
}

require.Equal(suite.T(), nodeCount, seenJobs)

allocationMap := map[string]int{}

for i := 0; i < nodeCount; i++ {
jobState, err := resolver.GetJobState(ctx, jobIds[i])
require.NoError(suite.T(), err)
completedShards := job.GetCompletedShardStates(jobState)
require.Equal(suite.T(), 1, len(completedShards))
require.Equal(suite.T(), model.JobStateCompleted, completedShards[0].State)
allocationMap[completedShards[0].NodeID]++
}

// test that each node has 1 job allocated to it
node1Count, ok := allocationMap[stack.Nodes[0].Transport.HostID()]
require.True(suite.T(), ok)
require.Equal(suite.T(), 1, node1Count)

node2Count, ok := allocationMap[stack.Nodes[1].Transport.HostID()]
require.True(suite.T(), ok)
require.Equal(suite.T(), 1, node2Count)
}

func (suite *ComputeNodeResourceLimitsSuite) TestDockerResourceLimitsCPU() {
ctx := context.Background()
CPU_LIMIT := "100m"
53 changes: 53 additions & 0 deletions pkg/test/utils/stack.go
@@ -26,6 +26,12 @@ type TestStack struct {
IpfsStack *devstack.DevStackIPFS
}

type TestStackMultinode struct {
CleanupManager *system.CleanupManager
Nodes []*node.Node
IpfsStack *devstack.DevStackIPFS
}

// Docker IPFS stack is designed to be an "as real as possible" stack to write tests against
// but without a libp2p transport - it's useful for testing storage drivers or executors
// it uses:
@@ -37,6 +43,7 @@ type TestStack struct {
// * "standard" executors - i.e. the default executor stack as used by devstack
// * noop verifiers - don't use this stack if you are testing verification
// * IPFS publishers - using the same IPFS cluster as the storage driver
// TODO: this function lies - it only ever returns a single node
func NewDevStackMultiNode(
ctx context.Context,
t *testing.T,
@@ -133,3 +140,49 @@ func NewNoopStack(
Node: node,
}
}

// same as NewNoopStack, but builds count nodes wired together by an in-process transport cluster
func NewNoopStackMultinode(
ctx context.Context,
t *testing.T,
count int,
computeNodeconfig computenode.ComputeNodeConfig,
noopExecutorConfig noop_executor.ExecutorConfig,
inprocessTransportConfig inprocess.InProcessTransportClusterConfig,
) *TestStackMultinode {
cm := system.NewCleanupManager()

nodes := []*node.Node{}

inprocessTransportConfig.Count = count
cluster, err := inprocess.NewInProcessTransportCluster(inprocessTransportConfig)
require.NoError(t, err)

for i := 0; i < count; i++ {
datastore, err := inmemory.NewInMemoryDatastore()
require.NoError(t, err)

transport := cluster.GetTransport(i)
nodeConfig := node.NodeConfig{
CleanupManager: cm,
LocalDB: datastore,
Transport: transport,
ComputeNodeConfig: computeNodeconfig,
RequesterNodeConfig: requesternode.RequesterNodeConfig{},
}

injector := devstack.NewNoopNodeDependencyInjector()
injector.ExecutorsFactory = devstack.NewNoopExecutorsFactoryWithConfig(noopExecutorConfig)
node, err := node.NewNode(ctx, nodeConfig, injector)
require.NoError(t, err)
err = transport.Start(ctx)
require.NoError(t, err)

nodes = append(nodes, node)
}

return &TestStackMultinode{
Nodes: nodes,
CleanupManager: cm,
}
}
40 changes: 28 additions & 12 deletions pkg/transport/inprocess/inprocess.go
@@ -22,6 +22,12 @@ type InProcessTransport struct {
subscribeFunctions []transport.SubscribeFn
seenEvents []model.JobEvent
mutex sync.Mutex
// if a publishHandler is assigned - it will take over
// from the standard publish - this is used by the InProcessTransportCluster
// to hijack publish calls and distribute them across all the other nodes
// (a bit like the libp2p transport where every event is written to the
// event handlers of every node)
publishHandler func(ctx context.Context, ev model.JobEvent) error
}

/*
@@ -76,19 +82,13 @@ func (t *InProcessTransport) GetEvents() []model.JobEvent {
*/

func (t *InProcessTransport) Publish(ctx context.Context, ev model.JobEvent) error {
t.mutex.Lock()
defer t.mutex.Unlock()
t.seenEvents = append(t.seenEvents, ev)
for _, fn := range t.subscribeFunctions {
fnToCall := fn
go func() {
err := fnToCall(ctx, ev)
if err != nil {
log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
}
}()
if t.publishHandler != nil {
// we have been given an external function to call with our event
return t.publishHandler(ctx, ev)
} else {
// we will handle our event internally
return t.applyEvent(ctx, ev)
}
return nil
}

func (t *InProcessTransport) Subscribe(ctx context.Context, fn transport.SubscribeFn) {
@@ -109,5 +109,21 @@ func (*InProcessTransport) Decrypt(ctx context.Context, data []byte) ([]byte, er
return data, nil
}

func (t *InProcessTransport) applyEvent(ctx context.Context, ev model.JobEvent) error {
t.mutex.Lock()
defer t.mutex.Unlock()
t.seenEvents = append(t.seenEvents, ev)
for _, fn := range t.subscribeFunctions {
fnToCall := fn
go func() {
err := fnToCall(ctx, ev)
if err != nil {
log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
}
}()
}
return nil
}

// Static check to ensure that InProcessTransport implements Transport:
var _ transport.Transport = (*InProcessTransport)(nil)
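
The publishHandler hijack is only half-visible here, because the InProcessTransportCluster itself is not part of this diff. The toy program below is not Bacalhau code; it is a self-contained sketch of the same pattern: a cluster installs a publish handler on every member that fans each event out to all members with a per-pair delay, the knob the GPU test above tunes through GetMessageDelay.

package main

import (
	"fmt"
	"sync"
	"time"
)

// A tiny model of the hijack: each transport normally applies events to its
// own subscribers, but a cluster can install a publish handler that fans
// every event out to all members with a per-pair delay.

type event struct{ name string }

type transport struct {
	id             int
	publishHandler func(ev event) // when set, replaces local-only delivery
	mu             sync.Mutex
	seen           []event
}

func (t *transport) applyEvent(ev event) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.seen = append(t.seen, ev)
	fmt.Printf("node %d saw %q\n", t.id, ev.name)
}

func (t *transport) publish(ev event) {
	if t.publishHandler != nil {
		t.publishHandler(ev) // hijacked by the cluster
		return
	}
	t.applyEvent(ev) // standalone behaviour
}

type cluster struct {
	members []*transport
	delay   func(from, to int) time.Duration
}

func (c *cluster) wire() {
	for i, member := range c.members {
		from := i
		member.publishHandler = func(ev event) {
			var wg sync.WaitGroup
			for to, peer := range c.members {
				wg.Add(1)
				go func(to int, peer *transport) {
					defer wg.Done()
					time.Sleep(c.delay(from, to)) // simulate network latency
					peer.applyEvent(ev)
				}(to, peer)
			}
			wg.Wait()
		}
	}
}

func main() {
	c := &cluster{
		members: []*transport{{id: 0}, {id: 1}},
		delay: func(from, to int) time.Duration {
			if from == to {
				return 10 * time.Millisecond // a node speaking to itself is quick
			}
			return 100 * time.Millisecond // cross-node messages are slower
		},
	}
	c.wire()
	c.members[0].publish(event{name: "JobCreated"})
}
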