diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 578352ed55..84a1082fb7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -140,7 +140,7 @@ jobs: max-parallel: 11 matrix: driver: [jetstream] - case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinput-e2e] + case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinputs-e2e] steps: - name: Checkout code uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index b63e7a0356..be0b55e4c8 100644 --- a/Makefile +++ b/Makefile @@ -116,7 +116,7 @@ test-api-e2e: test-udsource-e2e: test-transformer-e2e: test-diamond-e2e: -test-sideinput-e2e: +test-sideinputs-e2e: test-%: $(MAKE) cleanup-e2e $(MAKE) image e2eapi-image diff --git a/pkg/forwarder/doc.go b/pkg/forwarder/doc.go new file mode 100644 index 0000000000..b37dcd6a83 --- /dev/null +++ b/pkg/forwarder/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package forwarder defines the interfaces for data forwarders in different type of vertices. +package forwarder diff --git a/pkg/forward/interfaces.go b/pkg/forwarder/interfaces.go similarity index 98% rename from pkg/forward/interfaces.go rename to pkg/forwarder/interfaces.go index 825bed2806..00db3305ee 100644 --- a/pkg/forward/interfaces.go +++ b/pkg/forwarder/interfaces.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package forward +package forwarder // VertexBuffer points to the partition of a buffer owned by the vertex. type VertexBuffer struct { diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index 42dcf9778f..bbc785dd64 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -24,10 +24,11 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" + "github.com/numaproj/numaflow/pkg/udf/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/wmb" ) @@ -35,8 +36,8 @@ import ( type myForwardJetStreamTest struct { } -func (f myForwardJetStreamTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardJetStreamTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/isb/stores/redis/read_test.go b/pkg/isb/stores/redis/read_test.go index 079a560250..719a0949c0 100644 --- a/pkg/isb/stores/redis/read_test.go +++ b/pkg/isb/stores/redis/read_test.go @@ -31,10 +31,11 @@ import ( "github.com/stretchr/testify/suite" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis" + "github.com/numaproj/numaflow/pkg/udf/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/wmb" ) @@ -299,8 +300,8 @@ type ReadWritePerformance struct { type forwardReadWritePerformance struct { } -func (f forwardReadWritePerformance) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f forwardReadWritePerformance) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/isb/stores/redis/write_test.go b/pkg/isb/stores/redis/write_test.go index a412d24758..b84695c4ce 100644 --- a/pkg/isb/stores/redis/write_test.go +++ b/pkg/isb/stores/redis/write_test.go @@ -28,10 +28,11 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis" + "github.com/numaproj/numaflow/pkg/udf/forward" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/wmb" ) @@ -342,8 +343,8 @@ func Test_GetRefreshFullError(t *testing.T) { type myForwardRedisTest struct { } -func (f myForwardRedisTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardRedisTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/reduce/data_forward.go b/pkg/reduce/data_forward.go index c515ac1b81..e87c812c0f 100644 --- a/pkg/reduce/data_forward.go +++ b/pkg/reduce/data_forward.go @@ -38,7 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce/pbq" @@ -69,7 +69,7 @@ type DataForward struct { // wmbChecker checks if the idle watermark is valid when the len(readMessage) is 0. wmbChecker wmb.WMBChecker pbqManager *pbq.Manager - whereToDecider forward.ToWhichStepDecider + whereToDecider forwarder.ToWhichStepDecider udfInvocationTracking map[partition.ID]*pnf.ForwardTask of *pnf.OrderedProcessor opts *Options @@ -82,7 +82,7 @@ func NewDataForward(ctx context.Context, fromBuffer isb.BufferReader, toBuffers map[string][]isb.BufferWriter, pbqManager *pbq.Manager, - whereToDecider forward.ToWhichStepDecider, + whereToDecider forwarder.ToWhichStepDecider, fw fetch.Fetcher, watermarkPublishers map[string]publish.Publisher, windowingStrategy window.Windower, diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 317d557c60..1349d27901 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -30,7 +30,7 @@ import ( "go.uber.org/atomic" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/reduce/pbq" @@ -125,8 +125,8 @@ type myForwardTestRoundRobin struct { count int } -func (f *myForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - var output = []forward.VertexBuffer{{ +func (f *myForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + var output = []forwarder.VertexBuffer{{ ToVertexName: "reduce-to-vertex", ToVertexPartitionIdx: int32(f.count % 2), }} @@ -164,8 +164,8 @@ func (f CounterReduceTest) ApplyReduce(_ context.Context, partitionID *partition }, nil } -func (f CounterReduceTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f CounterReduceTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "reduce-to-vertex", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/reduce/pnf/ordered.go b/pkg/reduce/pnf/ordered.go index 51f7098f44..aea4f160e6 100644 --- a/pkg/reduce/pnf/ordered.go +++ b/pkg/reduce/pnf/ordered.go @@ -29,7 +29,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/watermark/wmb" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce/applier" @@ -60,7 +60,7 @@ type OrderedProcessor struct { pbqManager *pbq.Manager udf applier.ReduceApplier toBuffers map[string][]isb.BufferWriter - whereToDecider forward.ToWhichStepDecider + whereToDecider forwarder.ToWhichStepDecider watermarkPublishers map[string]publish.Publisher idleManager wmb.IdleManager log *zap.SugaredLogger @@ -72,7 +72,7 @@ func NewOrderedProcessor(ctx context.Context, udf applier.ReduceApplier, toBuffers map[string][]isb.BufferWriter, pbqManager *pbq.Manager, - whereToDecider forward.ToWhichStepDecider, + whereToDecider forwarder.ToWhichStepDecider, watermarkPublishers map[string]publish.Publisher, idleManager wmb.IdleManager) *OrderedProcessor { diff --git a/pkg/reduce/pnf/ordered_test.go b/pkg/reduce/pnf/ordered_test.go index 9ae6c3fc85..65ac36058c 100644 --- a/pkg/reduce/pnf/ordered_test.go +++ b/pkg/reduce/pnf/ordered_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" @@ -40,8 +40,8 @@ import ( type myForwardTest struct { } -func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{}, nil +func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{}, nil } func (f myForwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go index ea91adc165..0f2eb6dbf1 100644 --- a/pkg/reduce/pnf/processandforward.go +++ b/pkg/reduce/pnf/processandforward.go @@ -32,7 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce/applier" @@ -56,7 +56,7 @@ type processAndForward struct { pbqReader pbq.Reader log *zap.SugaredLogger toBuffers map[string][]isb.BufferWriter - whereToDecider forward.ToWhichStepDecider + whereToDecider forwarder.ToWhichStepDecider wmPublishers map[string]publish.Publisher idleManager wmb.IdleManager } @@ -70,7 +70,7 @@ func newProcessAndForward(ctx context.Context, udf applier.ReduceApplier, pbqReader pbq.Reader, toBuffers map[string][]isb.BufferWriter, - whereToDecider forward.ToWhichStepDecider, + whereToDecider forwarder.ToWhichStepDecider, pw map[string]publish.Publisher, idleManager wmb.IdleManager) *processAndForward { @@ -174,7 +174,7 @@ func (p *processAndForward) whereToStep() map[string][][]isb.Message { // writer doesn't accept array of pointers messagesToStep := make(map[string][][]isb.Message) - var to []forward.VertexBuffer + var to []forwarder.VertexBuffer var err error for _, msg := range p.writeMessages { to, err = p.whereToDecider.WhereTo(msg.Keys, msg.Tags) diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go index d6247eda25..232acfad9c 100644 --- a/pkg/reduce/pnf/processandforward_test.go +++ b/pkg/reduce/pnf/processandforward_test.go @@ -28,7 +28,7 @@ import ( "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1/reducemock" "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/reduce/pbq" @@ -63,16 +63,16 @@ type forwardTest struct { buffers []string } -func (f *forwardTest) WhereTo(keys []string, _ []string) ([]forward.VertexBuffer, error) { +func (f *forwardTest) WhereTo(keys []string, _ []string) ([]forwarder.VertexBuffer, error) { if strings.Compare(keys[len(keys)-1], "test-forward-one") == 0 { - return []forward.VertexBuffer{{ + return []forwarder.VertexBuffer{{ ToVertexName: "buffer1", ToVertexPartitionIdx: int32(f.count % 2), }}, nil } else if strings.Compare(keys[len(keys)-1], "test-forward-all") == 0 { - var steps []forward.VertexBuffer + var steps []forwarder.VertexBuffer for _, buffer := range f.buffers { - steps = append(steps, forward.VertexBuffer{ + steps = append(steps, forwarder.VertexBuffer{ ToVertexName: buffer, ToVertexPartitionIdx: int32(f.count % 2), }) @@ -80,7 +80,7 @@ func (f *forwardTest) WhereTo(keys []string, _ []string) ([]forward.VertexBuffer return steps, nil } f.count++ - return []forward.VertexBuffer{}, nil + return []forwarder.VertexBuffer{}, nil } func (f forwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { diff --git a/pkg/sinks/sinker.go b/pkg/sinks/sinker.go index c95794a8e9..e2045a677b 100644 --- a/pkg/sinks/sinker.go +++ b/pkg/sinks/sinker.go @@ -17,12 +17,12 @@ limitations under the License. package sinks import ( - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" ) // Sinker interface defines what a Sink should implement. type Sinker interface { isb.BufferWriter - forward.StarterStopper + forwarder.StarterStopper } diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index b8d372b070..45eaaed197 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -27,7 +27,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/idlehandler" @@ -51,7 +51,7 @@ type DataForward struct { reader isb.BufferReader // toBuffers store the toVertex name to its owned buffers mapping. toBuffers map[string][]isb.BufferWriter - toWhichStepDecider forward.ToWhichStepDecider + toWhichStepDecider forwarder.ToWhichStepDecider transformer applier.SourceTransformApplier wmFetcher fetch.Fetcher toVertexWMStores map[string]store.WatermarkStore @@ -73,7 +73,7 @@ func NewDataForward( vertexInstance *dfv1.VertexInstance, fromStep isb.BufferReader, toSteps map[string][]isb.BufferWriter, - toWhichStepDecider forward.ToWhichStepDecider, + toWhichStepDecider forwarder.ToWhichStepDecider, transformer applier.SourceTransformApplier, fetchWatermark fetch.Fetcher, srcWMPublisher isb.SourceWatermarkPublisher, diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index c100e4d6a1..41f103776e 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -30,7 +30,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" @@ -79,8 +79,8 @@ func (t *testForwardFetcher) ComputeHeadIdleWMB(int32) wmb.WMB { type myForwardTest struct { } -func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -788,8 +788,8 @@ func TestNewDataForward(t *testing.T) { type mySourceForwardTest struct { } -func (f mySourceForwardTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f mySourceForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -799,8 +799,8 @@ type mySourceForwardTestRoundRobin struct { count int } -func (f *mySourceForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - var output = []forward.VertexBuffer{{ +func (f *mySourceForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + var output = []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: int32(f.count % 2), }} @@ -1072,8 +1072,8 @@ func TestWriteToBuffer(t *testing.T) { type myForwardDropTest struct { } -func (f myForwardDropTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{}, nil +func (f myForwardDropTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{}, nil } func (f myForwardDropTest) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { @@ -1084,8 +1084,8 @@ type myForwardToAllTest struct { count int } -func (f *myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - var output = []forward.VertexBuffer{{ +func (f *myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + var output = []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: int32(f.count % 2), }, @@ -1104,8 +1104,8 @@ func (f *myForwardToAllTest) ApplyTransform(ctx context.Context, message *isb.Re type myForwardInternalErrTest struct { } -func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -1125,8 +1125,8 @@ func (f myForwardInternalErrTest) ApplyTransform(_ context.Context, _ *isb.ReadM type myForwardApplyWhereToErrTest struct { } -func (f myForwardApplyWhereToErrTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardApplyWhereToErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, fmt.Errorf("whereToStep failed") @@ -1139,8 +1139,8 @@ func (f myForwardApplyWhereToErrTest) ApplyTransform(ctx context.Context, messag type myForwardApplyTransformerErrTest struct { } -func (f myForwardApplyTransformerErrTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardApplyTransformerErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/sources/forward/shutdown_test.go b/pkg/sources/forward/shutdown_test.go index 4c3375e564..49b7e53558 100644 --- a/pkg/sources/forward/shutdown_test.go +++ b/pkg/sources/forward/shutdown_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" @@ -39,8 +39,8 @@ func (s myShutdownTest) IsHealthy(context.Context) error { return nil } -func (s myShutdownTest) WhereTo([]string, []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{}, nil +func (s myShutdownTest) WhereTo([]string, []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{}, nil } func (s myShutdownTest) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index f332726571..c637b7c6fa 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -151,7 +151,7 @@ func WithReadTimeout(timeout time.Duration) Option { func NewMemGen( vertexInstance *dfv1.VertexInstance, writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformerApplier applier2.SourceTransformApplier, fetchWM fetch.Fetcher, toVertexPublisherStores map[string]store.WatermarkStore, diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index d89c2dc654..5127c43a67 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -25,7 +25,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/sources/forward/applier" @@ -37,8 +37,8 @@ import ( type myForwardToAllTest struct { } -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "writer", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/sources/http/http.go b/pkg/sources/http/http.go index dfd1ef2cb0..388c1a6366 100644 --- a/pkg/sources/http/http.go +++ b/pkg/sources/http/http.go @@ -30,7 +30,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -89,7 +89,7 @@ func WithBufferSize(s int) Option { func New( vertexInstance *dfv1.VertexInstance, writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformerApplier applier.SourceTransformApplier, fetchWM fetch.Fetcher, toVertexPublisherStores map[string]store.WatermarkStore, diff --git a/pkg/sources/http/http_test.go b/pkg/sources/http/http_test.go index 9f49021adb..38133f896d 100644 --- a/pkg/sources/http/http_test.go +++ b/pkg/sources/http/http_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/sources/forward/applier" @@ -35,8 +35,8 @@ import ( type myForwardToAllTest struct { } -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "test", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/sources/kafka/handler_test.go b/pkg/sources/kafka/handler_test.go index eea804b42d..270ca331ec 100644 --- a/pkg/sources/kafka/handler_test.go +++ b/pkg/sources/kafka/handler_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -39,8 +39,8 @@ import ( type myForwardToAllTest struct { } -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "test", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index 521d8d5a7e..8669aa7d16 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -26,7 +26,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -328,7 +328,7 @@ func (r *KafkaSource) Pending(_ context.Context) (int64, error) { func NewKafkaSource( vertexInstance *dfv1.VertexInstance, writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformerApplier applier.SourceTransformApplier, fetchWM fetch.Fetcher, toVertexPublisherStores map[string]store.WatermarkStore, diff --git a/pkg/sources/nats/nats.go b/pkg/sources/nats/nats.go index 2d99574f77..0d360666d9 100644 --- a/pkg/sources/nats/nats.go +++ b/pkg/sources/nats/nats.go @@ -26,7 +26,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -61,7 +61,7 @@ type natsSource struct { func New( vertexInstance *dfv1.VertexInstance, writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformerApplier applier.SourceTransformApplier, fetchWM fetch.Fetcher, toVertexPublisherStores map[string]store.WatermarkStore, diff --git a/pkg/sources/nats/nats_test.go b/pkg/sources/nats/nats_test.go index 59159a8722..c5107b2040 100644 --- a/pkg/sources/nats/nats_test.go +++ b/pkg/sources/nats/nats_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" @@ -39,8 +39,8 @@ import ( type myForwardToAllTest struct { } -func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) { - return []forward.VertexBuffer{{ +func (f myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "test", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 13d2cb3ae4..284e4c32a1 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -24,7 +24,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" jetstreamisb "github.com/numaproj/numaflow/pkg/isb/stores/jetstream" redisisb "github.com/numaproj/numaflow/pkg/isb/stores/redis" @@ -278,7 +278,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { // getSourcer is used to send the sourcer information func (sp *SourceProcessor) getSourcer( writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformerApplier applier.SourceTransformApplier, udsGRPCClient *udsource.GRPCBasedUDSource, fetchWM fetch.Fetcher, @@ -327,21 +327,21 @@ func (sp *SourceProcessor) getSourcer( return nil, fmt.Errorf("invalid source spec") } -func (sp *SourceProcessor) getSourceGoWhereDecider(shuffleFuncMap map[string]*shuffle.Shuffle) forward.GoWhere { +func (sp *SourceProcessor) getSourceGoWhereDecider(shuffleFuncMap map[string]*shuffle.Shuffle) forwarder.GoWhere { getToBufferPartition := GetPartitionedBufferIdx() - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer + fsd := forwarder.GoWhere(func(keys []string, tags []string) ([]forwarder.VertexBuffer, error) { + var result []forwarder.VertexBuffer for _, edge := range sp.VertexInstance.Vertex.Spec.ToEdges { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getToBufferPartition(edge.To, edge.GetToVertexPartitionCount()), }) @@ -352,10 +352,10 @@ func (sp *SourceProcessor) getSourceGoWhereDecider(shuffleFuncMap map[string]*sh return fsd } -func (sp *SourceProcessor) getTransformerGoWhereDecider(shuffleFuncMap map[string]*shuffle.Shuffle) forward.GoWhere { +func (sp *SourceProcessor) getTransformerGoWhereDecider(shuffleFuncMap map[string]*shuffle.Shuffle) forwarder.GoWhere { getToBufferPartition := GetPartitionedBufferIdx() - fsd := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer + fsd := forwarder.GoWhere(func(keys []string, tags []string) ([]forwarder.VertexBuffer, error) { + var result []forwarder.VertexBuffer if sharedutil.StringSliceContains(tags, dfv1.MessageTagDrop) { return result, nil @@ -366,12 +366,12 @@ func (sp *SourceProcessor) getTransformerGoWhereDecider(shuffleFuncMap map[strin if edge.Conditions == nil || edge.Conditions.Tags == nil || len(edge.Conditions.Tags.Values) == 0 { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getToBufferPartition(edge.To, edge.GetToVertexPartitionCount()), }) @@ -380,12 +380,12 @@ func (sp *SourceProcessor) getTransformerGoWhereDecider(shuffleFuncMap map[strin if sharedutil.CompareSlice(edge.Conditions.Tags.GetOperator(), tags, edge.Conditions.Tags.Values) { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getToBufferPartition(edge.To, edge.GetToVertexPartitionCount()), }) diff --git a/pkg/sources/sourcer.go b/pkg/sources/sourcer.go index 6c1b24f43c..e532e2c247 100644 --- a/pkg/sources/sourcer.go +++ b/pkg/sources/sourcer.go @@ -17,7 +17,7 @@ limitations under the License. package sources import ( - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" ) @@ -25,7 +25,7 @@ import ( // This is intended to be consumed by a connector like isb.forward type Sourcer interface { isb.BufferReader - forward.StarterStopper + forwarder.StarterStopper isb.LagReader isb.SourceWatermarkPublisher } diff --git a/pkg/sources/udsource/user_defined_source.go b/pkg/sources/udsource/user_defined_source.go index a73638fd3e..279bc55be1 100644 --- a/pkg/sources/udsource/user_defined_source.go +++ b/pkg/sources/udsource/user_defined_source.go @@ -25,7 +25,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" sourceforward "github.com/numaproj/numaflow/pkg/sources/forward" @@ -83,7 +83,7 @@ type userDefinedSource struct { func New( vertexInstance *dfv1.VertexInstance, writers map[string][]isb.BufferWriter, - fsd forward.ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, transformer applier.SourceTransformApplier, sourceApplier *GRPCBasedUDSource, fetchWM fetch.Fetcher, diff --git a/pkg/forward/applier/mapper.go b/pkg/udf/forward/applier/mapper.go similarity index 100% rename from pkg/forward/applier/mapper.go rename to pkg/udf/forward/applier/mapper.go diff --git a/pkg/forward/applier/mapstreamer.go b/pkg/udf/forward/applier/mapstreamer.go similarity index 100% rename from pkg/forward/applier/mapstreamer.go rename to pkg/udf/forward/applier/mapstreamer.go diff --git a/pkg/forward/doc.go b/pkg/udf/forward/doc.go similarity index 97% rename from pkg/forward/doc.go rename to pkg/udf/forward/doc.go index a9141c6d39..e2425964f0 100644 --- a/pkg/forward/doc.go +++ b/pkg/udf/forward/doc.go @@ -11,5 +11,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package forward is used for creating a data forwarder for the map UDF vertex. +// Package forward is used for creating a data forwarder for the UDF vertex. package forward diff --git a/pkg/forward/forward.go b/pkg/udf/forward/forward.go similarity index 99% rename from pkg/forward/forward.go rename to pkg/udf/forward/forward.go index d0b6bd81d7..f356e9a708 100644 --- a/pkg/forward/forward.go +++ b/pkg/udf/forward/forward.go @@ -34,11 +34,12 @@ import ( "k8s.io/apimachinery/pkg/util/wait" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward/applier" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/idlehandler" "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/udf/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/publish" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -54,7 +55,7 @@ type InterStepDataForward struct { fromBufferPartition isb.BufferReader // toBuffers is a map of toVertex name to the toVertex's owned buffers. toBuffers map[string][]isb.BufferWriter - FSD ToWhichStepDecider + FSD forwarder.ToWhichStepDecider mapUDF applier.MapApplier mapStreamUDF applier.MapStreamApplier wmFetcher fetch.Fetcher @@ -76,7 +77,7 @@ func NewInterStepDataForward( vertexInstance *dfv1.VertexInstance, fromStep isb.BufferReader, toSteps map[string][]isb.BufferWriter, - fsd ToWhichStepDecider, + fsd forwarder.ToWhichStepDecider, applyUDF applier.MapApplier, applyUDFStream applier.MapStreamApplier, fetchWatermark fetch.Fetcher, diff --git a/pkg/forward/forward_test.go b/pkg/udf/forward/forward_test.go similarity index 98% rename from pkg/forward/forward_test.go rename to pkg/udf/forward/forward_test.go index 55608d69d6..73dcb68b7f 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/udf/forward/forward_test.go @@ -29,6 +29,7 @@ import ( "go.uber.org/goleak" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" @@ -81,8 +82,8 @@ func (t *testForwardFetcher) ComputeHeadIdleWMB(int32) wmb.WMB { type myForwardTest struct { } -func (f myForwardTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{{ +func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -1233,8 +1234,8 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) { type mySourceForwardTest struct { } -func (f mySourceForwardTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{{ +func (f mySourceForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -1244,8 +1245,8 @@ type mySourceForwardTestRoundRobin struct { count int } -func (f *mySourceForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - var output = []VertexBuffer{{ +func (f *mySourceForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + var output = []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: int32(f.count % 2), }} @@ -1535,8 +1536,8 @@ func TestWriteToBuffer(t *testing.T) { type myForwardDropTest struct { } -func (f myForwardDropTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{}, nil +func (f myForwardDropTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{}, nil } func (f myForwardDropTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { @@ -1551,8 +1552,8 @@ type myForwardToAllTest struct { count int } -func (f *myForwardToAllTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - var output = []VertexBuffer{{ +func (f *myForwardToAllTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + var output = []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: int32(f.count % 2), }, @@ -1575,8 +1576,8 @@ func (f myForwardToAllTest) ApplyMapStream(ctx context.Context, message *isb.Rea type myForwardInternalErrTest struct { } -func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{{ +func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil @@ -1608,8 +1609,8 @@ func (f myForwardInternalErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadM type myForwardApplyWhereToErrTest struct { } -func (f myForwardApplyWhereToErrTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{{ +func (f myForwardApplyWhereToErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, fmt.Errorf("whereToStep failed") @@ -1626,8 +1627,8 @@ func (f myForwardApplyWhereToErrTest) ApplyMapStream(ctx context.Context, messag type myForwardApplyUDFErrTest struct { } -func (f myForwardApplyUDFErrTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{{ +func (f myForwardApplyUDFErrTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{{ ToVertexName: "to1", ToVertexPartitionIdx: 0, }}, nil diff --git a/pkg/forward/options.go b/pkg/udf/forward/options.go similarity index 100% rename from pkg/forward/options.go rename to pkg/udf/forward/options.go diff --git a/pkg/forward/shutdown.go b/pkg/udf/forward/shutdown.go similarity index 100% rename from pkg/forward/shutdown.go rename to pkg/udf/forward/shutdown.go diff --git a/pkg/forward/shutdown_test.go b/pkg/udf/forward/shutdown_test.go similarity index 96% rename from pkg/forward/shutdown_test.go rename to pkg/udf/forward/shutdown_test.go index c9c5ee70bd..bf0781bfaa 100644 --- a/pkg/forward/shutdown_test.go +++ b/pkg/udf/forward/shutdown_test.go @@ -22,6 +22,7 @@ import ( "time" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" @@ -34,8 +35,8 @@ import ( type myShutdownTest struct { } -func (s myShutdownTest) WhereTo(_ []string, _ []string) ([]VertexBuffer, error) { - return []VertexBuffer{}, nil +func (s myShutdownTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) { + return []forwarder.VertexBuffer{}, nil } func (s myShutdownTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index fa40069bcc..c00d25afbb 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -24,7 +24,7 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/sdkclient" @@ -34,6 +34,7 @@ import ( "github.com/numaproj/numaflow/pkg/shared/logging" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/shuffle" + "github.com/numaproj/numaflow/pkg/udf/forward" "github.com/numaproj/numaflow/pkg/udf/rpc" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/generic" @@ -177,8 +178,8 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { // create a conditional forwarder for each partition getVertexPartitionIdx := GetPartitionedBufferIdx() - conditionalForwarder := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer + conditionalForwarder := forwarder.GoWhere(func(keys []string, tags []string) ([]forwarder.VertexBuffer, error) { + var result []forwarder.VertexBuffer if sharedutil.StringSliceContains(tags, dfv1.MessageTagDrop) { return result, nil @@ -189,12 +190,12 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { if edge.Conditions == nil || edge.Conditions.Tags == nil || len(edge.Conditions.Tags.Values) == 0 { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getVertexPartitionIdx(edge.To, edge.GetToVertexPartitionCount()), }) @@ -203,12 +204,12 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { if sharedutil.CompareSlice(edge.Conditions.Tags.GetOperator(), tags, edge.Conditions.Tags.Values) { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getVertexPartitionIdx(edge.To, edge.GetToVertexPartitionCount()), }) diff --git a/pkg/udf/reduce_udf.go b/pkg/udf/reduce_udf.go index cd1530c73f..d18fe758c8 100644 --- a/pkg/udf/reduce_udf.go +++ b/pkg/udf/reduce_udf.go @@ -22,6 +22,7 @@ import ( "strings" "sync" + "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/sdkclient" "github.com/numaproj/numaflow/pkg/sdkclient/reducer" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" @@ -32,7 +33,6 @@ import ( "go.uber.org/zap" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/forward" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/reduce" @@ -155,8 +155,8 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { } } getVertexPartition := GetPartitionedBufferIdx() - conditionalForwarder := forward.GoWhere(func(keys []string, tags []string) ([]forward.VertexBuffer, error) { - var result []forward.VertexBuffer + conditionalForwarder := forwarder.GoWhere(func(keys []string, tags []string) ([]forwarder.VertexBuffer, error) { + var result []forwarder.VertexBuffer if sharedutil.StringSliceContains(tags, dfv1.MessageTagDrop) { return result, nil } @@ -166,12 +166,12 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { if edge.Conditions == nil || edge.Conditions.Tags == nil || len(edge.Conditions.Tags.Values) == 0 { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getVertexPartition(edge.To, edge.GetToVertexPartitionCount()), }) @@ -180,12 +180,12 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { if sharedutil.CompareSlice(edge.Conditions.Tags.GetOperator(), tags, edge.Conditions.Tags.Values) { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { // Need to shuffle toVertexPartition := shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)].Shuffle(keys) - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: toVertexPartition, }) } else { - result = append(result, forward.VertexBuffer{ + result = append(result, forwarder.VertexBuffer{ ToVertexName: edge.To, ToVertexPartitionIdx: getVertexPartition(edge.To, edge.GetToVertexPartitionCount()), }) diff --git a/test/sideinput-e2e/sideinput-e2e_sink_source_test.go b/test/sideinputs-e2e/sideinput-e2e_sink_source_test.go similarity index 100% rename from test/sideinput-e2e/sideinput-e2e_sink_source_test.go rename to test/sideinputs-e2e/sideinput-e2e_sink_source_test.go diff --git a/test/sideinput-e2e/sideinput_test.go b/test/sideinputs-e2e/sideinput_test.go similarity index 100% rename from test/sideinput-e2e/sideinput_test.go rename to test/sideinputs-e2e/sideinput_test.go diff --git a/test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml b/test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml similarity index 100% rename from test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml rename to test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml diff --git a/test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml b/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml similarity index 100% rename from test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml rename to test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml diff --git a/test/sideinput-e2e/testdata/sideinput_sink.yaml b/test/sideinputs-e2e/testdata/sideinput_sink.yaml similarity index 100% rename from test/sideinput-e2e/testdata/sideinput_sink.yaml rename to test/sideinputs-e2e/testdata/sideinput_sink.yaml diff --git a/test/sideinput-e2e/testdata/sideinput_source.yaml b/test/sideinputs-e2e/testdata/sideinput_source.yaml similarity index 100% rename from test/sideinput-e2e/testdata/sideinput_source.yaml rename to test/sideinputs-e2e/testdata/sideinput_source.yaml