From 77ee0fda00327d61e43944d8bce20335e3746b1e Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 09:45:37 +0100
Subject: [PATCH 1/7] allow the publish handler to be overridden so we can have
 a cluster of inprocess transports where each has its own id

---
 pkg/transport/inprocess/inprocess.go | 40 +++++++++++++++++++---------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/pkg/transport/inprocess/inprocess.go b/pkg/transport/inprocess/inprocess.go
index bea94cac13..be54b772d0 100644
--- a/pkg/transport/inprocess/inprocess.go
+++ b/pkg/transport/inprocess/inprocess.go
@@ -22,6 +22,12 @@ type InProcessTransport struct {
 	subscribeFunctions []transport.SubscribeFn
 	seenEvents         []model.JobEvent
 	mutex              sync.Mutex
+	// if a publishHandler is assigned, it takes over from the standard
+	// publish behaviour - the InProcessTransportCluster uses this to hijack
+	// publish calls and distribute them across all the other nodes
+	// (much like the libp2p transport, where every event is written to the
+	// event handlers of every node)
+	publishHandler func(ctx context.Context, ev model.JobEvent) error
 }
 
 /*
@@ -76,19 +82,13 @@ func (t *InProcessTransport) GetEvents() []model.JobEvent {
 */
 
 func (t *InProcessTransport) Publish(ctx context.Context, ev model.JobEvent) error {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	t.seenEvents = append(t.seenEvents, ev)
-	for _, fn := range t.subscribeFunctions {
-		fnToCall := fn
-		go func() {
-			err := fnToCall(ctx, ev)
-			if err != nil {
-				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
-			}
-		}()
+	if t.publishHandler != nil {
+		// we have been given an external function to call with our event
+		return t.publishHandler(ctx, ev)
+	} else {
+		// we will handle our event internally
+		return t.applyEvent(ctx, ev)
 	}
-	return nil
 }
 
 func (t *InProcessTransport) Subscribe(ctx context.Context, fn transport.SubscribeFn) {
@@ -109,5 +109,21 @@ func (*InProcessTransport) Decrypt(ctx context.Context, data []byte) ([]byte, er
 	return data, nil
 }
 
+func (t *InProcessTransport) applyEvent(ctx context.Context, ev model.JobEvent) error {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.seenEvents = append(t.seenEvents, ev)
+	for _, fn := range t.subscribeFunctions {
+		fnToCall := fn
+		go func() {
+			err := fnToCall(ctx, ev)
+			if err != nil {
+				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+			}
+		}()
+	}
+	return nil
+}
+
 // Static check to ensure that InProcessTransport implements Transport:
 var _ transport.Transport = (*InProcessTransport)(nil)

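A minimal sketch of what the new hook enables (written against the same package, since publishHandler is unexported; the function name and the captured slice are illustrative, not part of the patch):

package inprocess

import (
	"context"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

// sketch: divert Publish into a local slice instead of the transport's own
// subscribers - the same trick the InProcessTransportCluster uses to fan
// events out to every node
func sketchPublishOverride(ctx context.Context) error {
	t, err := NewInprocessTransport()
	if err != nil {
		return err
	}

	captured := []model.JobEvent{}
	t.publishHandler = func(ctx context.Context, ev model.JobEvent) error {
		captured = append(captured, ev)
		return nil
	}

	// routed to the handler above instead of applyEvent
	return t.Publish(ctx, model.JobEvent{})
}
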
From 94c5e29e6364efd6d8d8b562ab17380a5eb62286 Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 09:46:33 +0100
Subject: [PATCH 2/7] the inprocess_cluster is for when we want a cluster of
 noop stack nodes and need a libp2p-style transport that gives each transport
 a unique id while sharing the same global event bus

---
 pkg/transport/inprocess/inprocess_cluster.go | 108 +++++++++++++++++++
 1 file changed, 108 insertions(+)
 create mode 100644 pkg/transport/inprocess/inprocess_cluster.go

diff --git a/pkg/transport/inprocess/inprocess_cluster.go b/pkg/transport/inprocess/inprocess_cluster.go
new file mode 100644
index 0000000000..73650d0ae0
--- /dev/null
+++ b/pkg/transport/inprocess/inprocess_cluster.go
@@ -0,0 +1,108 @@
+package inprocess
+
+import (
+	"context"
+	"time"
+
+	"github.com/filecoin-project/bacalhau/pkg/model"
+	"github.com/filecoin-project/bacalhau/pkg/transport"
+	"github.com/multiformats/go-multiaddr"
+	"github.com/rs/zerolog/log"
+)
+
+type InProcessTransportClusterConfig struct {
+	Count int
+	// a function to get the network latency for a given node id
+	GetMessageDelay func(index int) time.Duration
+}
+
+// this is a set of "connected" in process transports
+// meaning that an event written to one of them will be delivered
+// to the subscribeFunctions and seen events of all of them
+type InProcessTransportCluster struct {
+	transports []*InProcessTransport
+	config     InProcessTransportClusterConfig
+}
+
+func NewInProcessTransportCluster(config InProcessTransportClusterConfig) (*InProcessTransportCluster, error) {
+	cluster := &InProcessTransportCluster{
+		transports: []*InProcessTransport{},
+		config:     config,
+	}
+	for i := 0; i < config.Count; i++ {
+		transport, err := NewInprocessTransport()
+		if err != nil {
+			return nil, err
+		}
+		transport.publishHandler = cluster.Publish
+		cluster.transports = append(cluster.transports, transport)
+	}
+	return cluster, nil
+}
+
+func (cluster *InProcessTransportCluster) Start(ctx context.Context) error {
+	return nil
+}
+
+func (cluster *InProcessTransportCluster) Shutdown(ctx context.Context) error {
+	return nil
+}
+
+func (cluster *InProcessTransportCluster) HostID() string {
+	return "cluster_root"
+}
+
+func (cluster *InProcessTransportCluster) HostAddrs() ([]multiaddr.Multiaddr, error) {
+	return []multiaddr.Multiaddr{}, nil
+}
+
+func (cluster *InProcessTransportCluster) GetEvents() []model.JobEvent {
+	return []model.JobEvent{}
+}
+
+/*
+
+  pub / sub
+
+*/
+
+// this function is assigned to the "publishHandler" of each transport
+// this means that calling "Publish" on any node's transport
+// will end up here, where we distribute the event to all the nodes at the same time
+func (cluster *InProcessTransportCluster) Publish(ctx context.Context, ev model.JobEvent) error {
+	for i, transport := range cluster.transports {
+		transport := transport
+		i := i
+		go func() {
+			if cluster.config.GetMessageDelay != nil {
+				time.Sleep(cluster.config.GetMessageDelay(i))
+			}
+			err := transport.applyEvent(ctx, ev)
+			if err != nil {
+				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+			}
+		}()
+	}
+	return nil
+}
+
+func (cluster *InProcessTransportCluster) Subscribe(ctx context.Context, fn transport.SubscribeFn) {}
+
+/*
+encrypt / decrypt
+*/
+
+func (*InProcessTransportCluster) Encrypt(ctx context.Context, data, encryptionKeyBytes []byte) ([]byte, error) {
+	return data, nil
+}
+
+func (*InProcessTransportCluster) Decrypt(ctx context.Context, data []byte) ([]byte, error) {
+	return data, nil
+}
+
+func (cluster *InProcessTransportCluster) GetTransport(i int) *InProcessTransport {
+	return cluster.transports[i]
+}
+
+// Static check to ensure that InProcessTransportCluster implements Transport:
+var _ transport.Transport = (*InProcessTransportCluster)(nil)

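A usage sketch for the cluster as it stands after this patch (the three-node count, flat 10ms delay, and logging subscriber are illustrative):

package main

import (
	"context"
	"time"

	"github.com/filecoin-project/bacalhau/pkg/model"
	"github.com/filecoin-project/bacalhau/pkg/transport/inprocess"
	"github.com/rs/zerolog/log"
)

func main() {
	ctx := context.Background()

	cluster, err := inprocess.NewInProcessTransportCluster(inprocess.InProcessTransportClusterConfig{
		Count: 3,
		GetMessageDelay: func(index int) time.Duration {
			return 10 * time.Millisecond // flat simulated latency for every node
		},
	})
	if err != nil {
		log.Fatal().Err(err).Msg("failed to build cluster")
	}

	// subscribe on every node so the fan-out is observable
	for i := 0; i < 3; i++ {
		cluster.GetTransport(i).Subscribe(ctx, func(ctx context.Context, ev model.JobEvent) error {
			log.Info().Msgf("saw event: %+v", ev)
			return nil
		})
	}

	// publishing on node 0 is hijacked by the cluster's Publish and delivered
	// to the subscribers and seenEvents of all three transports
	_ = cluster.GetTransport(0).Publish(ctx, model.JobEvent{})
	time.Sleep(100 * time.Millisecond) // let the delayed delivery goroutines run
}
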
From 1f9fec490d2347ece44350f0584a80c998455513 Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 09:56:21 +0100
Subject: [PATCH 3/7] multi node noop stack that uses the inprocess cluster
 transport

---
 pkg/test/utils/stack.go | 53 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/pkg/test/utils/stack.go b/pkg/test/utils/stack.go
index ed44c9d655..aa55b9d3f1 100644
--- a/pkg/test/utils/stack.go
+++ b/pkg/test/utils/stack.go
@@ -26,6 +26,12 @@ type TestStack struct {
 	IpfsStack *devstack.DevStackIPFS
 }
 
+type TestStackMultinode struct {
+	CleanupManager *system.CleanupManager
+	Nodes          []*node.Node
+	IpfsStack      *devstack.DevStackIPFS
+}
+
 // Docker IPFS stack is designed to be a "as real as possible" stack to write tests against
 // but without a libp2p transport - it's useful for testing storage drivers or executors
 // it uses:
@@ -37,6 +43,7 @@ type TestStack struct {
 // * "standard" executors - i.e. the default executor stack as used by devstack
 // * noop verifiers - don't use this stack if you are testing verification
 // * IPFS publishers - using the same IPFS cluster as the storage driver
+// TODO: this function lies - it only ever returns a single node
 func NewDevStackMultiNode(
 	ctx context.Context,
 	t *testing.T,
@@ -133,3 +140,49 @@ func NewNoopStack(
 		Node: node,
 	}
 }
+
+// same as NewNoopStack but creates a cluster of count nodes connected by an in-process transport cluster
+func NewNoopStackMultinode(
+	ctx context.Context,
+	t *testing.T,
+	count int,
+	computeNodeconfig computenode.ComputeNodeConfig,
+	noopExecutorConfig noop_executor.ExecutorConfig,
+	inprocessTransportConfig inprocess.InProcessTransportClusterConfig,
+) *TestStackMultinode {
+	cm := system.NewCleanupManager()
+
+	nodes := []*node.Node{}
+
+	inprocessTransportConfig.Count = count
+	cluster, err := inprocess.NewInProcessTransportCluster(inprocessTransportConfig)
+	require.NoError(t, err)
+
+	for i := 0; i < count; i++ {
+		datastore, err := inmemory.NewInMemoryDatastore()
+		require.NoError(t, err)
+
+		transport := cluster.GetTransport(i)
+		nodeConfig := node.NodeConfig{
+			CleanupManager:      cm,
+			LocalDB:             datastore,
+			Transport:           transport,
+			ComputeNodeConfig:   computeNodeconfig,
+			RequesterNodeConfig: requesternode.RequesterNodeConfig{},
+		}
+
+		injector := devstack.NewNoopNodeDependencyInjector()
+		injector.ExecutorsFactory = devstack.NewNoopExecutorsFactoryWithConfig(noopExecutorConfig)
+		node, err := node.NewNode(ctx, nodeConfig, injector)
+		require.NoError(t, err)
+		err = transport.Start(ctx)
+		require.NoError(t, err)
+
+		nodes = append(nodes, node)
+	}
+
+	return &TestStackMultinode{
+		Nodes:          nodes,
+		CleanupManager: cm,
+	}
+}

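A sketch of how a test might drive the new helper (imports omitted - they mirror the ones already used by pkg/test/computenode/resourcelimits_test.go; the empty configs are placeholders, and a real test would set limits and hooks the way the GPU test later in this series does):

func TestNoopClusterSketch(t *testing.T) {
	ctx := context.Background()

	stack := testutils.NewNoopStackMultinode(
		ctx, t,
		3, // node count
		computenode.ComputeNodeConfig{},
		noop_executor.ExecutorConfig{},
		inprocess.InProcessTransportClusterConfig{},
	)
	defer stack.CleanupManager.Cleanup()

	// every node shares the same in-process event bus but keeps its own host id
	for _, n := range stack.Nodes {
		t.Log(n.Transport.HostID())
	}
}
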
From 239ce4061a0b4c1853037319d437bc5fb25bc866 Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 09:58:51 +0100
Subject: [PATCH 4/7] refactor the ignore physical resources code and include
 gpus

---
 pkg/capacitymanager/utils.go | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/pkg/capacitymanager/utils.go b/pkg/capacitymanager/utils.go
index 95bc4d6c98..387da789a1 100644
--- a/pkg/capacitymanager/utils.go
+++ b/pkg/capacitymanager/utils.go
@@ -17,6 +17,16 @@ import (
 	"github.com/ricochet2200/go-disk-usage/du"
 )
 
+// this is used mainly for tests to be deterministic
+// or for tests to say "I know I don't have GPUs, I am pretending I do"
+func SetIgnorePhysicalResources(value string) {
+	os.Setenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT", value)
+}
+
+func shouldIgnorePhysicalResources() bool {
+	return os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""
+}
+
 // NvidiaCLI is the path to the Nvidia helper binary
 const NvidiaCLI = "nvidia-container-cli"
 
@@ -138,9 +148,6 @@ func numSystemGPUs() (uint64, error) {
 
 // what resources does this compute node actually have?
 func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUsageData, error) {
-	// this is used mainly for tests to be deterministic
-	allowOverCommit := os.Getenv("BACALHAU_CAPACITY_MANAGER_OVER_COMMIT") != ""
-
 	diskSpace, err := getFreeDiskSpace(config.GetStoragePath())
 	if err != nil {
 		return model.ResourceUsageData{}, err
@@ -161,7 +168,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	parsedLimitConfig := ParseResourceUsageConfig(limitConfig)
 
 	if parsedLimitConfig.CPU > 0 {
-		if parsedLimitConfig.CPU > physcialResources.CPU && !allowOverCommit {
+		if parsedLimitConfig.CPU > physcialResources.CPU && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more CPU than you have on this node: configured %f, have %f",
 				parsedLimitConfig.CPU, physcialResources.CPU,
@@ -171,7 +178,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.Memory > 0 {
-		if parsedLimitConfig.Memory > physcialResources.Memory && !allowOverCommit {
+		if parsedLimitConfig.Memory > physcialResources.Memory && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more Memory than you have on this node: configured %d, have %d",
 				parsedLimitConfig.Memory, physcialResources.Memory,
@@ -181,7 +188,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.Disk > 0 {
-		if parsedLimitConfig.Disk > physcialResources.Disk && !allowOverCommit {
+		if parsedLimitConfig.Disk > physcialResources.Disk && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more disk than you have on this node: configured %d, have %d",
 				parsedLimitConfig.Disk, physcialResources.Disk,
@@ -191,7 +198,7 @@ func getSystemResources(limitConfig model.ResourceUsageConfig) (model.ResourceUs
 	}
 
 	if parsedLimitConfig.GPU > 0 {
-		if parsedLimitConfig.GPU > physcialResources.GPU {
+		if parsedLimitConfig.GPU > physcialResources.GPU && !shouldIgnorePhysicalResources() {
 			return physcialResources, fmt.Errorf(
 				"you cannot configure more GPU than you have on this node: configured %d, have %d",
 				parsedLimitConfig.GPU, physcialResources.GPU,

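A sketch of the intended test usage (the test name is hypothetical; clearing the variable afterwards restores the normal checks because shouldIgnorePhysicalResources only looks for a non-empty value):

package capacitymanager_test

import (
	"testing"

	"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
)

func TestPretendGPUs(t *testing.T) {
	// pretend this machine can satisfy any configured limit, GPUs included
	capacitymanager.SetIgnorePhysicalResources("1")
	// an empty value switches the physical checks back on
	defer capacitymanager.SetIgnorePhysicalResources("")

	// ... build a compute node that configures GPU limits on a GPU-less machine ...
}
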
From 10a5d7924cd717fce141fc7207eccafa16608e2b Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 10:00:03 +0100
Subject: [PATCH 5/7] failing test for scheduling jobs in parallel across 2 GPU nodes

---
 pkg/test/computenode/resourcelimits_test.go | 118 ++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/pkg/test/computenode/resourcelimits_test.go b/pkg/test/computenode/resourcelimits_test.go
index 83e3a6e94e..3585cc8087 100644
--- a/pkg/test/computenode/resourcelimits_test.go
+++ b/pkg/test/computenode/resourcelimits_test.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/filecoin-project/bacalhau/pkg/devstack"
+	"github.com/filecoin-project/bacalhau/pkg/transport/inprocess"
 
 	"github.com/davecgh/go-spew/spew"
 	"github.com/filecoin-project/bacalhau/pkg/capacitymanager"
@@ -401,6 +402,123 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() {
 
 }
 
+// test that with nodeCount GPU nodes, nodeCount jobs end up being allocated 1 per node
+// this is a check of the bidding & capacity manager system
+func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
+	// we need to pretend that we have GPUs on each node
+	capacitymanager.SetIgnorePhysicalResources("1")
+	nodeCount := 2
+	seenJobs := 0
+	jobIds := []string{}
+
+	ctx := context.Background()
+
+	// the job needs to hang for a period of time so the other job will
+	// run on another node
+	jobHandler := func(ctx context.Context, shard model.JobShard, resultsDir string) (*model.RunCommandResult, error) {
+		time.Sleep(time.Second * 1)
+		seenJobs++
+		return &model.RunCommandResult{}, nil
+	}
+
+	stack := testutils.NewNoopStackMultinode(
+		ctx,
+		suite.T(),
+		nodeCount,
+		computenode.ComputeNodeConfig{
+			CapacityManagerConfig: capacitymanager.Config{
+				ResourceLimitTotal: model.ResourceUsageConfig{
+					CPU:    "1",
+					Memory: "1Gb",
+					Disk:   "1Gb",
+					GPU:    "1",
+				},
+			},
+		},
+		noop_executor.ExecutorConfig{
+			ExternalHooks: noop_executor.ExecutorConfigExternalHooks{
+				JobHandler: jobHandler,
+			},
+		},
+		inprocess.InProcessTransportClusterConfig{
+			GetMessageDelay: func(index int) time.Duration {
+				if index == 0 {
+					return time.Millisecond * 10
+				} else {
+					return time.Second * 1
+				}
+			},
+		},
+	)
+
+	cm := stack.CleanupManager
+	defer cm.Cleanup()
+
+	jobConfig := &model.Job{
+		Spec: model.Spec{
+			Engine:    model.EngineNoop,
+			Verifier:  model.VerifierNoop,
+			Publisher: model.PublisherNoop,
+			Docker: model.JobSpecDocker{
+				Image:      "ubuntu",
+				Entrypoint: []string{"/bin/bash", "-c", "echo hello"},
+			},
+			Resources: model.ResourceUsageConfig{
+				GPU: "1",
+			},
+		},
+		Deal: model.Deal{
+			Concurrency: 1,
+		},
+	}
+
+	resolver := job.NewStateResolver(
+		func(ctx context.Context, id string) (*model.Job, error) {
+			return stack.Nodes[0].LocalDB.GetJob(ctx, id)
+		},
+		func(ctx context.Context, id string) (model.JobState, error) {
+			return stack.Nodes[0].LocalDB.GetJobState(ctx, id)
+		},
+	)
+
+	for i := 0; i < nodeCount; i++ {
+		submittedJob, err := stack.Nodes[0].RequesterNode.SubmitJob(ctx, model.JobCreatePayload{
+			ClientID: "123",
+			Job:      jobConfig,
+		})
+		require.NoError(suite.T(), err)
+		jobIds = append(jobIds, submittedJob.ID)
+		time.Sleep(time.Millisecond * 500)
+	}
+
+	for _, jobId := range jobIds {
+		err := resolver.WaitUntilComplete(ctx, jobId)
+		require.NoError(suite.T(), err)
+	}
+
+	require.Equal(suite.T(), nodeCount, seenJobs)
+
+	allocationMap := map[string]int{}
+
+	for i := 0; i < nodeCount; i++ {
+		jobState, err := resolver.GetJobState(ctx, jobIds[i])
+		require.NoError(suite.T(), err)
+		completedShards := job.GetCompletedShardStates(jobState)
+		require.Equal(suite.T(), 1, len(completedShards))
+		require.Equal(suite.T(), model.JobStateCompleted, completedShards[0].State)
+		allocationMap[completedShards[0].NodeID]++
+	}
+
+	// test that each node has 1 job allocated to it
+	node1Count, ok := allocationMap[stack.Nodes[0].Transport.HostID()]
+	require.True(suite.T(), ok)
+	require.Equal(suite.T(), 1, node1Count)
+
+	node2Count, ok := allocationMap[stack.Nodes[1].Transport.HostID()]
+	require.True(suite.T(), ok)
+	require.Equal(suite.T(), 1, node2Count)
+}
+
 func (suite *ComputeNodeResourceLimitsSuite) TestDockerResourceLimitsCPU() {
 	ctx := context.Background()
 	CPU_LIMIT := "100m"

From 7664fa5598c5b30f62f71a7455468eda0860dccb Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 16:37:16 +0100
Subject: [PATCH 6/7] passing test for multiple GPUs

---
 pkg/test/computenode/resourcelimits_test.go  | 12 ++-
 pkg/transport/inprocess/inprocess_cluster.go | 85 +++++---------------
 2 files changed, 28 insertions(+), 69 deletions(-)

diff --git a/pkg/test/computenode/resourcelimits_test.go b/pkg/test/computenode/resourcelimits_test.go
index 3585cc8087..40de584d63 100644
--- a/pkg/test/computenode/resourcelimits_test.go
+++ b/pkg/test/computenode/resourcelimits_test.go
@@ -416,7 +416,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
 	// the job needs to hang for a period of time so the other job will
 	// run on another node
 	jobHandler := func(ctx context.Context, shard model.JobShard, resultsDir string) (*model.RunCommandResult, error) {
-		time.Sleep(time.Second * 1)
+		time.Sleep(time.Millisecond * 1000)
 		seenJobs++
 		return &model.RunCommandResult{}, nil
 	}
@@ -441,11 +441,13 @@ func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
 			},
 		},
 		inprocess.InProcessTransportClusterConfig{
-			GetMessageDelay: func(index int) time.Duration {
-				if index == 0 {
+			GetMessageDelay: func(fromIndex, toIndex int) time.Duration {
+				if fromIndex == toIndex {
+					// a node speaking to itself is quick
 					return time.Millisecond * 10
 				} else {
-					return time.Second * 1
+					// otherwise there is a delay
+					return time.Millisecond * 100
 				}
 			},
 		},
@@ -488,6 +490,8 @@ func (suite *ComputeNodeResourceLimitsSuite) TestParallelGPU() {
 		})
 		require.NoError(suite.T(), err)
 		jobIds = append(jobIds, submittedJob.ID)
+		// this needs to be less than the time the job lasts
+		// so we are running jobs in parallel
 		time.Sleep(time.Millisecond * 500)
 	}
 
diff --git a/pkg/transport/inprocess/inprocess_cluster.go b/pkg/transport/inprocess/inprocess_cluster.go
index 73650d0ae0..a53b3be389 100644
--- a/pkg/transport/inprocess/inprocess_cluster.go
+++ b/pkg/transport/inprocess/inprocess_cluster.go
@@ -5,15 +5,13 @@ import (
 	"time"
 
 	"github.com/filecoin-project/bacalhau/pkg/model"
-	"github.com/filecoin-project/bacalhau/pkg/transport"
-	"github.com/multiformats/go-multiaddr"
 	"github.com/rs/zerolog/log"
 )
 
 type InProcessTransportClusterConfig struct {
 	Count int
 	// a function to get the simulated network latency for a given node index
-	GetMessageDelay func(index int) time.Duration
+	GetMessageDelay func(fromIndex, toIndex int) time.Duration
 }
 
 // this is a set of "connected" in process transports
@@ -34,75 +32,32 @@ func NewInProcessTransportCluster(config InProcessTransportClusterConfig) (*InPr
 		if err != nil {
 			return nil, err
 		}
-		transport.publishHandler = cluster.Publish
 		cluster.transports = append(cluster.transports, transport)
 	}
-	return cluster, nil
-}
-
-func (cluster *InProcessTransportCluster) Start(ctx context.Context) error {
-	return nil
-}
-
-func (cluster *InProcessTransportCluster) Shutdown(ctx context.Context) error {
-	return nil
-}
-
-func (cluster *InProcessTransportCluster) HostID() string {
-	return "cluster_root"
-}
-
-func (cluster *InProcessTransportCluster) HostAddrs() ([]multiaddr.Multiaddr, error) {
-	return []multiaddr.Multiaddr{}, nil
-}
-
-func (cluster *InProcessTransportCluster) GetEvents() []model.JobEvent {
-	return []model.JobEvent{}
-}
-
-/*
 
-  pub / sub
-
-*/
-
-// this function is assigned to the "publishHandler" of each transport
-// this means that calling "Publish" on any node's transport
-// will end up here, where we distribute the event to all the nodes at the same time
-func (cluster *InProcessTransportCluster) Publish(ctx context.Context, ev model.JobEvent) error {
-	for i, transport := range cluster.transports {
-		transport := transport
-		i := i
-		go func() {
-			if cluster.config.GetMessageDelay != nil {
-				time.Sleep(cluster.config.GetMessageDelay(i))
-			}
-			err := transport.applyEvent(ctx, ev)
-			if err != nil {
-				log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+	for fromTransportIndex, fromTransport := range cluster.transports {
+		// override the publish handler for each transport and write to all the other transports
+		fromTransport.publishHandler = func(ctx context.Context, ev model.JobEvent) error {
+			for toTransportIndex, toTransport := range cluster.transports {
+				toTransport := toTransport
+				fromTransportIndex := fromTransportIndex
+				toTransportIndex := toTransportIndex
+				go func() {
+					if cluster.config.GetMessageDelay != nil {
+						time.Sleep(cluster.config.GetMessageDelay(fromTransportIndex, toTransportIndex))
+					}
+					err := toTransport.applyEvent(ctx, ev)
+					if err != nil {
+						log.Error().Msgf("error in handle event: %s\n%+v", err, ev)
+					}
+				}()
 			}
-		}()
+			return nil
+		}
 	}
-	return nil
-}
-
-func (cluster *InProcessTransportCluster) Subscribe(ctx context.Context, fn transport.SubscribeFn) {}
-
-/*
-encrypt / decrypt
-*/
-
-func (*InProcessTransportCluster) Encrypt(ctx context.Context, data, encryptionKeyBytes []byte) ([]byte, error) {
-	return data, nil
-}
-
-func (*InProcessTransportCluster) Decrypt(ctx context.Context, data []byte) ([]byte, error) {
-	return data, nil
+	return cluster, nil
 }
 
 func (cluster *InProcessTransportCluster) GetTransport(i int) *InProcessTransport {
 	return cluster.transports[i]
 }
-
-// Static check to ensure that InProcessTransportCluster implements Transport:
-var _ transport.Transport = (*InProcessTransportCluster)(nil)

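With the two-index signature the delay function can model per-pair latency; a sketch of a crude two-region topology (the even/odd grouping and the specific durations are purely illustrative):

package example

import (
	"time"

	"github.com/filecoin-project/bacalhau/pkg/transport/inprocess"
)

func newTwoRegionCluster() (*inprocess.InProcessTransportCluster, error) {
	return inprocess.NewInProcessTransportCluster(inprocess.InProcessTransportClusterConfig{
		Count: 4,
		GetMessageDelay: func(fromIndex, toIndex int) time.Duration {
			switch {
			case fromIndex == toIndex:
				return 0 // a node talking to itself is instant
			case fromIndex%2 == toIndex%2:
				return 5 * time.Millisecond // same "region"
			default:
				return 50 * time.Millisecond // crossing "regions"
			}
		},
	})
}
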
From a18dab95301b4ea493c8d975e3ec918a245d1ddf Mon Sep 17 00:00:00 2001
From: Kai Davenport <kaiyadavenport@gmail.com>
Date: Wed, 19 Oct 2022 16:48:07 +0100
Subject: [PATCH 7/7] add logging for when we are dropping a job

---
 pkg/computenode/computenode.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/pkg/computenode/computenode.go b/pkg/computenode/computenode.go
index 03cc22d2c1..83cf557371 100644
--- a/pkg/computenode/computenode.go
+++ b/pkg/computenode/computenode.go
@@ -285,14 +285,14 @@ func (n *ComputeNode) subscriptionEventCreated(ctx context.Context, jobEvent mod
 
 	// TODO XXX: don't hardcode networkSize, calculate this dynamically from
 	// libp2p instead somehow. https://github.com/filecoin-project/bacalhau/issues/512
-	jobNodeDistanceDelayMs := CalculateJobNodeDistanceDelay( //nolint:gomnd //nolint:gomnd
+	jobNodeDistanceDelayMs, shouldRunJob := CalculateJobNodeDistanceDelay( //nolint:gomnd //nolint:gomnd
 		// if the user isn't going to bid unless there are minBids many bids,
 		// we'd better make sure there are minBids many bids!
 		ctx, 1, n.ID, jobEvent.JobID, Max(jobEvent.Deal.Concurrency, jobEvent.Deal.MinBids),
 	)
 
 	// if delay is too high, just exit immediately.
-	if jobNodeDistanceDelayMs > 1000 { //nolint:gomnd
+	if !shouldRunJob { //nolint:gomnd
 		// drop the job on the floor, :-O
 		return nil
 	}
@@ -348,7 +348,7 @@ func diff(a, b int) int {
 	return a - b
 }
 
-func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) int {
+func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID, jobID string, concurrency int) (int, bool) {
 	// Calculate how long to wait to bid on the job by using a circular hashing
 	// style approach: Invent a metric for distance between node ID and job ID.
 	// If the node and job ID happen to be close to each other, such that we'd
@@ -378,7 +378,17 @@ func CalculateJobNodeDistanceDelay(ctx context.Context, networkSize int, nodeID,
 		"node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
 		nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
 	)
-	return delay
+	shouldRun := true
+	// if delay is too high, just exit immediately.
+	if delay > 1000 { //nolint:gomnd
+		// drop the job on the floor, :-O
+		shouldRun = false
+		log.Ctx(ctx).Warn().Msgf(
+			"dropped job: node/job %s/%s, %d/%d, dist=%d, chunk=%d, delay=%d",
+			nodeID, jobID, nodeHash, jobHash, distance, chunk, delay,
+		)
+	}
+	return delay, shouldRun
 }
 
 func (n *ComputeNode) triggerStateTransition(ctx context.Context, event model.JobEvent, shard model.JobShard) error {
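
For callers outside the package, the changed contract reads roughly as follows (a sketch; the surrounding function and applying the delay with a plain Sleep are illustrative, and the import path is inferred from the module layout):

package example

import (
	"context"
	"time"

	"github.com/filecoin-project/bacalhau/pkg/computenode"
)

func bidWithDistanceDelay(ctx context.Context, nodeID, jobID string, concurrency int) error {
	delayMs, shouldRun := computenode.CalculateJobNodeDistanceDelay(ctx, 1, nodeID, jobID, concurrency)
	if !shouldRun {
		// the node considers itself too "far" from the job and drops it;
		// with this patch that decision is logged at warn level rather than
		// happening silently
		return nil
	}
	// wait our turn before bidding
	time.Sleep(time.Duration(delayMs) * time.Millisecond)
	return nil
}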