Skip to content

Commit

Permalink
rename Start to Open
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed May 27, 2024
1 parent c9d44c7 commit 4ae468b
Show file tree
Hide file tree
Showing 23 changed files with 439 additions and 440 deletions.
6 changes: 3 additions & 3 deletions cplugin/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type DestinationPlugin interface {
Configure(context.Context, DestinationConfigureRequest) (DestinationConfigureResponse, error)
Start(context.Context, DestinationStartRequest) (DestinationStartResponse, error)
Open(context.Context, DestinationOpenRequest) (DestinationOpenResponse, error)
Run(context.Context, DestinationRunStream) error
Stop(context.Context, DestinationStopRequest) (DestinationStopResponse, error)
Teardown(context.Context, DestinationTeardownRequest) (DestinationTeardownResponse, error)
Expand Down Expand Up @@ -60,8 +60,8 @@ type DestinationConfigureRequest struct {
}
type DestinationConfigureResponse struct{}

type DestinationStartRequest struct{}
type DestinationStartResponse struct{}
type DestinationOpenRequest struct{}
type DestinationOpenResponse struct{}

type DestinationRunRequest struct {
Records []opencdc.Record
Expand Down
6 changes: 3 additions & 3 deletions cplugin/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type SourcePlugin interface {
Configure(context.Context, SourceConfigureRequest) (SourceConfigureResponse, error)
Start(context.Context, SourceStartRequest) (SourceStartResponse, error)
Open(context.Context, SourceOpenRequest) (SourceOpenResponse, error)
Run(context.Context, SourceRunStream) error
Stop(context.Context, SourceStopRequest) (SourceStopResponse, error)
Teardown(context.Context, SourceTeardownRequest) (SourceTeardownResponse, error)
Expand Down Expand Up @@ -60,10 +60,10 @@ type SourceConfigureRequest struct {
}
type SourceConfigureResponse struct{}

type SourceStartRequest struct {
type SourceOpenRequest struct {
Position opencdc.Position
}
type SourceStartResponse struct{}
type SourceOpenResponse struct{}

type SourceRunRequest struct {
AckPositions []opencdc.Position
Expand Down
4 changes: 2 additions & 2 deletions cplugin/v1/client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (s *DestinationPluginClient) Configure(ctx context.Context, goReq cplugin.D
return fromproto.DestinationConfigureResponse(protoResp), nil
}

func (s *DestinationPluginClient) Start(ctx context.Context, goReq cplugin.DestinationStartRequest) (cplugin.DestinationStartResponse, error) {
func (s *DestinationPluginClient) Open(ctx context.Context, goReq cplugin.DestinationOpenRequest) (cplugin.DestinationOpenResponse, error) {
protoReq := toproto.DestinationStartRequest(goReq)
protoResp, err := s.grpcClient.Start(ctx, protoReq)
if err != nil {
return cplugin.DestinationStartResponse{}, unwrapGRPCError(err)
return cplugin.DestinationOpenResponse{}, unwrapGRPCError(err)
}
return fromproto.DestinationStartResponse(protoResp), nil
}
Expand Down
4 changes: 2 additions & 2 deletions cplugin/v1/client/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (s *SourcePluginClient) Configure(ctx context.Context, goReq cplugin.Source
return fromproto.SourceConfigureResponse(protoResp), nil
}

func (s *SourcePluginClient) Start(ctx context.Context, goReq cplugin.SourceStartRequest) (cplugin.SourceStartResponse, error) {
func (s *SourcePluginClient) Open(ctx context.Context, goReq cplugin.SourceOpenRequest) (cplugin.SourceOpenResponse, error) {
protoReq := toproto.SourceStartRequest(goReq)
protoResp, err := s.grpcClient.Start(ctx, protoReq)
if err != nil {
return cplugin.SourceStartResponse{}, unwrapGRPCError(err)
return cplugin.SourceOpenResponse{}, unwrapGRPCError(err)
}
return fromproto.SourceStartResponse(protoResp), nil
}
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v1/fromproto/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func DestinationConfigureRequest(in *connectorv1.Destination_Configure_Request)
}
}

func DestinationStartRequest(_ *connectorv1.Destination_Start_Request) cplugin.DestinationStartRequest {
return cplugin.DestinationStartRequest{}
func DestinationStartRequest(_ *connectorv1.Destination_Start_Request) cplugin.DestinationOpenRequest {
return cplugin.DestinationOpenRequest{}
}

func DestinationRunRequest(in *connectorv1.Destination_Run_Request) (cplugin.DestinationRunRequest, error) {
Expand Down Expand Up @@ -76,8 +76,8 @@ func DestinationConfigureResponse(_ *connectorv1.Destination_Configure_Response)
return cplugin.DestinationConfigureResponse{}
}

func DestinationStartResponse(_ *connectorv1.Destination_Start_Response) cplugin.DestinationStartResponse {
return cplugin.DestinationStartResponse{}
func DestinationStartResponse(_ *connectorv1.Destination_Start_Response) cplugin.DestinationOpenResponse {
return cplugin.DestinationOpenResponse{}
}

func DestinationRunResponse(in *connectorv1.Destination_Run_Response) cplugin.DestinationRunResponse {
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v1/fromproto/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func SourceConfigureRequest(in *connectorv1.Source_Configure_Request) cplugin.So
}
}

func SourceStartRequest(in *connectorv1.Source_Start_Request) cplugin.SourceStartRequest {
return cplugin.SourceStartRequest{
func SourceStartRequest(in *connectorv1.Source_Start_Request) cplugin.SourceOpenRequest {
return cplugin.SourceOpenRequest{
Position: in.Position,
}
}
Expand Down Expand Up @@ -71,8 +71,8 @@ func SourceConfigureResponse(_ *connectorv1.Source_Configure_Response) cplugin.S
return cplugin.SourceConfigureResponse{}
}

func SourceStartResponse(_ *connectorv1.Source_Start_Response) cplugin.SourceStartResponse {
return cplugin.SourceStartResponse{}
func SourceStartResponse(_ *connectorv1.Source_Start_Response) cplugin.SourceOpenResponse {
return cplugin.SourceOpenResponse{}
}

func SourceRunResponse(in *connectorv1.Source_Run_Response) (cplugin.SourceRunResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion cplugin/v1/fromproto/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSourceStartRequest(t *testing.T) {
have := &connectorv1.Source_Start_Request{
Position: []byte("test_position"),
}
want := cplugin.SourceStartRequest{
want := cplugin.SourceOpenRequest{
Position: []byte("test_position"),
}

Expand Down
2 changes: 1 addition & 1 deletion cplugin/v1/server/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *destinationPluginServer) Configure(ctx context.Context, protoReq *conne
}
func (s *destinationPluginServer) Start(ctx context.Context, protoReq *connectorv1.Destination_Start_Request) (*connectorv1.Destination_Start_Response, error) {
goReq := fromproto.DestinationStartRequest(protoReq)
goResp, err := s.impl.Start(ctx, goReq)
goResp, err := s.impl.Open(ctx, goReq)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cplugin/v1/server/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *sourcePluginServer) Configure(ctx context.Context, protoReq *connectorv
}
func (s *sourcePluginServer) Start(ctx context.Context, protoReq *connectorv1.Source_Start_Request) (*connectorv1.Source_Start_Response, error) {
goReq := fromproto.SourceStartRequest(protoReq)
goResp, err := s.impl.Start(ctx, goReq)
goResp, err := s.impl.Open(ctx, goReq)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cplugin/v1/toproto/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func DestinationConfigureRequest(in cplugin.DestinationConfigureRequest) *connec
}
}

func DestinationStartRequest(_ cplugin.DestinationStartRequest) *connectorv1.Destination_Start_Request {
func DestinationStartRequest(_ cplugin.DestinationOpenRequest) *connectorv1.Destination_Start_Request {
return &connectorv1.Destination_Start_Request{}
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func DestinationConfigureResponse(_ cplugin.DestinationConfigureResponse) *conne
return &connectorv1.Destination_Configure_Response{}
}

func DestinationStartResponse(_ cplugin.DestinationStartResponse) *connectorv1.Destination_Start_Response {
func DestinationStartResponse(_ cplugin.DestinationOpenResponse) *connectorv1.Destination_Start_Response {
return &connectorv1.Destination_Start_Response{}
}

Expand Down
4 changes: 2 additions & 2 deletions cplugin/v1/toproto/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func SourceConfigureRequest(in cplugin.SourceConfigureRequest) *connectorv1.Sour
}
}

func SourceStartRequest(in cplugin.SourceStartRequest) *connectorv1.Source_Start_Request {
func SourceStartRequest(in cplugin.SourceOpenRequest) *connectorv1.Source_Start_Request {
return &connectorv1.Source_Start_Request{
Position: in.Position,
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func SourceConfigureResponse(_ cplugin.SourceConfigureResponse) *connectorv1.Sou
return &connectorv1.Source_Configure_Response{}
}

func SourceStartResponse(_ cplugin.SourceStartResponse) *connectorv1.Source_Start_Response {
func SourceStartResponse(_ cplugin.SourceOpenResponse) *connectorv1.Source_Start_Response {
return &connectorv1.Source_Start_Response{}
}

Expand Down
10 changes: 5 additions & 5 deletions cplugin/v2/client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (s *DestinationPluginClient) Configure(ctx context.Context, goReq cplugin.D
return fromproto.DestinationConfigureResponse(protoResp), nil
}

func (s *DestinationPluginClient) Start(ctx context.Context, goReq cplugin.DestinationStartRequest) (cplugin.DestinationStartResponse, error) {
protoReq := toproto.DestinationStartRequest(goReq)
protoResp, err := s.grpcClient.Start(ctx, protoReq)
func (s *DestinationPluginClient) Open(ctx context.Context, goReq cplugin.DestinationOpenRequest) (cplugin.DestinationOpenResponse, error) {
protoReq := toproto.DestinationOpenRequest(goReq)
protoResp, err := s.grpcClient.Open(ctx, protoReq)
if err != nil {
return cplugin.DestinationStartResponse{}, unwrapGRPCError(err)
return cplugin.DestinationOpenResponse{}, unwrapGRPCError(err)
}
return fromproto.DestinationStartResponse(protoResp), nil
return fromproto.DestinationOpenResponse(protoResp), nil
}

func (s *DestinationPluginClient) Run(ctx context.Context, stream cplugin.DestinationRunStream) error {
Expand Down
10 changes: 5 additions & 5 deletions cplugin/v2/client/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (s *SourcePluginClient) Configure(ctx context.Context, goReq cplugin.Source
return fromproto.SourceConfigureResponse(protoResp), nil
}

func (s *SourcePluginClient) Start(ctx context.Context, goReq cplugin.SourceStartRequest) (cplugin.SourceStartResponse, error) {
protoReq := toproto.SourceStartRequest(goReq)
protoResp, err := s.grpcClient.Start(ctx, protoReq)
func (s *SourcePluginClient) Open(ctx context.Context, goReq cplugin.SourceOpenRequest) (cplugin.SourceOpenResponse, error) {
protoReq := toproto.SourceOpenRequest(goReq)
protoResp, err := s.grpcClient.Open(ctx, protoReq)
if err != nil {
return cplugin.SourceStartResponse{}, unwrapGRPCError(err)
return cplugin.SourceOpenResponse{}, unwrapGRPCError(err)
}
return fromproto.SourceStartResponse(protoResp), nil
return fromproto.SourceOpenResponse(protoResp), nil
}

// Run initializes a stream for the source plugin to send and receive messages.
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/fromproto/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func DestinationConfigureRequest(in *connectorv2.Destination_Configure_Request)
}
}

func DestinationStartRequest(_ *connectorv2.Destination_Start_Request) cplugin.DestinationStartRequest {
return cplugin.DestinationStartRequest{}
func DestinationOpenRequest(_ *connectorv2.Destination_Open_Request) cplugin.DestinationOpenRequest {
return cplugin.DestinationOpenRequest{}
}

func DestinationRunRequest(in *connectorv2.Destination_Run_Request) (cplugin.DestinationRunRequest, error) {
Expand Down Expand Up @@ -78,8 +78,8 @@ func DestinationConfigureResponse(_ *connectorv2.Destination_Configure_Response)
return cplugin.DestinationConfigureResponse{}
}

func DestinationStartResponse(_ *connectorv2.Destination_Start_Response) cplugin.DestinationStartResponse {
return cplugin.DestinationStartResponse{}
func DestinationOpenResponse(_ *connectorv2.Destination_Open_Response) cplugin.DestinationOpenResponse {
return cplugin.DestinationOpenResponse{}
}

func DestinationRunResponse(in *connectorv2.Destination_Run_Response) cplugin.DestinationRunResponse {
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/fromproto/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func SourceConfigureRequest(in *connectorv2.Source_Configure_Request) cplugin.So
}
}

func SourceStartRequest(in *connectorv2.Source_Start_Request) cplugin.SourceStartRequest {
return cplugin.SourceStartRequest{
func SourceOpenRequest(in *connectorv2.Source_Open_Request) cplugin.SourceOpenRequest {
return cplugin.SourceOpenRequest{
Position: in.Position,
}
}
Expand Down Expand Up @@ -76,8 +76,8 @@ func SourceConfigureResponse(_ *connectorv2.Source_Configure_Response) cplugin.S
return cplugin.SourceConfigureResponse{}
}

func SourceStartResponse(_ *connectorv2.Source_Start_Response) cplugin.SourceStartResponse {
return cplugin.SourceStartResponse{}
func SourceOpenResponse(_ *connectorv2.Source_Open_Response) cplugin.SourceOpenResponse {
return cplugin.SourceOpenResponse{}
}

func SourceRunResponse(in *connectorv2.Source_Run_Response) (cplugin.SourceRunResponse, error) {
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/fromproto/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func TestSourceConfigureRequest(t *testing.T) {
is.Equal("", cmp.Diff(want, got))
}

func TestSourceStartRequest(t *testing.T) {
have := &connectorv2.Source_Start_Request{
func TestSourceOpenRequest(t *testing.T) {
have := &connectorv2.Source_Open_Request{
Position: []byte("test_position"),
}
want := cplugin.SourceStartRequest{
want := cplugin.SourceOpenRequest{
Position: []byte("test_position"),
}

is := is.New(t)
got := SourceStartRequest(have)
got := SourceOpenRequest(have)
is.Equal("", cmp.Diff(want, got))
}

Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/server/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (s *DestinationPluginServer) Configure(ctx context.Context, protoReq *conne
}
return toproto.DestinationConfigureResponse(goResp), nil
}
func (s *DestinationPluginServer) Start(ctx context.Context, protoReq *connectorv2.Destination_Start_Request) (*connectorv2.Destination_Start_Response, error) {
goReq := fromproto.DestinationStartRequest(protoReq)
goResp, err := s.impl.Start(ctx, goReq)
func (s *DestinationPluginServer) Open(ctx context.Context, protoReq *connectorv2.Destination_Open_Request) (*connectorv2.Destination_Open_Response, error) {
goReq := fromproto.DestinationOpenRequest(protoReq)
goResp, err := s.impl.Open(ctx, goReq)
if err != nil {
return nil, err
}
return toproto.DestinationStartResponse(goResp), nil
return toproto.DestinationOpenResponse(goResp), nil
}
func (s *DestinationPluginServer) Run(stream connectorv2.DestinationPlugin_RunServer) error {
return s.impl.Run(stream.Context(), &DestinationRunStream{stream: stream})
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/server/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (s *SourcePluginServer) Configure(ctx context.Context, protoReq *connectorv
}
return toproto.SourceConfigureResponse(goResp), nil
}
func (s *SourcePluginServer) Start(ctx context.Context, protoReq *connectorv2.Source_Start_Request) (*connectorv2.Source_Start_Response, error) {
goReq := fromproto.SourceStartRequest(protoReq)
goResp, err := s.impl.Start(ctx, goReq)
func (s *SourcePluginServer) Open(ctx context.Context, protoReq *connectorv2.Source_Open_Request) (*connectorv2.Source_Open_Response, error) {
goReq := fromproto.SourceOpenRequest(protoReq)
goResp, err := s.impl.Open(ctx, goReq)
if err != nil {
return nil, err
}
return toproto.SourceStartResponse(goResp), nil
return toproto.SourceOpenResponse(goResp), nil
}
func (s *SourcePluginServer) Run(stream connectorv2.SourcePlugin_RunServer) error {
return s.impl.Run(stream.Context(), &SourceRunStream{stream: stream})
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/toproto/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func DestinationConfigureRequest(in cplugin.DestinationConfigureRequest) *connec
}
}

func DestinationStartRequest(_ cplugin.DestinationStartRequest) *connectorv2.Destination_Start_Request {
return &connectorv2.Destination_Start_Request{}
func DestinationOpenRequest(_ cplugin.DestinationOpenRequest) *connectorv2.Destination_Open_Request {
return &connectorv2.Destination_Open_Request{}
}

func DestinationRunRequest(in cplugin.DestinationRunRequest) (*connectorv2.Destination_Run_Request, error) {
Expand Down Expand Up @@ -84,8 +84,8 @@ func DestinationConfigureResponse(_ cplugin.DestinationConfigureResponse) *conne
return &connectorv2.Destination_Configure_Response{}
}

func DestinationStartResponse(_ cplugin.DestinationStartResponse) *connectorv2.Destination_Start_Response {
return &connectorv2.Destination_Start_Response{}
func DestinationOpenResponse(_ cplugin.DestinationOpenResponse) *connectorv2.Destination_Open_Response {
return &connectorv2.Destination_Open_Response{}
}

func DestinationRunResponse(in cplugin.DestinationRunResponse) *connectorv2.Destination_Run_Response {
Expand Down
8 changes: 4 additions & 4 deletions cplugin/v2/toproto/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func SourceConfigureRequest(in cplugin.SourceConfigureRequest) *connectorv2.Sour
}
}

func SourceStartRequest(in cplugin.SourceStartRequest) *connectorv2.Source_Start_Request {
return &connectorv2.Source_Start_Request{
func SourceOpenRequest(in cplugin.SourceOpenRequest) *connectorv2.Source_Open_Request {
return &connectorv2.Source_Open_Request{
Position: in.Position,
}
}
Expand Down Expand Up @@ -78,8 +78,8 @@ func SourceConfigureResponse(_ cplugin.SourceConfigureResponse) *connectorv2.Sou
return &connectorv2.Source_Configure_Response{}
}

func SourceStartResponse(_ cplugin.SourceStartResponse) *connectorv2.Source_Start_Response {
return &connectorv2.Source_Start_Response{}
func SourceOpenResponse(_ cplugin.SourceOpenResponse) *connectorv2.Source_Open_Response {
return &connectorv2.Source_Open_Response{}
}

func SourceRunResponse(in cplugin.SourceRunResponse) (*connectorv2.Source_Run_Response, error) {
Expand Down
Loading

0 comments on commit 4ae468b

Please sign in to comment.