diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 1ed0687..51b3496 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -8,6 +8,6 @@ Fixes # (issue) ### Quick checks: -- [ ] There is no other [pull request](https://github.com/conduitio/conduit-connector-connectorname/pulls) for the same update/change. +- [ ] There is no other [pull request](https://github.com/conduitio/conduit-connector-sftp/pulls) for the same update/change. - [ ] I have written unit tests. - [ ] I have made sure that the PR is of reasonable size and can be easily reviewed. \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c8b6c51..885d3c2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,4 +17,4 @@ jobs: go-version-file: 'go.mod' - name: Test - run: make test-integration GOTEST_FLAGS="-v -count=1" + run: make test GOTEST_FLAGS="-v -count=1" diff --git a/.gitignore b/.gitignore index be2f5ea..c4ab186 100644 --- a/.gitignore +++ b/.gitignore @@ -20,7 +20,7 @@ .vscode # Binary, built with `make build` -/conduit-connector-connectorname +/conduit-connector-sftp ### OS ### .DS_Store diff --git a/.golangci.goheader.template b/.golangci.goheader.template new file mode 100644 index 0000000..947c941 --- /dev/null +++ b/.golangci.goheader.template @@ -0,0 +1,13 @@ +Copyright © {{ copyright-year }} Meroxa, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml index 3c72277..96d5122 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -20,7 +20,12 @@ linters-settings: - .WithMessagef( - .WithStack( - (context.Context).Err() - + goheader: + template-path: '.golangci.goheader.template' + values: + regexp: + copyright-year: 20[2-9]\d + issues: exclude-rules: - path: _test\.go diff --git a/.goreleaser.yml b/.goreleaser.yml index 31922bf..441b4af 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -8,7 +8,7 @@ builds: env: - CGO_ENABLED=0 ldflags: - - "-s -w -X 'github.com/conduitio/conduit-connector-connectorname.version={{ .Tag }}'" + - "-s -w -X 'github.com/conduitio/conduit-connector-sftp.version={{ .Tag }}'" checksum: name_template: checksums.txt archives: diff --git a/Makefile b/Makefile index a1b16d1..7e26c63 100644 --- a/Makefile +++ b/Makefile @@ -2,20 +2,28 @@ VERSION=$(shell git describe --tags --dirty --always) .PHONY: build build: - go build -ldflags "-X 'github.com/conduitio/conduit-connector-connectorname.version=${VERSION}'" -o conduit-connector-connectorname cmd/connector/main.go + go build -ldflags "-X 'github.com/conduitio/conduit-connector-sftp.version=${VERSION}'" -o conduit-connector-sftp cmd/connector/main.go .PHONY: test test: - go test $(GOTEST_FLAGS) -race ./... - -.PHONY: test-integration -test-integration: # run required docker containers, execute integration tests, stop containers after tests docker compose -f test/docker-compose.yml up -d go test $(GOTEST_FLAGS) -v -race ./...; ret=$$?; \ docker compose -f test/docker-compose.yml down; \ exit $$ret +.PHONY: gofumpt +gofumpt: + go install mvdan.cc/gofumpt@latest + +.PHONY: fmt +fmt: gofumpt + gofumpt -l -w . + +.PHONY: lint +lint: + golangci-lint run -v + .PHONY: generate generate: go generate ./... @@ -25,11 +33,3 @@ install-tools: @echo Installing tools from tools.go @go list -e -f '{{ join .Imports "\n" }}' tools.go | xargs -I % go list -f "%@{{.Module.Version}}" % | xargs -tI % go install % @go mod tidy - -.PHONY: fmt -fmt: - gofumpt -l -w . - -.PHONY: lint -lint: - golangci-lint run diff --git a/README.md b/README.md index d247288..6d89fbf 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,36 @@ -# Conduit Connector Template +# Conduit Connector SFTP -This is a template project for building [Conduit](https://conduit.io) connectors in Go. It makes it possible to -start working on a Conduit connector in a matter of seconds. +The SFTP connector is one of [Conduit](https://github.com/ConduitIO/conduit) plugins. It +provides both, a source and a destination SFTP connector. -## Quick start +## How to build it -1. Click [_Use this template_](https://github.com/new?template_name=conduit-connector-template&template_owner=ConduitIO) and clone your new repository. -2. Initialize the repository using [`setup.sh`](https://github.com/ConduitIO/conduit-connector-template/blob/main/setup.sh) and commit your changes. - ```sh - ./setup.sh github.com/myusername/conduit-connector-myconnector - git add -A - git commit -m "initialize repository" - ``` -3. Set up [automatic Dependabot PR merges](#automatically-merging-dependabot-prs). +Run `make build`. -With that, you're all set up and ready to start working on your connector! As a next step, we recommend that you -check out the [Conduit Connector SDK](https://github.com/ConduitIO/conduit-connector-sdk). +## Testing -## What's included? +Run `make test` to run all the unit and integration tests. -* Skeleton code for the connector's configuration, source and destination. -* Example unit tests. -* A [Makefile](/Makefile) with commonly used targets. -* A [GitHub workflow](/.github/workflows/test.yml) to build the code and run the tests. -* A [GitHub workflow](/.github/workflows/lint.yml) to run a pre-configured set of linters. -* A [GitHub workflow](/.github/workflows/release.yml) which automatically creates a release when a tag is pushed. -* A [Dependabot setup](/.github/dependabot.yml) which checks your dependencies for available updates and -[merges minor version upgrades](/.github/workflows/dependabot-auto-merge-go.yml) automatically. -* [Issue](/.github/ISSUE_TEMPLATE) and [PR templates](/.github/pull_request_template.md). -* A [README template](/README_TEMPLATE.md). +## Source -## Automatically merging Dependabot PRs +The source SFTP connector monitors a directory on an SFTP server for files matching a specified pattern. It reads these files and converts them into `opencdc.Record` that can be processed by Conduit. For handling large files, it splits them into smaller chunks, enabling smooth data handling through the Conduit pipeline. +The connector supports both password and private key authentication methods. -> [!NOTE] -> This applies only to public connector repositories, as branch protection rules are not enforced in private repositories. +### Configuration Options -The template makes it simple to keep your connector up-to-date using automatic merging of -[Dependabot](https://github.com/dependabot) PRs. To make use of this setup, you need to adjust -some repository settings. +| name | description | required | default value | +| -------------- | ----------------------------------------------------------------------------------------------------- | -------- | -------- | +| `address` | Address is the address of the sftp server to connect.| **true** | | +| `hostKey` | HostKey is the key used for host key callback validation.| **true** | | +| `username`| User is the username of the SFTP user. | **true** | | +| `password`| Password is the SFTP password (can be used as passphrase for private key). | false | | +| `privateKeyPath`| PrivateKeyPath is the private key for ssh login.| false | | +| `directoryPath` | DirectoryPath is the path to the directory to read data. | **true** | | +| `filePattern` | Pattern to match files that should be read (e.g., "*.txt") | false | `*` | +| `fileChunkSizeBytes` | Maximum size of a file chunk in bytes to split large files. | false | `3145728` | -1. Navigate to Settings -> General and allow auto-merge of PRs. +## Destination - ![Allow auto-merge](https://github.com/ConduitIO/conduit-connector-template/assets/8320753/695b15f0-85b4-49cb-966d-649e9bf03455) +### Configuration Options -2. Navigate to Settings -> Branches and add a branch protection rule. - - ![Add branch protection rule](https://github.com/ConduitIO/conduit-connector-template/assets/8320753/9f5a07bc-d141-42b9-9918-e8d9cc648482) - -3. Create a rule for branch `main` that requires status checks `build` and `golangci-lint`. - - ![Status checks](https://github.com/ConduitIO/conduit-connector-template/assets/8320753/96219185-c329-432a-8623-9b4462015f32) - -## Recommended repository settings - -- Allow squash merging only. -- Always suggest updating pull request branches. -- Automatically delete head branches. -- Branch protection rules on branch `main` (only in public repositories): - - Require a pull request before merging. - - Require approvals. - - Require status checks `build` and `golangci-lint`. - - Require branches to be up to date before merging. - - Require conversation resolution before merging. - - Do not allow bypassing the above settings. +![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9) \ No newline at end of file diff --git a/cmd/connector/main.go b/cmd/connector/main.go index 5828da2..d8a3ec1 100644 --- a/cmd/connector/main.go +++ b/cmd/connector/main.go @@ -1,10 +1,24 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( - connectorname "github.com/conduitio/conduit-connector-connectorname" + sftp "github.com/conduitio-labs/conduit-connector-sftp" sdk "github.com/conduitio/conduit-connector-sdk" ) func main() { - sdk.Serve(connectorname.Connector) + sdk.Serve(sftp.Connector) } diff --git a/config.go b/config.go deleted file mode 100644 index d0a44a7..0000000 --- a/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package connectorname - -// Config contains shared config parameters, common to the source and -// destination. If you don't need shared parameters you can entirely remove this -// file. -type Config struct { - // GlobalConfigParam is named global_config_param_name and needs to be - // provided by the user. - GlobalConfigParam string `json:"global_config_param_name" validate:"required"` -} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..cb2b5ce --- /dev/null +++ b/config/config.go @@ -0,0 +1,45 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate paramgen -output=paramgen.go Config + +package config + +import "fmt" + +var ErrEmptyAuthFields = fmt.Errorf("both %q and %q can not be empty", ConfigPassword, ConfigPrivateKeyPath) + +// Config contains shared config parameters, common to the source and destination. +type Config struct { + // Address is the address of the sftp server to connect. + Address string `json:"address" validate:"required"` + // HostKey is the key used for host key callback validation. + HostKey string `json:"hostKey" validate:"required"` + // User is the SFTP user. + Username string `json:"username" validate:"required"` + // Password is the SFTP password (can be used as passphrase for private key). + Password string `json:"password"` + // PrivateKeyPath is the private key for ssh login. + PrivateKeyPath string `json:"privateKeyPath"` + // DirectoryPath is the path to the directory to read/write data. + DirectoryPath string `json:"directoryPath" validate:"required"` +} + +// Validate is used for custom validation for sftp authentication configuration. +func (c *Config) Validate() error { + if c.Password == "" && c.PrivateKeyPath == "" { + return ErrEmptyAuthFields + } + return nil +} diff --git a/config/config_test.go b/config/config_test.go new file mode 100644 index 0000000..c9aad2b --- /dev/null +++ b/config/config_test.go @@ -0,0 +1,76 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" +) + +func TestConfig_Validate(t *testing.T) { + type fields struct { + Address string + Username string + Password string + PrivateKeyPath string + } + tests := []struct { + name string + fields fields + wantErr error + }{ + { + name: "success: password authentication", + fields: fields{ + Address: "localhost:22", + Username: "user", + Password: "pass", + }, + wantErr: nil, + }, + { + name: "success: privatekey authentication", + fields: fields{ + Address: "localhost:22", + Username: "user", + PrivateKeyPath: "path", + }, + wantErr: nil, + }, + { + name: "error: missing password and privateKeyPath", + fields: fields{ + Address: "localhost:22", + Username: "user", + }, + wantErr: ErrEmptyAuthFields, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Config{ + Address: tt.fields.Address, + Username: tt.fields.Username, + Password: tt.fields.Password, + PrivateKeyPath: tt.fields.PrivateKeyPath, + } + err := c.Validate() + if err != nil { + if tt.wantErr != nil && tt.wantErr.Error() != err.Error() { + t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} diff --git a/config/paramgen.go b/config/paramgen.go new file mode 100644 index 0000000..93426de --- /dev/null +++ b/config/paramgen.go @@ -0,0 +1,66 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen + +package config + +import ( + "github.com/conduitio/conduit-commons/config" +) + +const ( + ConfigAddress = "address" + ConfigDirectoryPath = "directoryPath" + ConfigHostKey = "hostKey" + ConfigPassword = "password" + ConfigPrivateKeyPath = "privateKeyPath" + ConfigUsername = "username" +) + +func (Config) Parameters() map[string]config.Parameter { + return map[string]config.Parameter{ + ConfigAddress: { + Default: "", + Description: "Address is the address of the sftp server to connect.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigDirectoryPath: { + Default: "", + Description: "DirectoryPath is the path to the directory to read/write data.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigHostKey: { + Default: "", + Description: "HostKey is the key used for host key callback validation.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigPassword: { + Default: "", + Description: "Password is the SFTP password (can be used as passphrase for private key).", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigPrivateKeyPath: { + Default: "", + Description: "PrivateKeyPath is the private key for ssh login.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigUsername: { + Default: "", + Description: "User is the SFTP user.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + } +} diff --git a/connector.go b/connector.go index 8a99575..c944c50 100644 --- a/connector.go +++ b/connector.go @@ -1,10 +1,27 @@ -package connectorname +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. -import sdk "github.com/conduitio/conduit-connector-sdk" +package sftp + +import ( + source "github.com/conduitio-labs/conduit-connector-sftp/source" + sdk "github.com/conduitio/conduit-connector-sdk" +) // Connector combines all constructors for each plugin in one struct. var Connector = sdk.Connector{ NewSpecification: Specification, - NewSource: NewSource, - NewDestination: NewDestination, + NewSource: source.NewSource, + NewDestination: nil, } diff --git a/destination.go b/destination.go deleted file mode 100644 index 06924ca..0000000 --- a/destination.go +++ /dev/null @@ -1,77 +0,0 @@ -package connectorname - -//go:generate paramgen -output=paramgen_dest.go DestinationConfig - -import ( - "context" - "fmt" - - "github.com/conduitio/conduit-commons/config" - "github.com/conduitio/conduit-commons/opencdc" - sdk "github.com/conduitio/conduit-connector-sdk" -) - -type Destination struct { - sdk.UnimplementedDestination - - config DestinationConfig -} - -type DestinationConfig struct { - // Config includes parameters that are the same in the source and destination. - Config - // DestinationConfigParam must be either yes or no (defaults to yes). - DestinationConfigParam string `validate:"inclusion=yes|no" default:"yes"` -} - -func NewDestination() sdk.Destination { - // Create Destination and wrap it in the default middleware. - return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) -} - -func (d *Destination) Parameters() config.Parameters { - // Parameters is a map of named Parameters that describe how to configure - // the Destination. Parameters can be generated from DestinationConfig with - // paramgen. - return d.config.Parameters() -} - -func (d *Destination) Configure(ctx context.Context, cfg config.Config) error { - // Configure is the first function to be called in a connector. It provides - // the connector with the configuration that can be validated and stored. - // In case the configuration is not valid it should return an error. - // Testing if your connector can reach the configured data source should be - // done in Open, not in Configure. - // The SDK will validate the configuration and populate default values - // before calling Configure. If you need to do more complex validations you - // can do them manually here. - - sdk.Logger(ctx).Info().Msg("Configuring Destination...") - err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters()) - if err != nil { - return fmt.Errorf("invalid config: %w", err) - } - return nil -} - -func (d *Destination) Open(_ context.Context) error { - // Open is called after Configure to signal the plugin it can prepare to - // start writing records. If needed, the plugin should open connections in - // this function. - return nil -} - -func (d *Destination) Write(_ context.Context, _ []opencdc.Record) (int, error) { - // Write writes len(r) records from r to the destination right away without - // caching. It should return the number of records written from r - // (0 <= n <= len(r)) and any error encountered that caused the write to - // stop early. Write must return a non-nil error if it returns n < len(r). - return 0, nil -} - -func (d *Destination) Teardown(_ context.Context) error { - // Teardown signals to the plugin that all records were written and there - // will be no more calls to any other function. After Teardown returns, the - // plugin should be ready for a graceful shutdown. - return nil -} diff --git a/destination_test.go b/destination_test.go deleted file mode 100644 index 53fe1e7..0000000 --- a/destination_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package connectorname_test - -import ( - "context" - "testing" - - connectorname "github.com/conduitio/conduit-connector-connectorname" - "github.com/matryer/is" -) - -func TestTeardown_NoOpen(t *testing.T) { - is := is.New(t) - con := connectorname.NewDestination() - err := con.Teardown(context.Background()) - is.NoErr(err) -} diff --git a/go.mod b/go.mod index b87945f..bacb842 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/conduitio/conduit-connector-connectorname +module github.com/conduitio-labs/conduit-connector-sftp go 1.23.2 @@ -7,6 +7,8 @@ require ( github.com/conduitio/conduit-connector-sdk v0.12.0 github.com/golangci/golangci-lint v1.63.4 github.com/matryer/is v1.4.1 + github.com/pkg/sftp v1.13.6 + golang.org/x/crypto v0.30.0 mvdan.cc/gofumpt v0.7.0 ) @@ -108,6 +110,7 @@ require ( github.com/karamaru-alpha/copyloopvar v1.1.0 // indirect github.com/kisielk/errcheck v1.8.0 // indirect github.com/kkHAIKE/contextcheck v1.1.5 // indirect + github.com/kr/fs v0.1.0 // indirect github.com/kulti/thelper v0.6.3 // indirect github.com/kunwardeep/paralleltest v1.0.10 // indirect github.com/kyoh86/exportloopref v0.1.11 // indirect @@ -207,7 +210,6 @@ require ( go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.30.0 // indirect golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/exp/typeparams v0.0.0-20241108190413-2d47ceb2692f // indirect golang.org/x/mod v0.22.0 // indirect diff --git a/go.sum b/go.sum index eca8fb9..b4fbf42 100644 --- a/go.sum +++ b/go.sum @@ -244,6 +244,8 @@ github.com/kkHAIKE/contextcheck v1.1.5 h1:CdnJh63tcDe53vG+RebdpdXJTc9atMgGqdx8LX github.com/kkHAIKE/contextcheck v1.1.5/go.mod h1:O930cpht4xb1YQpK+1+AgoM3mFsvxr7uyFptcnWTYUA= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -339,6 +341,8 @@ github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= +github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -504,6 +508,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= @@ -537,6 +542,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -578,6 +584,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -589,6 +596,7 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -596,6 +604,8 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/paramgen_dest.go b/paramgen_dest.go deleted file mode 100644 index 376ba64..0000000 --- a/paramgen_dest.go +++ /dev/null @@ -1,34 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package connectorname - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - DestinationConfigDestinationConfigParam = "destinationConfigParam" - DestinationConfigGlobalConfigParamName = "global_config_param_name" -) - -func (DestinationConfig) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - DestinationConfigDestinationConfigParam: { - Default: "yes", - Description: "DestinationConfigParam must be either yes or no (defaults to yes).", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"yes", "no"}}, - }, - }, - DestinationConfigGlobalConfigParamName: { - Default: "", - Description: "GlobalConfigParam is named global_config_param_name and needs to be\nprovided by the user.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - } -} diff --git a/paramgen_src.go b/paramgen_src.go deleted file mode 100644 index 539d403..0000000 --- a/paramgen_src.go +++ /dev/null @@ -1,34 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package connectorname - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - SourceConfigFoo = "foo" - SourceConfigGlobalConfigParamName = "global_config_param_name" -) - -func (SourceConfig) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - SourceConfigFoo: { - Default: "", - Description: "SourceConfigParam is named foo and must be provided by the user.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - SourceConfigGlobalConfigParamName: { - Default: "", - Description: "GlobalConfigParam is named global_config_param_name and needs to be\nprovided by the user.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - } -} diff --git a/setup.sh b/setup.sh deleted file mode 100755 index 226b9dd..0000000 --- a/setup.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash -if [ $# -eq 0 ] -then - echo "Module name not provided." - exit 1 -fi - -MODULE_NAME=$1 -if ! [[ "$MODULE_NAME" =~ ^github.com\/.*\/conduit-connector-(.*)$ ]] -then - echo "Module name ${MODULE_NAME} not in recommended format \"github.com/repository/conduit-connector-connectorname\"." - echo - echo "Certain things (such as pull request templates) will not work correctly." - while true; do - read -n1 -p "Are you sure you want to continue? [y/n] " yn - echo - case $yn in - [Yy]* ) break;; - [Nn]* ) exit;; - * ) echo "Please answer yes or no.";; - esac - done -fi - -CONNECTOR_NAME=${BASH_REMATCH[1]} - -if [[ "$OSTYPE" == "darwin"* ]]; then - LC_ALL=C find . -type f ! -name "setup.sh" -exec sed -i "" "s~github.com/conduitio/conduit-connector-connectorname~$MODULE_NAME~g" {} + - LC_ALL=C find . -type f ! -name "setup.sh" -exec sed -i "" "s~connectorname~$CONNECTOR_NAME~g" {} + - LC_ALL=C sed -i "" "s~* @ConduitIO/conduit-core~ ~g" .github/CODEOWNERS -else - find . -type f ! -name "setup.sh" -exec sed -i "s~github.com/conduitio/conduit-connector-connectorname~$MODULE_NAME~g" {} + - find . -type f ! -name "setup.sh" -exec sed -i "s~connectorname~$CONNECTOR_NAME~g" {} + - sed -i "s~* @ConduitIO/conduit-core~ ~g" .github/CODEOWNERS -fi - -# Remove this script -rm "$0" -rm README.md -mv README_TEMPLATE.md README.md diff --git a/source.go b/source.go deleted file mode 100644 index 75f8d39..0000000 --- a/source.go +++ /dev/null @@ -1,100 +0,0 @@ -package connectorname - -//go:generate paramgen -output=paramgen_src.go SourceConfig - -import ( - "context" - "fmt" - - "github.com/conduitio/conduit-commons/config" - "github.com/conduitio/conduit-commons/opencdc" - sdk "github.com/conduitio/conduit-connector-sdk" -) - -type Source struct { - sdk.UnimplementedSource - - config SourceConfig - lastPositionRead opencdc.Position //nolint:unused // this is just an example -} - -type SourceConfig struct { - // Config includes parameters that are the same in the source and destination. - Config - // SourceConfigParam is named foo and must be provided by the user. - SourceConfigParam string `json:"foo" validate:"required"` -} - -func NewSource() sdk.Source { - // Create Source and wrap it in the default middleware. - return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) -} - -func (s *Source) Parameters() config.Parameters { - // Parameters is a map of named Parameters that describe how to configure - // the Source. Parameters can be generated from SourceConfig with paramgen. - return s.config.Parameters() -} - -func (s *Source) Configure(ctx context.Context, cfg config.Config) error { - // Configure is the first function to be called in a connector. It provides - // the connector with the configuration that can be validated and stored. - // In case the configuration is not valid it should return an error. - // Testing if your connector can reach the configured data source should be - // done in Open, not in Configure. - // The SDK will validate the configuration and populate default values - // before calling Configure. If you need to do more complex validations you - // can do them manually here. - - sdk.Logger(ctx).Info().Msg("Configuring Source...") - err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters()) - if err != nil { - return fmt.Errorf("invalid config: %w", err) - } - return nil -} - -func (s *Source) Open(_ context.Context, _ opencdc.Position) error { - // Open is called after Configure to signal the plugin it can prepare to - // start producing records. If needed, the plugin should open connections in - // this function. The position parameter will contain the position of the - // last record that was successfully processed, Source should therefore - // start producing records after this position. The context passed to Open - // will be cancelled once the plugin receives a stop signal from Conduit. - return nil -} - -func (s *Source) Read(_ context.Context) (opencdc.Record, error) { - // Read returns a new Record and is supposed to block until there is either - // a new record or the context gets cancelled. It can also return the error - // ErrBackoffRetry to signal to the SDK it should call Read again with a - // backoff retry. - // If Read receives a cancelled context or the context is cancelled while - // Read is running it must stop retrieving new records from the source - // system and start returning records that have already been buffered. If - // there are no buffered records left Read must return the context error to - // signal a graceful stop. If Read returns ErrBackoffRetry while the context - // is cancelled it will also signal that there are no records left and Read - // won't be called again. - // After Read returns an error the function won't be called again (except if - // the error is ErrBackoffRetry, as mentioned above). - // Read can be called concurrently with Ack. - return opencdc.Record{}, nil -} - -func (s *Source) Ack(_ context.Context, _ opencdc.Position) error { - // Ack signals to the implementation that the record with the supplied - // position was successfully processed. This method might be called after - // the context of Read is already cancelled, since there might be - // outstanding acks that need to be delivered. When Teardown is called it is - // guaranteed there won't be any more calls to Ack. - // Ack can be called concurrently with Read. - return nil -} - -func (s *Source) Teardown(_ context.Context) error { - // Teardown signals to the plugin that there will be no more calls to any - // other function. After Teardown returns, the plugin should be ready for a - // graceful shutdown. - return nil -} diff --git a/source/config/config.go b/source/config/config.go new file mode 100644 index 0000000..d4156c5 --- /dev/null +++ b/source/config/config.go @@ -0,0 +1,42 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + + "github.com/conduitio-labs/conduit-connector-sftp/config" +) + +//go:generate paramgen -output=paramgen.go Config + +type Config struct { + config.Config + + // Pattern to filter files in the source directory. + FilePattern string `json:"filePattern" default:"*"` + // Maximum size of a file chunk in bytes to split large files, default is 3MB. + FileChunkSizeBytes int64 `json:"fileChunkSizeBytes" default:"3145728"` +} + +// Validate executes manual validations. +func (c Config) Validate() error { + err := c.Config.Validate() + if err != nil { + return fmt.Errorf("validate config: %w", err) + } + + return nil +} diff --git a/source/config/paramgen.go b/source/config/paramgen.go new file mode 100644 index 0000000..2424853 --- /dev/null +++ b/source/config/paramgen.go @@ -0,0 +1,80 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen + +package config + +import ( + "github.com/conduitio/conduit-commons/config" +) + +const ( + ConfigAddress = "address" + ConfigDirectoryPath = "directoryPath" + ConfigFileChunkSizeBytes = "fileChunkSizeBytes" + ConfigFilePattern = "filePattern" + ConfigHostKey = "hostKey" + ConfigPassword = "password" + ConfigPrivateKeyPath = "privateKeyPath" + ConfigUsername = "username" +) + +func (Config) Parameters() map[string]config.Parameter { + return map[string]config.Parameter{ + ConfigAddress: { + Default: "", + Description: "Address is the address of the sftp server to connect.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigDirectoryPath: { + Default: "", + Description: "DirectoryPath is the path to the directory to read/write data.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigFileChunkSizeBytes: { + Default: "3145728", + Description: "Maximum size of a file chunk in bytes to split large files, default is 3MB.", + Type: config.ParameterTypeInt, + Validations: []config.Validation{}, + }, + ConfigFilePattern: { + Default: "*", + Description: "Pattern to filter files in the source directory.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigHostKey: { + Default: "", + Description: "HostKey is the key used for host key callback validation.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + ConfigPassword: { + Default: "", + Description: "Password is the SFTP password (can be used as passphrase for private key).", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigPrivateKeyPath: { + Default: "", + Description: "PrivateKeyPath is the private key for ssh login.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigUsername: { + Default: "", + Description: "User is the SFTP user.", + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, + }, + } +} diff --git a/source/iterator.go b/source/iterator.go new file mode 100644 index 0000000..a4aa4b4 --- /dev/null +++ b/source/iterator.go @@ -0,0 +1,337 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "crypto/md5" //nolint: gosec // MD5 used for non-cryptographic unique identifier + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "math" + "os" + "path/filepath" + "sort" + "time" + + "github.com/conduitio-labs/conduit-connector-sftp/source/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/pkg/sftp" +) + +var ErrFileModifiedDuringRead = errors.New("file was modified during read") + +type fileInfo struct { + name string + size int64 + modTime time.Time +} + +type Iterator struct { + sftpClient *sftp.Client + position *Position + config config.Config + files []fileInfo +} + +func NewIterator(sftpClient *sftp.Client, position *Position, config config.Config) *Iterator { + return &Iterator{ + sftpClient: sftpClient, + position: position, + config: config, + } +} + +// hasNext indicates whether the source has the next record to return or not. +func (iter *Iterator) hasNext() (bool, error) { + if len(iter.files) != 0 { + return true, nil + } + + if err := iter.loadFiles(); err != nil { + return false, fmt.Errorf("load files: %w", err) + } + + return len(iter.files) != 0, nil +} + +// Next returns the next record. +func (iter *Iterator) Next(ctx context.Context) (opencdc.Record, error) { + if err := ctx.Err(); err != nil { + return opencdc.Record{}, err + } + + hasNext, err := iter.hasNext() + if err != nil { + return opencdc.Record{}, err + } + if !hasNext { + return opencdc.Record{}, sdk.ErrBackoffRetry + } + + file := iter.files[0] + return iter.processFile(ctx, file) +} + +type fileReadResult struct { + content []byte + metadata opencdc.Metadata + position *Position + chunkIndex int + totalChunks int +} + +func (iter *Iterator) processFile(ctx context.Context, fileInfo fileInfo) (opencdc.Record, error) { + filePath := filepath.Join(iter.config.DirectoryPath, fileInfo.name) + + // Handle initial file validation and opening + initialStat, file, err := iter.prepareFileRead(filePath, fileInfo) + if err != nil { + if errors.Is(err, ErrFileModifiedDuringRead) || errors.Is(err, fs.ErrNotExist) { + return iter.handleFileError(ctx, err) + } + return opencdc.Record{}, err + } + defer file.Close() + + if err := ctx.Err(); err != nil { + return opencdc.Record{}, err + } + + isLargeFile := fileInfo.size >= iter.config.FileChunkSizeBytes + result, err := iter.readFileContent(ctx, file, initialStat, fileInfo, isLargeFile) + if err != nil { + if errors.Is(err, ErrFileModifiedDuringRead) || errors.Is(err, fs.ErrNotExist) { + return iter.handleFileError(ctx, err) + } + return opencdc.Record{}, err + } + + // Update iterator state + iter.position = result.position + if !isLargeFile || result.chunkIndex == result.totalChunks { + iter.files = iter.files[1:] + iter.position.ChunkInfo = nil + } + + positionBytes, err := json.Marshal(result.position) + if err != nil { + return opencdc.Record{}, fmt.Errorf("marshal position: %w", err) + } + + return sdk.Util.Source.NewRecordCreate( + positionBytes, + result.metadata, + opencdc.StructuredData{"filename": fileInfo.name}, + opencdc.RawData(result.content), + ), nil +} + +func (iter *Iterator) prepareFileRead(filePath string, fileInfo fileInfo) (os.FileInfo, *sftp.File, error) { + initialStat, err := iter.sftpClient.Stat(filePath) + if err != nil { + return nil, nil, fmt.Errorf("stat file: %w", err) + } + + if !initialStat.ModTime().UTC().Equal(fileInfo.modTime) { + return nil, nil, ErrFileModifiedDuringRead + } + + file, err := iter.sftpClient.Open(filePath) + if err != nil { + return nil, nil, fmt.Errorf("open file: %w", err) + } + + return initialStat, file, nil +} + +func (iter *Iterator) readFileContent(ctx context.Context, file *sftp.File, stat os.FileInfo, fileInfo fileInfo, isLargeFile bool) (fileReadResult, error) { + result := fileReadResult{ + position: &Position{LastProcessedFileTimestamp: stat.ModTime().UTC()}, + } + + if isLargeFile { + return iter.readLargeFileChunk(ctx, file, stat, fileInfo) + } + + content, err := io.ReadAll(file) + if err != nil { + return result, fmt.Errorf("read file: %w", err) + } + + filePath := filepath.Join(iter.config.DirectoryPath, fileInfo.name) + + if err := iter.validateFile(ctx, stat, filePath); err != nil { + return result, err + } + + result.content = content + result.metadata = iter.createMetadata(stat, filePath, len(content)) + + return result, nil +} + +func (iter *Iterator) readLargeFileChunk(ctx context.Context, file *sftp.File, stat os.FileInfo, fileInfo fileInfo) (fileReadResult, error) { + result := fileReadResult{ + position: &Position{LastProcessedFileTimestamp: stat.ModTime().UTC()}, + totalChunks: int(math.Ceil(float64(stat.Size()) / float64(iter.config.FileChunkSizeBytes))), + } + + result.chunkIndex = 1 + if iter.position.ChunkInfo != nil && + iter.position.ChunkInfo.Filename == fileInfo.name && + iter.position.ChunkInfo.ModTime == stat.ModTime().UTC().Format(time.RFC3339) { + result.chunkIndex = iter.position.ChunkInfo.ChunkIndex + 1 + } + + if err := ctx.Err(); err != nil { + return result, err + } + + offset := int64(result.chunkIndex-1) * iter.config.FileChunkSizeBytes + chunkSize := int(math.Min(float64(iter.config.FileChunkSizeBytes), float64(stat.Size()-offset))) + + if _, err := file.Seek(offset, io.SeekStart); err != nil { + return result, fmt.Errorf("seek file: %w", err) + } + + chunk := make([]byte, chunkSize) + if _, err := io.ReadFull(file, chunk); err != nil { + return result, fmt.Errorf("read chunk: %w", err) + } + + filePath := filepath.Join(iter.config.DirectoryPath, fileInfo.name) + + if err := iter.validateFile(ctx, stat, filePath); err != nil { + return result, err + } + + result.content = chunk + + result.metadata = iter.createMetadata(stat, filePath, len(chunk)) + result.metadata["chunk_index"] = fmt.Sprintf("%d", result.chunkIndex) + result.metadata["total_chunks"] = fmt.Sprintf("%d", result.totalChunks) + result.metadata["is_chunked"] = "true" + + result.position.ChunkInfo = &ChunkInfo{ + Filename: stat.Name(), + ChunkIndex: result.chunkIndex, + TotalChunks: result.totalChunks, + ModTime: stat.ModTime().UTC().Format(time.RFC3339), + } + + return result, nil +} + +func (iter *Iterator) handleFileError(ctx context.Context, err error) (opencdc.Record, error) { + if errors.Is(err, ErrFileModifiedDuringRead) { + if err := iter.loadFiles(); err != nil { + return opencdc.Record{}, fmt.Errorf("load files: %w", err) + } + return iter.Next(ctx) + } + // File was deleted/moved/renamed, skip to next file + iter.files = iter.files[1:] + return iter.Next(ctx) +} + +// loadFiles finds files matching the pattern that haven't been processed. +func (iter *Iterator) loadFiles() error { + files, err := iter.sftpClient.ReadDir(iter.config.DirectoryPath) + if err != nil { + return fmt.Errorf("read directory: %w", err) + } + + var unprocessedFiles []fileInfo + for _, file := range files { + if file.IsDir() { + continue + } + + fileName := file.Name() + modTime := file.ModTime().UTC() + + // Check if the file matches the pattern and is unprocessed or partially processed. + if iter.shouldProcessFile(fileName, modTime) { + unprocessedFiles = append(unprocessedFiles, fileInfo{ + name: fileName, + size: file.Size(), + modTime: modTime, + }) + } + } + + // Sort unprocessed files by modification time to maintain order. + sort.Slice(unprocessedFiles, func(i, j int) bool { + return unprocessedFiles[i].modTime.Before(unprocessedFiles[j].modTime) + }) + + iter.files = unprocessedFiles + return nil +} + +// helper method to determine if a file should be processed. +func (iter *Iterator) shouldProcessFile(fileName string, modTime time.Time) bool { + if matched, _ := filepath.Match(iter.config.FilePattern, fileName); matched && + modTime.After(iter.position.LastProcessedFileTimestamp) { + return true + } + + if iter.position.ChunkInfo != nil && + fileName == iter.position.ChunkInfo.Filename && + modTime.Equal(iter.position.LastProcessedFileTimestamp) { + return true + } + + return false +} + +// validateFile ensures file integrity during read. +func (iter *Iterator) validateFile(ctx context.Context, fileInfo os.FileInfo, filePath string) error { + finalStat, err := iter.sftpClient.Stat(filePath) + if err != nil { + return fmt.Errorf("stat file: %w", err) + } + + if !fileInfo.ModTime().Equal(finalStat.ModTime()) { + sdk.Logger(ctx).Info().Msgf(`file "%s" modified during read`, fileInfo.Name()) + return ErrFileModifiedDuringRead + } + + return nil +} + +func (iter *Iterator) createMetadata(fileInfo os.FileInfo, filePath string, contentLength int) opencdc.Metadata { + return opencdc.Metadata{ + opencdc.MetadataCollection: iter.config.DirectoryPath, + opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339), + "filename": fileInfo.Name(), + "source_path": filePath, + "file_size": fmt.Sprintf("%d", contentLength), + "mod_time": fileInfo.ModTime().UTC().Format(time.RFC3339), + "hash": generateFileHash(fileInfo.Name(), fileInfo.ModTime().UTC(), fileInfo.Size()), + } +} + +// generateFileHash creates a unique hash based on file name, mod time, and size. +func generateFileHash(fileName string, modTime time.Time, fileSize int64) string { + data := fmt.Sprintf("%s|%s|%d", fileName, modTime.Format(time.RFC3339), fileSize) + hash := md5.Sum([]byte(data)) //nolint: gosec // MD5 used for non-cryptographic unique identifier + return hex.EncodeToString(hash[:]) +} diff --git a/source/position.go b/source/position.go new file mode 100644 index 0000000..4857a95 --- /dev/null +++ b/source/position.go @@ -0,0 +1,51 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/conduitio/conduit-commons/opencdc" +) + +// ChunkInfo stores information about file chunks. +type ChunkInfo struct { + Filename string `json:"filename"` + ChunkIndex int `json:"chunkIndex"` + TotalChunks int `json:"totalChunks"` + ModTime string `json:"modTime"` +} + +// Position represents SFTP's position. +type Position struct { + LastProcessedFileTimestamp time.Time `json:"lastProcessedFileTimestamp"` + ChunkInfo *ChunkInfo `json:"chunkInfo,omitempty"` +} + +// ParseSDKPosition parses opencdc.Position and returns Position. +func ParseSDKPosition(position opencdc.Position) (*Position, error) { + if position == nil { + return &Position{}, nil + } + + var pos Position + if err := json.Unmarshal(position, &pos); err != nil { + return nil, fmt.Errorf("unmarshal position: %w", err) + } + + return &pos, nil +} diff --git a/source/position_test.go b/source/position_test.go new file mode 100644 index 0000000..a37c9d3 --- /dev/null +++ b/source/position_test.go @@ -0,0 +1,71 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "errors" + "testing" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/matryer/is" +) + +func TestParseSDKPosition(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in opencdc.Position + wantPos *Position + wantErr error + }{ + { + name: "success_position_is_nil", + in: nil, + wantPos: &Position{}, + }, + { + name: "success_valid_position", + in: opencdc.Position(`{ + "lastProcessedFileTimestamp": "2024-06-01T12:00:00Z" + }`), + wantPos: &Position{ + LastProcessedFileTimestamp: time.Date(2024, 6, 1, 12, 0, 0, 0, time.UTC), + }, + }, + { + name: "failure_invalid_json", + in: opencdc.Position("invalid"), + wantErr: errors.New("unmarshal position: invalid character 'i' looking for beginning of value"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + is := is.New(t) + + got, err := ParseSDKPosition(tt.in) + if tt.wantErr == nil { + is.NoErr(err) + is.Equal(got, tt.wantPos) + } else { + is.True(err != nil) + is.Equal(err.Error(), tt.wantErr.Error()) + } + }) + } +} diff --git a/source/source.go b/source/source.go new file mode 100644 index 0000000..3e391a7 --- /dev/null +++ b/source/source.go @@ -0,0 +1,190 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/conduitio-labs/conduit-connector-sftp/source/config" + commonsConfig "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/lang" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" +) + +type Source struct { + sdk.UnimplementedSource + + config config.Config + position *Position + + sshClient *ssh.Client + sftpClient *sftp.Client + + iterator *Iterator +} + +// NewSource initialises a new source. +func NewSource() sdk.Source { + return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware( + // disable schema extraction by default, because the source produces raw payload data + sdk.SourceWithSchemaExtractionConfig{ + PayloadEnabled: lang.Ptr(false), + }, + )...) +} + +// Parameters returns a map of named Parameters that describe how to configure the Source. +func (s *Source) Parameters() commonsConfig.Parameters { + return s.config.Parameters() +} + +func (s *Source) Configure(ctx context.Context, cfgRaw commonsConfig.Config) error { + sdk.Logger(ctx).Info().Msg("Configuring Source...") + err := sdk.Util.ParseConfig(ctx, cfgRaw, &s.config, NewSource().Parameters()) + if err != nil { + return fmt.Errorf("invalid config: %w", err) + } + + err = s.config.Validate() + if err != nil { + return fmt.Errorf("error validating configuration: %w", err) + } + + return nil +} + +func (s *Source) Open(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Info().Msg("Opening a SFTP Source...") + sshConfig, err := s.sshConfigAuth() + if err != nil { + return fmt.Errorf("failed to create SSH config: %w", err) + } + + s.sshClient, err = ssh.Dial("tcp", s.config.Address, sshConfig) + if err != nil { + return fmt.Errorf("failed to dial SSH: %w", err) + } + + s.sftpClient, err = sftp.NewClient(s.sshClient) + if err != nil { + err = s.sshClient.Close() + if err != nil { + return fmt.Errorf("failed to close SSH client: %w", err) + } + return fmt.Errorf("failed to create SFTP client: %w", err) + } + + _, err = s.sftpClient.Stat(s.config.DirectoryPath) + if err != nil { + return fmt.Errorf(`remote path "%s" does not exist: %w`, s.config.DirectoryPath, err) + } + + s.position, err = ParseSDKPosition(position) + if err != nil { + return err + } + + s.iterator = NewIterator(s.sftpClient, s.position, s.config) + + return nil +} + +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { + sdk.Logger(ctx).Debug().Msg("Reading a record from SFTP Source...") + + return s.iterator.Next(ctx) +} + +func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Trace(). + Str("position", string(position)). + Msg("got ack") + + return nil +} + +func (s *Source) Teardown(ctx context.Context) error { + sdk.Logger(ctx).Info().Msg("Tearing down the SFTP Source") + + var errs []error + if s.sftpClient != nil { + if err := s.sftpClient.Close(); err != nil { + errs = append(errs, fmt.Errorf("close SFTP client: %w", err)) + } + } + + if s.sshClient != nil { + if err := s.sshClient.Close(); err != nil { + errs = append(errs, fmt.Errorf("close SSH client: %w", err)) + } + } + + return errors.Join(errs...) +} + +func (s *Source) sshConfigAuth() (*ssh.ClientConfig, error) { + sshConfig := &ssh.ClientConfig{ + User: s.config.Username, + } + + //nolint:dogsled // not required here. + hostKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(s.config.HostKey)) + if err != nil { + return nil, fmt.Errorf("failed to parse host key: %w", err) + } + + sshConfig.HostKeyCallback = ssh.FixedHostKey(hostKey) + + if s.config.PrivateKeyPath != "" { + auth, err := s.authWithPrivateKey() + if err != nil { + return nil, err + } + + sshConfig.Auth = []ssh.AuthMethod{auth} + return sshConfig, nil + } + + sshConfig.Auth = []ssh.AuthMethod{ssh.Password(s.config.Password)} + return sshConfig, nil +} + +func (s *Source) authWithPrivateKey() (ssh.AuthMethod, error) { + key, err := os.ReadFile(s.config.PrivateKeyPath) + if err != nil { + return nil, fmt.Errorf("failed to read private key file: %w", err) + } + + if s.config.Password != "" { + signer, err := ssh.ParsePrivateKeyWithPassphrase(key, []byte(s.config.Password)) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %w", err) + } + return ssh.PublicKeys(signer), nil + } + + signer, err := ssh.ParsePrivateKey(key) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %w", err) + } + + return ssh.PublicKeys(signer), nil +} diff --git a/source/source_integration_test.go b/source/source_integration_test.go new file mode 100644 index 0000000..10f1bcd --- /dev/null +++ b/source/source_integration_test.go @@ -0,0 +1,540 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/conduitio-labs/conduit-connector-sftp/config" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" + "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" +) + +func TestSource_Configure(t *testing.T) { + t.Parallel() + + t.Run("source configure success", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + err := s.Configure(context.Background(), map[string]string{ + config.ConfigAddress: "locahost:22", + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + }) + + t.Run("source configure failure", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + err := s.Configure(context.Background(), map[string]string{ + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.True(err != nil) + is.Equal(err.Error(), + `invalid config: config invalid: error validating "address": required parameter is not provided`) + }) + + t.Run("source configure fail config validate", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + err := s.Configure(context.Background(), map[string]string{ + config.ConfigAddress: "locahost:22", + config.ConfigHostKey: "host-key", + config.ConfigUsername: "root", + config.ConfigPassword: "", + config.ConfigDirectoryPath: "/home/root", + }) + is.True(err != nil) + is.Equal(err.Error(), + `error validating configuration: validate config: both "password" and "privateKeyPath" can not be empty`) + }) +} + +func TestSource_Open(t *testing.T) { + t.Parallel() + + hostKey, err := setupHostKey() + if err != nil { + fmt.Println(err) + return + } + + t.Run("source open success", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = s.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/source", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("source open error sshConfig", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + ctx := context.Background() + + err := s.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: "hostKey", + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/source", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.True(err != nil) + is.Equal(err.Error(), "failed to create SSH config: failed to parse host key: ssh: no key found") + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("source open error read private key", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + ctx := context.Background() + + err = s.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPrivateKeyPath: "privatekey", + config.ConfigDirectoryPath: "/source", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.True(err != nil) + is.Equal(err.Error(), "failed to create SSH config: failed to read private key file: open privatekey: no such file or directory") + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("source open error ssh dial", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + ctx := context.Background() + + err = s.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:22", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "root", + config.ConfigPassword: "root", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.True(err != nil) + is.Equal(err.Error(), "failed to dial SSH: ssh: handshake failed: ssh: host key mismatch") + }) + + t.Run("source open error remote path not exist", func(t *testing.T) { + is := is.New(t) + + s := NewSource() + + ctx := context.Background() + + err = s.Configure(ctx, map[string]string{ + config.ConfigAddress: "localhost:2222", + config.ConfigHostKey: hostKey, + config.ConfigUsername: "user", + config.ConfigPassword: "pass", + config.ConfigDirectoryPath: "/home/root", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.True(err != nil) + is.Equal(err.Error(), `remote path "/home/root" does not exist: file does not exist`) + }) +} + +func TestSource_Read(t *testing.T) { + hostKey, err := setupHostKey() + if err != nil { + t.Fatalf("failed to setup host key: %v", err) + } + + configuration := map[string]string{ + "address": "localhost:2222", + "hostKey": hostKey, + "username": "user", + "password": "pass", + "directoryPath": "/source", + "filePattern": "*", + "fileChunkSizeBytes": "1024", + } + + t.Run("success reading new file", func(t *testing.T) { + is := is.New(t) + + _, err = writeTestFile("test.txt", "Hello World!") + if err != nil { + t.Fatal(err) + } + + s := &Source{} + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = s.Configure(ctx, configuration) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + record, err := s.Read(ctx) + is.NoErr(err) + + // Verify record contents + is.Equal(record.Operation, opencdc.OperationCreate) + is.Equal(record.Metadata["filename"], "test.txt") + is.Equal(string(record.Payload.After.(opencdc.RawData)), "Hello World!") + + // Verify position was updated + pos, err := ParseSDKPosition(record.Position) + is.NoErr(err) + is.True(pos.LastProcessedFileTimestamp.After(time.Time{})) + + err = s.Teardown(ctx) + is.NoErr(err) + + err = removeTestFile("test.txt") + if err != nil { + t.Fatal(err) + } + }) + + t.Run("no new files available", func(t *testing.T) { + is := is.New(t) + s := &Source{} + ctx := context.Background() + + err = s.Configure(ctx, configuration) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + record, err := s.Read(ctx) + is.True(errors.Is(err, sdk.ErrBackoffRetry)) + is.Equal(record, opencdc.Record{}) + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("file pattern filtering", func(t *testing.T) { + is := is.New(t) + s := &Source{} + ctx := context.Background() + + // Create additional test files with different extensions + _, err = writeTestFile("test.csv", "1,2,3") + is.NoErr(err) + _, err = writeTestFile("test.json", `{"key": "value"}`) + is.NoErr(err) + + err = s.Configure(ctx, map[string]string{ + "address": "localhost:2222", + "hostKey": hostKey, + "username": "user", + "password": "pass", + "directoryPath": "/source", + "filePattern": "*.csv", + "fileChunkSizeBytes": "1024", + }) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + record, err := s.Read(ctx) + is.NoErr(err) + is.Equal(record.Metadata["filename"], "test.csv") + + err = s.Teardown(ctx) + is.NoErr(err) + }) +} + +func TestSource_ReadLargeFile(t *testing.T) { + hostKey, err := setupHostKey() + if err != nil { + t.Fatalf("failed to setup host key: %v", err) + } + + // Setup a large test file that will be chunked + largeContent := strings.Repeat("Large file content\n", 1000) + stat, err := writeTestFile("large.txt", largeContent) + if err != nil { + t.Fatal(err) + } + defer removeTestFile("large.txt") + + configuration := map[string]string{ + "address": "localhost:2222", + "hostKey": hostKey, + "username": "user", + "password": "pass", + "directoryPath": "/source", + "filePattern": "*.txt", + "fileChunkSizeBytes": "1024", + } + + t.Run("success reading large file in chunks", func(t *testing.T) { + is := is.New(t) + s := &Source{} + ctx := context.Background() + + err = s.Configure(ctx, configuration) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + // Read first chunk + record1, err := s.Read(ctx) + is.NoErr(err) + + // Verify first chunk metadata + is.Equal(record1.Metadata["filename"], "large.txt") + is.Equal(record1.Metadata["is_chunked"], "true") + is.Equal(record1.Metadata["chunk_index"], "1") + expectedTotalChunks := len(largeContent) / 1024 + if len(largeContent)%1024 != 0 { + expectedTotalChunks++ + } + is.Equal(record1.Metadata["total_chunks"], fmt.Sprintf("%d", expectedTotalChunks)) + + // Read and verify middle chunk + record2, err := s.Read(ctx) + is.NoErr(err) + is.Equal(record2.Metadata["chunk_index"], "2") + + // Read until last chunk + var lastRecord opencdc.Record + for { + record, err := s.Read(ctx) + if errors.Is(err, sdk.ErrBackoffRetry) { + break + } + is.NoErr(err) + lastRecord = record + } + + is.Equal(lastRecord.Metadata["chunk_index"], lastRecord.Metadata["total_chunks"]) + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("resume from middle chunk", func(t *testing.T) { + is := is.New(t) + s := &Source{} + ctx := context.Background() + + err = s.Configure(ctx, configuration) + is.NoErr(err) + + pos := &Position{ + LastProcessedFileTimestamp: stat.ModTime().UTC(), + ChunkInfo: &ChunkInfo{ + Filename: "large.txt", + ChunkIndex: 2, + ModTime: stat.ModTime().UTC().Format(time.RFC3339), + TotalChunks: len(largeContent)/1024 + 1, + }, + } + posBytes, err := json.Marshal(pos) + is.NoErr(err) + + err = s.Open(ctx, opencdc.Position(posBytes)) + is.NoErr(err) + + record, err := s.Read(ctx) + is.NoErr(err) + is.Equal(record.Metadata["chunk_index"], "3") + + err = s.Teardown(ctx) + is.NoErr(err) + }) + + t.Run("file modified during chunking", func(t *testing.T) { + is := is.New(t) + s := &Source{} + ctx := context.Background() + + err = s.Configure(ctx, configuration) + is.NoErr(err) + + err = s.Open(ctx, nil) + is.NoErr(err) + + record1, err := s.Read(ctx) + is.NoErr(err) + is.Equal(record1.Metadata["chunk_index"], "1") + + // Modify file between chunks + newContent := strings.Repeat("New file content\n", 1000) + _, err = writeTestFile("large.txt", newContent) + is.NoErr(err) + + // Next read should detect modification and start over + record2, err := s.Read(ctx) + is.NoErr(err) + // Should start reading the modified file from beginning + is.Equal(record2.Metadata["chunk_index"], "1") + + err = s.Teardown(ctx) + is.NoErr(err) + }) +} + +func TestTeardown_NoOpen(t *testing.T) { + is := is.New(t) + con := NewSource() + err := con.Teardown(context.Background()) + is.NoErr(err) +} + +func setupHostKey() (string, error) { + cmd := exec.Command("ssh-keyscan", "-t", "rsa", "-p", "2222", "localhost") + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("error setupHostKey: %w", err) + } + + return string(output), nil +} + +func writeTestFile(name string, content string) (os.FileInfo, error) { + sshConfig := &ssh.ClientConfig{ + User: "user", + Auth: []ssh.AuthMethod{ + ssh.Password("pass"), + }, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + } + + sshClient, err := ssh.Dial("tcp", "localhost:2222", sshConfig) + if err != nil { + return nil, fmt.Errorf("failed to dial SSH: %w", err) + } + defer sshClient.Close() + + client, err := sftp.NewClient(sshClient) + if err != nil { + return nil, fmt.Errorf("failed to create SFTP client: %w", err) + } + defer client.Close() + + remoteFilePath := fmt.Sprintf("/source/%s", name) + remoteFile, err := client.Create(remoteFilePath) + if err != nil { + return nil, fmt.Errorf("failed to create file: %w", err) + } + defer remoteFile.Close() + + stat, err := remoteFile.Stat() + if err != nil { + return nil, fmt.Errorf("failed to stat file: %w", err) + } + + _, err = remoteFile.Write([]byte(content)) + if err != nil { + return nil, fmt.Errorf("failed to write to file: %w", err) + } + + return stat, nil +} + +func removeTestFile(name string) error { + sshConfig := &ssh.ClientConfig{ + User: "user", + Auth: []ssh.AuthMethod{ + ssh.Password("pass"), + }, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + } + + sshClient, err := ssh.Dial("tcp", "localhost:2222", sshConfig) + if err != nil { + return fmt.Errorf("failed to dial SSH: %w", err) + } + defer sshClient.Close() + + client, err := sftp.NewClient(sshClient) + if err != nil { + return fmt.Errorf("failed to create SFTP client: %w", err) + } + defer client.Close() + + remoteFilePath := fmt.Sprintf("/source/%s", name) + err = client.Remove(remoteFilePath) + if err != nil { + return fmt.Errorf("failed to remove file: %w", err) + } + + return nil +} diff --git a/source/source_test.go b/source/source_test.go new file mode 100644 index 0000000..d9f9b07 --- /dev/null +++ b/source/source_test.go @@ -0,0 +1,108 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "testing" + + commonsConfig "github.com/conduitio/conduit-commons/config" + "github.com/matryer/is" +) + +func TestNewSource(t *testing.T) { + is := is.New(t) + + con := NewSource() + is.True(con != nil) + is.True(con.Parameters() != nil) +} + +func TestSource_Configure_allFieldsSuccess(t *testing.T) { + t.Parallel() + + is := is.New(t) + ctx := context.Background() + + s := Source{} + + // Test valid configuration using private key + err := s.Configure(ctx, commonsConfig.Config{ + "address": "sftp.example.com:22", + "username": "testuser", + "privateKeyPath": "/path/to/privatekey", + "hostKey": "ssh-rsa AAAAB3NzaC1...", + "directoryPath": "/path/to/directory", + }) + is.NoErr(err) + + // Test valid configuration using password + err = s.Configure(ctx, commonsConfig.Config{ + "address": "sftp.example.com:22", + "username": "testuser", + "password": "testpass", + "hostKey": "ssh-rsa AAAAB3NzaC1...", + "directoryPath": "/path/to/directory", + }) + is.NoErr(err) +} + +func TestSource_Configure_missingAuthentication(t *testing.T) { + t.Parallel() + + is := is.New(t) + ctx := context.Background() + + s := Source{} + + err := s.Configure(ctx, commonsConfig.Config{ + "address": "sftp.example.com:22", + "username": "testuser", + "hostKey": "ssh-rsa AAAAB3NzaC1...", + "directoryPath": "/path/to/directory", + }) + is.True(err != nil) + is.Equal(err.Error(), `error validating configuration: validate config: both "password" and "privateKeyPath" can not be empty`) +} + +func TestSource_Configure_missingHostKey(t *testing.T) { + t.Parallel() + + is := is.New(t) + ctx := context.Background() + + s := Source{} + + err := s.Configure(ctx, commonsConfig.Config{ + "address": "sftp.example.com:22", + "username": "testuser", + "password": "testpass", + "directoryPath": "/path/to/directory", + }) + is.True(err != nil) + is.Equal(err.Error(), `invalid config: config invalid: error validating "hostKey": required parameter is not provided`) +} + +func TestSource_Teardown_Success(t *testing.T) { + t.Parallel() + + is := is.New(t) + ctx := context.Background() + + s := &Source{} + + err := s.Teardown(ctx) + is.NoErr(err) +} diff --git a/source_test.go b/source_test.go deleted file mode 100644 index 7744363..0000000 --- a/source_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package connectorname_test - -import ( - "context" - "testing" - - connectorname "github.com/conduitio/conduit-connector-connectorname" - "github.com/matryer/is" -) - -func TestTeardownSource_NoOpen(t *testing.T) { - is := is.New(t) - con := connectorname.NewSource() - err := con.Teardown(context.Background()) - is.NoErr(err) -} diff --git a/spec.go b/spec.go index cc4c562..29622d8 100644 --- a/spec.go +++ b/spec.go @@ -1,4 +1,18 @@ -package connectorname +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sftp import ( sdk "github.com/conduitio/conduit-connector-sdk" @@ -11,10 +25,11 @@ var version = "(devel)" // Specification returns the connector's specification. func Specification() sdk.Specification { return sdk.Specification{ - Name: "connectorname", - Summary: "", - Description: "", - Version: version, - Author: "", + Name: "sftp", + Summary: "SFTP source and destination connector.", + Description: "The SFTP connector is one of Conduit plugins. " + + "It provides both, a source and a destination SFTP connector.", + Version: version, + Author: "Meroxa, Inc.", } } diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 210feb3..20292d5 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -1,6 +1,12 @@ -# Configure the services needed for the integration tests. -# More information at https://docs.docker.com/compose/ version: "3.9" + services: - hello-world: - image: "hello-world:latest" + sftp: + image: atmoz/sftp + ports: + - "2222:22" + restart: always + environment: + - USER=user + - PASS=pass + command: user:pass:::source,destination \ No newline at end of file