Skip to content

Commit

Permalink
feat: Use gRPC bidirectional streaming for source transformer (#2071)
Browse files Browse the repository at this point in the history
  • Loading branch information
BulkBeing authored Oct 2, 2024
1 parent ac7b33b commit e69551b
Show file tree
Hide file tree
Showing 35 changed files with 946 additions and 918 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ manifests: crds
kubectl kustomize config/extensions/webhook > config/validating-webhook-install.yaml

$(GOPATH)/bin/golangci-lint:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.54.1
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.61.0

.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand All @@ -55,7 +55,7 @@ require (
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117
google.golang.org/grpc v1.66.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0
google.golang.org/protobuf v1.34.2
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0 h1:qPqZfJdPdsz4qymyzMSNICQe/xBnx9P/G3hRbC1DR7k=
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0 h1:MN4Q36mPrXqPrv2dNoK3gyV7c1CGwUF3wNJxTZSw1lk=
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down Expand Up @@ -1049,8 +1049,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 h1:9SxA29VM43MF5Z9dQu694wmY5t8E/Gxr7s+RSxiIDmc=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0/go.mod h1:yZOK5zhQMiALmuweVdIVoQPa6eIJyXn2B9g5dJDhqX4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
7 changes: 5 additions & 2 deletions hack/generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ install-protobuf() {
ARCH=$(uname_arch)

echo "OS: $OS ARCH: $ARCH"
if [[ "$ARCH" = "amd64" ]]; then
ARCH="x86_64"
elif [[ "$ARCH" = "arm64" ]]; then
ARCH="aarch_64"
fi
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-${OS}-${ARCH}.zip
if [[ "$OS" = "darwin" ]]; then
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-osx-universal_binary.zip
elif [[ "$OS" = "linux" ]]; then
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
fi
echo "Downloading $BINARY_URL"

Expand Down
25 changes: 17 additions & 8 deletions pkg/apis/proto/daemon/daemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions pkg/apis/proto/sourcetransform/v1/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,35 @@ service SourceTransform {
// SourceTransformFn applies a function to each request element.
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/**
* Handshake is the message exchanged between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission (sot).
bool sot = 1;
}

/**
* SourceTransformRequest represents a request element.
*/
message SourceTransformRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a transform request
string id = 6;
}
Request request = 1;
optional Handshake handshake = 2;
}

/**
Expand All @@ -56,6 +70,10 @@ message SourceTransformResponse {
repeated string tags = 4;
}
repeated Result results = 1;
// This ID links the response to the request it corresponds to.
string id = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/**
Expand Down
56 changes: 56 additions & 0 deletions pkg/isb/tracker/message_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package tracker

import (
"sync"

"github.com/numaproj/numaflow/pkg/isb"
)

// MessageTracker stores *isb.ReadMessage values keyed by a string ID.
// Because it can be accessed by concurrent goroutines, all operations
// are performed under a mutex.
type MessageTracker struct {
// lock guards m: IsEmpty and Len take the read lock, Remove takes the write lock.
lock sync.RWMutex
// m holds the tracked messages; NewMessageTracker keys each one by ReadOffset.String().
m map[string]*isb.ReadMessage
}

// NewMessageTracker builds a MessageTracker pre-populated with the given
// messages. Each message is stored under the string form of its ReadOffset,
// so a later message with the same offset replaces an earlier one.
func NewMessageTracker(messages []*isb.ReadMessage) *MessageTracker {
	// The zero value of the embedded RWMutex is ready to use, so only the map
	// needs explicit construction; it is sized up front to avoid regrowth.
	tracker := &MessageTracker{
		m: make(map[string]*isb.ReadMessage, len(messages)),
	}
	for i := range messages {
		tracker.m[messages[i].ReadOffset.String()] = messages[i]
	}
	return tracker
}

// Remove deletes the entry stored under id and hands back the message that was
// stored there. It returns nil when the tracker holds no entry for id.
func (t *MessageTracker) Remove(id string) *isb.ReadMessage {
	t.lock.Lock()
	defer t.lock.Unlock()

	if msg, found := t.m[id]; found {
		delete(t.m, id)
		return msg
	}
	return nil
}

// IsEmpty reports whether the tracker currently holds no messages.
// The check is made under the read lock.
func (t *MessageTracker) IsEmpty() bool {
	t.lock.RLock()
	empty := len(t.m) == 0
	t.lock.RUnlock()
	return empty
}

// Len reports how many messages the tracker holds at the moment of the call.
// The count is taken under the read lock.
func (t *MessageTracker) Len() int {
	t.lock.RLock()
	count := len(t.m)
	t.lock.RUnlock()
	return count
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
package rpc
package tracker

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
)

// TestTracker_AddRequest checks that a message handed to the tracker can be
// retrieved by the string form of its read offset and equals the original.
func TestTracker_AddRequest(t *testing.T) {
tr := NewTracker()
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil)
for _, msg := range readMessages {
tr.addRequest(&msg)
messages := make([]*isb.ReadMessage, len(readMessages))
for i, msg := range readMessages {
// NOTE(review): &msg is the address of the range variable. Before Go 1.22 that
// variable is shared across iterations, so every slot would point at the last
// message; confirm the module's Go version or take &readMessages[i] instead.
messages[i] = &msg
}
tr := NewMessageTracker(messages)
id := readMessages[0].ReadOffset.String()
m, ok := tr.getRequest(id)
assert.True(t, ok)
m := tr.Remove(id)
assert.NotNil(t, m)
assert.Equal(t, readMessages[0], *m)
}

// TestTracker_RemoveRequest checks that once an id has been removed from the
// tracker, a further lookup for the same id finds nothing.
func TestTracker_RemoveRequest(t *testing.T) {
tr := NewTracker()
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil)
for _, msg := range readMessages {
tr.addRequest(&msg)
messages := make([]*isb.ReadMessage, len(readMessages))
for i, msg := range readMessages {
// NOTE(review): &msg is the address of the range variable. Before Go 1.22 that
// variable is shared across iterations, so every slot would point at the last
// message; confirm the module's Go version or take &readMessages[i] instead.
messages[i] = &msg
}
tr := NewMessageTracker(messages)
id := readMessages[0].ReadOffset.String()
m, ok := tr.getRequest(id)
assert.True(t, ok)
m := tr.Remove(id)
assert.NotNil(t, m)
assert.Equal(t, readMessages[0], *m)
tr.removeRequest(id)
_, ok = tr.getRequest(id)
assert.False(t, ok)
// A second Remove of the same id must report that nothing is stored any more.
m = tr.Remove(id)
assert.Nil(t, m)
}
2 changes: 0 additions & 2 deletions pkg/sdkclient/grpc/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package grpc

import (
"fmt"
"log"
"strconv"

"google.golang.org/grpc"
Expand Down Expand Up @@ -56,7 +55,6 @@ func ConnectToServer(udsSockAddr string, serverInfo *serverinfo.ServerInfo, maxM
)
} else {
sockAddr = getUdsSockAddr(udsSockAddr)
log.Println("UDS Client:", sockAddr)

conn, err = grpc.NewClient(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize), grpc.MaxCallSendMsgSize(maxMessageSize)))
Expand Down
Loading

0 comments on commit e69551b

Please sign in to comment.