diff --git a/pkg/capacitymanager/utils.go b/pkg/capacitymanager/utils.go
index 95bc4d6c98..387da789a1 100644
--- a/pkg/capacitymanager/utils.go
+++ b/pkg/capacitymanager/utils.go
@@ -17,6 +17,16 @@ import (
 	"github.com/ricochet2200/go-disk-usage/du"
 )
 
+// this is used mainly for tests to be deterministic,
+// or for tests to say "I know I don't have GPUs but I am pretending I do"
+func SetIgnorePhysicalResources(value string) {
+	os.Setenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT", value)
+}
+
+func shouldIgnorePhysicalResources() bool {
+	return os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""
+}
+
 // NvidiaCLI is the path to the Nvidia helper binary
 const NvidiaCLI = "nvidia-container-cli"
 
@@ -138,9 +148,6 @@ func numSystemGPUs() (uint64, error) {
 // what resources does this compute node actually have?
 func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUsageData, error) {
-	// this is used mainly for tests to be deterministic
-	allowOverCommit := os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""
-
 	diskSpace, err := getFreeDiskSpace(config.GetStoragePath())
 	if err != nil {
 		return model.ResourceUsageData{}, err
 	}
@@ -161,7 +168,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	parsedLimitConfig := ParseResourceUsageConfig(limitConfig)
 
 	if parsedLimitConfig.CPU > 0 {
-		if parsedLimitConfig.CPU > physcialResources.CPU && !allowOverCommit {
+		if parsedLimitConfig.CPU > physcialResources.CPU && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more CPU than you have on this node: configured %f, have %f",
 				parsedLimitConfig.CPU, physcialResources.CPU,
@@ -171,7 +178,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.Memory > 0 {
-		if parsedLimitConfig.Memory > physcialResources.Memory && !allowOverCommit {
+		if parsedLimitConfig.Memory > physcialResources.Memory && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more Memory than you have on this node: configured %d, have %d",
 				parsedLimitConfig.Memory, physcialResources.Memory,
@@ -181,7 +188,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.Disk > 0 {
-		if parsedLimitConfig.Disk > physcialResources.Disk && !allowOverCommit {
+		if parsedLimitConfig.Disk > physcialResources.Disk && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more disk than you have on this node: configured %d, have %d",
 				parsedLimitConfig.Disk, physcialResources.Disk,
@@ -191,7 +198,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.GPU > 0 {
-		if parsedLimitConfig.GPU > physcialResources.GPU {
+		if parsedLimitConfig.GPU > physcialResources.GPU && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more GPU than you have on this node: configured %d, have %d",
 				parsedLimitConfig.GPU, physcialResources.GPU,
diff --git a/pkg/computenode/computenode.go b/pkg/computenode/computenode.go
index 03cc22d2c1..83cf557371 100644
--- a/pkg/computenode/computenode.go
+++ b/pkg/computenode/computenode.go
@@ -285,14 +285,14 @@ func (n *ComputeNode) subscriptionEventCreated(ctx context.Context, jobEvent mod
 	// TODO XXX: don't hardcode networkSize, calculate this dynamically from
 	// libp2p instead somehow. https://github.com/filecoin-project/bacalhau/issues/512
-	jobNodeDistanceDelayMs := CalculateJobNodeDistanceDelay( //nolint:gomnd //nolint:gomnd
+	jobNodeDistanceDelayMs, shouldRunJob := CalculateJobNodeDistanceDelay( //nolint:gomnd
 		// if the user isn't going to bid unless there are minBids many bids,
 		// we'd better make sure there are minBids many bids!
 		ctx, 1, n.ID, jobEvent.JobID, Max(jobEvent.Deal.Concurrency, jobEvent.Deal.MinBids),
 	)
 	// if delay is too high, just exit immediately.
-	if jobNodeDistanceDelayMs > 1000 { //nolint:gomnd
+	if !shouldRunJob {
 		// drop the job on the floor, :-O
 		return nil
 	}
@@ -348,7 +348,7 @@ func diff(a, b int) int {
 	return a - b
 }
 
-func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) int {
+func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) (int, bool) {
 	// Calculate how long to wait to bid on the job by using a circular hashing
 	// style approach: Invent a metric for distance between node ID and job ID.
 	// If the node and job ID happen to be close to eachother, such that we'd
@@ -378,7 +378,17 @@ func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID,
 		"node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
 		nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
 	)
-	return delay
+	shouldRun := true
+	// if delay is too high, just exit immediately.
+	if delay > 1000 { //nolint:gomnd
+		// drop the job on the floor, :-O
+		shouldRun = false
+		log.Ctx(ctx).Warn().Msgf(
+			"dropped job: node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
+			nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
+		)
+	}
+	return delay, shouldRun
 }
 
 func (n *ComputeNode) triggerStateTransition(ctx context.Context, event model.JobEvent, shard model.JobShard) error {
diff --git a/pkg/test/computenode/resourcelimits_test.go b/pkg/test/computenode/resourcelimits_test.go
index 83e3a6e94e..40de584d63 100644
--- a/pkg/test/computenode/resourcelimits_test.go
+++ b/pkg/test/computenode/resourcelimits_test.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/filecoin-project/bacalhau/pkg/devstack"
+	"github.com/filecoin-project/bacalhau/pkg/transport/inprocess"
 
 	"github.com/davecgh/go-spew/spew"
 	"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
@@ -401,6 +402,127 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() {
 
 }
 
+// test that with multiple GPU nodes, the jobs end up being allocated one per node
+// this is a check of the bidding & capacity manager system
+func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
+	// we need to pretend that we have GPUs on each node
+	capacitymanager.SetIgnorePhysicalResources("1")
+	nodeCount := 2
+	seenJobs := 0
+	jobIds := []string{}
+
+	ctx := context.Background()
+
+	// the job needs to hang for a period of time so the other job will
+	// run on another node
+	jobHandler := func(ctx context.Context, shard model.JobShard, resultsDir string) (*model.RunCommandResult, error) {
+		time.Sleep(time.Millisecond * 1000)
+		seenJobs++
+		return &model.RunCommandResult{}, nil
+	}
+
+	stack := testutils.NewNoopStackMultinode(
+		ctx,
+		suite.T(),
+		nodeCount,
+		computenode.ComputeNodeConfig{
+			CapacityManagerConfig: capacitymanager.Config{
+				ResourceLimitTotal: model.ResourceUsageConfig{
+					CPU:    "1",
+					Memory: "1Gb",
+					Disk:   "1Gb",
+					GPU:    "1",
+				},
+			},
+		},
+		noop_executor.ExecutorConfig{
+			ExternalHooks: noop_executor.ExecutorConfigExternalHooks{
+				JobHandler: jobHandler,
+			},
+		},
+		inprocess.InProcessTransportClusterConfig{
+			GetMessageDelay: func(fromIndex, toIndex int) time.Duration {
+				if fromIndex == toIndex {
+					// a node speaking to itself is quick
+					return time.Millisecond * 10
+				} else {
+					// otherwise there is a delay
+					return time.Millisecond * 100
+				}
+			},
+		},
+	)
+
+	cm := stack.CleanupManager
+	defer cm.Cleanup()
+
+	jobConfig := &model.Job{
+		Spec: model.Spec{
+			Engine:    model.EngineNoop,
+			Verifier:  model.VerifierNoop,
+			Publisher: model.PublisherNoop,
+			Docker: model.JobSpecDocker{
+				Image:      "ubuntu",
+				Entrypoint: []string{"/bin/bash", "-c", "echo hello"},
+			},
+			Resources: model.ResourceUsageConfig{
+				GPU: "1",
+			},
+		},
+		Deal: model.Deal{
+			Concurrency: 1,
+		},
+	}
+
+	resolver := job.NewStateResolver(
+		func(ctx context.Context, id string) (*model.Job, error) {
+			return stack.Nodes[0].LocalDB.GetJob(ctx, id)
+		},
+		func(ctx context.Context, id string) (model.JobState, error) {
+			return stack.Nodes[0].LocalDB.GetJobState(ctx, id)
+		},
+	)
+
+	for i := 0; i < nodeCount; i++ {
+		submittedJob, err := stack.Nodes[0].RequesterNode.SubmitJob(ctx, model.JobCreatePayload{
+			ClientID: "123",
+			Job:      jobConfig,
+		})
+		require.NoError(suite.T(), err)
+		jobIds = append(jobIds, submittedJob.ID)
+		// this needs to be less than the time the job lasts
+		// so we are running jobs in parallel
+		time.Sleep(time.Millisecond * 500)
+	}
+
+	for _, jobId := range jobIds {
+		err := resolver.WaitUntilComplete(ctx, jobId)
+		require.NoError(suite.T(), err)
+	}
+
+	require.Equal(suite.T(), nodeCount, seenJobs)
+
+	allocationMap := map[string]int{}
+
+	for i := 0; i < nodeCount; i++ {
+		jobState, err := resolver.GetJobState(ctx, jobIds[i])
+		require.NoError(suite.T(), err)
+		completedShards := job.GetCompletedShardStates(jobState)
+		require.Equal(suite.T(), 1, len(completedShards))
+		require.Equal(suite.T(), model.JobStateCompleted, completedShards[0].State)
+		allocationMap[completedShards[0].NodeID]++
+	}
+
+	// test that each node has 1 job allocated to it
+	node1Count, ok := allocationMap[stack.Nodes[0].Transport.HostID()]
+	require.True(suite.T(), ok)
+	require.Equal(suite.T(), 1, node1Count)
+
+	node2Count, ok := allocationMap[stack.Nodes[1].Transport.HostID()]
+	require.True(suite.T(), ok)
+	require.Equal(suite.T(), 1, node2Count)
+}
+
 func (suite *ComputeNodeResourceLimitsSuite) TestDockerResourceLimitsCPU() {
 	ctx := context.Background()
 	CPU_LIMIT := "100m"
diff --git a/pkg/test/utils/stack.go b/pkg/test/utils/stack.go
index ed44c9d655..aa55b9d3f1 100644
--- a/pkg/test/utils/stack.go
+++ b/pkg/test/utils/stack.go
@@ -26,6 +26,12 @@ type TestStack struct {
 	IpfsStack      *devstack.DevStackIPFS
 }
 
+type TestStackMultinode struct {
+	CleanupManager *system.CleanupManager
+	Nodes          []*node.Node
+	IpfsStack      *devstack.DevStackIPFS
+}
+
 // Docker IPFS stack is designed to be a "as real as possible" stack to write tests against
 // but without a libp2p transport - it's useful for testing storage drivers or executors
 // it uses:
@@ -37,6 +43,7 @@ type TestStack struct {
 // * "standard" executors - i.e. the default executor stack as used by devstack
 // * noop verifiers - don't use this stack if you are testing verification
 // * IPFS publishers - using the same IPFS cluster as the storage driver
+// TODO: this function lies - it only ever returns a single node
 func NewDevStackMultiNode(
 	ctx context.Context,
 	t *testing.T,
@@ -133,3 +140,49 @@ func NewNoopStack(
 		Node:           node,
 	}
 }
+
+// same as NewNoopStack but it creates a cluster of count nodes connected via an InProcessTransportCluster
+func NewNoopStackMultinode(
+	ctx context.Context,
+	t *testing.T,
+	count int,
+	computeNodeconfig computenode.ComputeNodeConfig,
+	noopExecutorConfig noop_executor.ExecutorConfig,
+	inprocessTransportConfig inprocess.InProcessTransportClusterConfig,
+) *TestStackMultinode {
+	cm := system.NewCleanupManager()
+
+	nodes := []*node.Node{}
+
+	inprocessTransportConfig.Count = count
+	cluster, err := inprocess.NewInProcessTransportCluster(inprocessTransportConfig)
+	require.NoError(t, err)
+
+	for i := 0; i < count; i++ {
+		datastore, err := inmemory.NewInMemoryDatastore()
+		require.NoError(t, err)
+
+		transport := cluster.GetTransport(i)
+		nodeConfig := node.NodeConfig{
+			CleanupManager:      cm,
+			LocalDB:             datastore,
+			Transport:           transport,
+			ComputeNodeConfig:   computeNodeconfig,
+			RequesterNodeConfig: requesternode.RequesterNodeConfig{},
+		}
+
+		injector := devstack.NewNoopNodeDependencyInjector()
+		injector.ExecutorsFactory = devstack.NewNoopExecutorsFactoryWithConfig(noopExecutorConfig)
+		node, err := node.NewNode(ctx, nodeConfig, injector)
+		require.NoError(t, err)
+		err = transport.Start(ctx)
+		require.NoError(t, err)
+
+		nodes = append(nodes, node)
+	}
+
+	return &TestStackMultinode{
+		Nodes:          nodes,
+		CleanupManager: cm,
+	}
+}
diff --git a/pkg/transport/inprocess/inprocess.go b/pkg/transport/inprocess/inprocess.go
index bea94cac13..be54b772d0 100644
--- a/pkg/transport/inprocess/inprocess.go
+++ b/pkg/transport/inprocess/inprocess.go
@@ -22,6 +22,12 @@ type InProcessTransport struct {
 	subscribeFunctions []transport.SubscribeFn
 	seenEvents         []model.JobEvent
 	mutex              sync.Mutex
+	// if a publishHandler is assigned - it will take over
+	// from the standard publish - this is used by the InProcessTransportCluster
+	// to hijack publish calls and distribute them across all the other nodes
+	// (a bit like the libp2p transport where every event is written to the
+	// event handlers of every node)
+	publishHandler func(ctx context.Context, ev model.JobEvent) error
 }
 
 /*
@@ -76,19 +82,13 @@ func (t *InProcessTransport) GetEvents() []model.JobEvent {
 
 */
 func (t *InProcessTransport) Publish(ctx context.Context, ev model.JobEvent) error {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	t.seenEvents = append(t.seenEvents, ev)
-	for _, fn := range t.subscribeFunctions {
-		fnToCall := fn
-		go func() {
-			err := fnToCall(ctx, ev)
-			if err != nil {
-				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
-			}
-		}()
+	if t.publishHandler != nil {
+		// we have been given an external function to call with our event
+		return t.publishHandler(ctx, ev)
+	} else {
+		// we will handle our event internally
+		return t.applyEvent(ctx, ev)
 	}
-	return nil
 }
 
 func (t *InProcessTransport) Subscribe(ctx context.Context, fn transport.SubscribeFn) {
@@ -109,5 +109,21 @@ func (*InProcessTransport) Decrypt(ctx context.Context, data []byte) ([]byte, er
 	return data, nil
 }
 
+func (t *InProcessTransport) applyEvent(ctx context.Context, ev model.JobEvent) error {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.seenEvents = append(t.seenEvents, ev)
+	for _, fn := range t.subscribeFunctions {
+		fnToCall := fn
+		go func() {
+			err := fnToCall(ctx, ev)
+			if err != nil {
+				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+			}
+		}()
+	}
+	return nil
+}
+
 // Static check to ensure that InProcessTransport implements Transport:
 var _ transport.Transport = (*InProcessTransport)(nil)
diff --git a/pkg/transport/inprocess/inprocess_cluster.go b/pkg/transport/inprocess/inprocess_cluster.go
new file mode 100644
index 0000000000..a53b3be389
--- /dev/null
+++ b/pkg/transport/inprocess/inprocess_cluster.go
@@ -0,0 +1,63 @@
+package inprocess
+
+import (
+	"context"
+	"time"
+
+	"github.com/filecoin-project/bacalhau/pkg/model"
+	"github.com/rs/zerolog/log"
+)
+
+type InProcessTransportClusterConfig struct {
+	Count int
+	// a function that returns the simulated message latency between two nodes, by index
+	GetMessageDelay func(fromIndex, toIndex int) time.Duration
+}
+
+// this is a set of "connected" in process transports
+// meaning that an event written to one of them will be delivered
+// to the subscribeFunctions and seen events of all of them
+type InProcessTransportCluster struct {
+	transports []*InProcessTransport
+	config     InProcessTransportClusterConfig
+}
+
+func NewInProcessTransportCluster(config InProcessTransportClusterConfig) (*InProcessTransportCluster, error) {
+	cluster := &InProcessTransportCluster{
+		transports: []*InProcessTransport{},
+		config:     config,
+	}
+	for i := 0; i < config.Count; i++ {
+		transport, err := NewInprocessTransport()
+		if err != nil {
+			return nil, err
+		}
+		cluster.transports = append(cluster.transports, transport)
+	}
+
+	for fromTransportIndex, fromTransport := range cluster.transports {
+		fromTransportIndex := fromTransportIndex // capture the loop variable so each publishHandler keeps its own sender index
+		// override the publish handler for each transport and write to all the other transports
+		fromTransport.publishHandler = func(ctx context.Context, ev model.JobEvent) error {
+			for toTransportIndex, toTransport := range cluster.transports {
+				toTransport := toTransport
+				toTransportIndex := toTransportIndex
+				go func() {
+					if cluster.config.GetMessageDelay != nil {
+						time.Sleep(cluster.config.GetMessageDelay(fromTransportIndex, toTransportIndex))
+					}
+					err := toTransport.applyEvent(ctx, ev)
+					if err != nil {
+						log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+					}
+				}()
+			}
+			return nil
+		}
+	}
+	return cluster, nil
+}
+
+func (cluster *InProcessTransportCluster) GetTransport(i int) *InProcessTransport {
+	return cluster.transports[i]
+}
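+
+// Example (an illustrative sketch only - the values below are arbitrary, and in
+// practice the cluster is built for you by test helpers such as NewNoopStackMultinode):
+// construct a small cluster with a simulated latency function, then hand each
+// test node its own transport via GetTransport.
+//
+//	cluster, err := NewInProcessTransportCluster(InProcessTransportClusterConfig{
+//		Count: 3,
+//		GetMessageDelay: func(fromIndex, toIndex int) time.Duration {
+//			if fromIndex == toIndex {
+//				return 0 // a node publishing to itself is instant
+//			}
+//			return time.Millisecond * 50 // simulated network hop between nodes
+//		},
+//	})
+//	if err != nil {
+//		panic(err)
+//	}
+//	transport := cluster.GetTransport(0)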