Skip to content

Commit

Permalink
feat: Unify Batch Map and Unary Map Operations Using a Shared gRPC Pr…
Browse files Browse the repository at this point in the history
…otocol (#2139)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 13, 2024
1 parent 271e459 commit dd08bca
Show file tree
Hide file tree
Showing 23 changed files with 159 additions and 1,154 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/numaproj/numaflow-go v0.8.2-0.20241003055702-9179ac584a4a
github.com/numaproj/numaflow-go v0.8.2-0.20241013052921-0aa35d8766f1
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20241003055702-9179ac584a4a h1:KnpALzELgzX7GR2FDvADTDsauGW/B1fzFw9b+kXYkFc=
github.com/numaproj/numaflow-go v0.8.2-0.20241003055702-9179ac584a4a/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/numaproj/numaflow-go v0.8.2-0.20241013052921-0aa35d8766f1 h1:4uHQqImTmgGkCFrgEhX7atxsAe/nRgjv/2Px0rwqw/I=
github.com/numaproj/numaflow-go v0.8.2-0.20241013052921-0aa35d8766f1/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
}

/*
Expand All @@ -57,6 +58,13 @@ message Handshake {
bool sot = 1;
}

/*
* Status message to indicate the status of the message.
*/
message Status {
bool eot = 1;
}

/**
* MapResponse represents a response element.
*/
Expand All @@ -70,6 +78,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
}

/**
Expand Down
8 changes: 2 additions & 6 deletions pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,10 @@ func TestForwarderJetStreamBuffer(t *testing.T) {
assert.NoError(t, err)

opts := []forward.Option{forward.WithReadBatchSize(tt.batchSize)}
if tt.batchEnabled {
opts = append(opts, forward.WithUDFBatchMap(myForwardJetStreamTest{}))
}
if tt.streamEnabled {
opts = append(opts, forward.WithUDFStreamingMap(myForwardJetStreamTest{}))
}
if tt.unaryEnabled {
opts = append(opts, forward.WithUDFUnaryMap(myForwardJetStreamTest{}))
} else {
opts = append(opts, forward.WithUDFMap(myForwardJetStreamTest{}))
}

f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardJetStreamTest{}, fetchWatermark, publishWatermark, idleManager, opts...)
Expand Down
6 changes: 3 additions & 3 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestRedisCheckBacklog(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithReadBatchSize(10), forward.WithUDFUnaryMap(forwardReadWritePerformance{}))
f, err := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithReadBatchSize(10), forward.WithUDFMap(forwardReadWritePerformance{}))
assert.NoError(t, err)

stopped := f.Start()
Expand Down Expand Up @@ -349,7 +349,7 @@ func (suite *ReadWritePerformance) SetupSuite() {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
isdf, _ := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFUnaryMap(forwardReadWritePerformance{}))
isdf, _ := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFMap(forwardReadWritePerformance{}))

suite.ctx = ctx
suite.rclient = client
Expand Down Expand Up @@ -443,7 +443,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatencyPipelining() {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
suite.isdf, _ = forward.NewInterStepDataForward(vertexInstance, suite.rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFUnaryMap(forwardReadWritePerformance{}))
suite.isdf, _ = forward.NewInterStepDataForward(vertexInstance, suite.rqr, toSteps, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFMap(forwardReadWritePerformance{}))

suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestNewInterStepDataForwardRedis(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFUnaryMap(myForwardRedisTest{}))
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFMap(myForwardRedisTest{}))
assert.NoError(t, err)
assert.False(t, to1.IsFull())

Expand Down Expand Up @@ -463,7 +463,7 @@ func TestReadTimeout(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFUnaryMap(myForwardRedisTest{}))
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithUDFMap(myForwardRedisTest{}))
assert.NoError(t, err)
stopped := f.Start()
// Call stop to end the test as we have a blocking read. The forwarder is up and running with no messages written
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/pnf/pnf.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (pf *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string,
metrics.LabelReason: writeErr.Error(),
}).Add(float64(len(message.Payload)))

pf.log.Infow("Dropped message", zap.String("reason", writeErr.Error()), zap.String("vertex", pf.vertexName), zap.String("pipeline", pf.pipelineName))
pf.log.Infow("Dropped message", zap.String("reason", writeErr.Error()), zap.String("vertex", pf.vertexName), zap.String("pipeline", pf.pipelineName), zap.String("msg_id", message.ID.String()))
} else {
failedMessages = append(failedMessages, message)
}
Expand Down
180 changes: 0 additions & 180 deletions pkg/sdkclient/batchmapper/client.go

This file was deleted.

Loading

0 comments on commit dd08bca

Please sign in to comment.