Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination connector implementation. #1

Open
wants to merge 23 commits into
base: feat/source
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ The connector supports both password and private key authentication methods.

## Destination

Destination connects to a remote server. It takes an `opencdc.Record`, extracts filename from the metadata and upload the file to the remote server. The connector supports both password and private key authentication methods.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chunking is a nice feature and it would be good to mention in the readme that we support it and in which way (what metadata is needed, and so on), so that other sources (not only SFTP) can use it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned.

### Configuration Options

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9)
| name | description | required |
| -------------- | ----------------------------------------------------------------------------------------------------- | -------- |
| `address` | Address is the address of the sftp server to connect.| **true** |
| `hostKey` | HostKey is the key used for host key callback validation.| **true** |
| `username`| User is the username of the SFTP user. | **true** |
| `password`| Password is the SFTP password (can be used as passphrase for private key). | false |
| `privateKeyPath`| PrivateKeyPath is the private key for ssh login.| false |
| `directoryPath` | DirectoryPath is the path to the directory to read/write data. | true |

![scarf pixel](https://static.scarf.sh/a.png?x-pxid=64b333ae-77ad-4895-a5cd-a73bb14362d9)
94 changes: 94 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sftp

import (
"fmt"
"os/exec"
"sync/atomic"
"testing"
"time"

"github.com/conduitio-labs/conduit-connector-sftp/config"
"github.com/conduitio-labs/conduit-connector-sftp/destination"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

type driver struct {
sdk.ConfigurableAcceptanceTestDriver
id int64
}

func (d *driver) GenerateRecord(_ *testing.T, _ opencdc.Operation) opencdc.Record {
atomic.AddInt64(&d.id, 1)

content := []byte("hello world")
filename := fmt.Sprintf("%d.txt", d.id)

return sdk.Util.Source.NewRecordCreate(
nil,
map[string]string{
opencdc.MetadataCollection: "upload",
opencdc.MetadataCreatedAt: time.Now().UTC().Format(time.RFC3339),
"filename": filename,
"source_path": "/upload",
"file_size": fmt.Sprintf("%d", len(content)),
"mod_time": time.Now().UTC().Format(time.RFC3339),
},
opencdc.StructuredData{"filename": filename},
opencdc.RawData(content),
)
}

func (d *driver) ReadFromDestination(_ *testing.T, records []opencdc.Record) []opencdc.Record {
return records
}

func TestAcceptance(t *testing.T) {
hostKey, err := setupHostKey()
if err != nil {
fmt.Println(err)
return
}

sdk.AcceptanceTest(t, &driver{
ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: sdk.Connector{
NewSpecification: Specification,
NewDestination: destination.NewDestination,
NewSource: nil,
},
DestinationConfig: map[string]string{
config.ConfigAddress: "localhost:2222",
config.ConfigHostKey: hostKey,
config.ConfigUsername: "user",
config.ConfigPassword: "pass",
config.ConfigDirectoryPath: "/upload",
},
},
},
})
}

func setupHostKey() (string, error) {
cmd := exec.Command("ssh-keyscan", "-t", "rsa", "-p", "2222", "localhost")
output, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("error setupHostKey: %w", err)
}
return string(output), nil
}
98 changes: 98 additions & 0 deletions common/sshauth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"bytes"
"fmt"
"net"
"os"

"golang.org/x/crypto/ssh"
)

var ErrUntrustedKey = fmt.Errorf("host key does not match the trusted key")

type MismatchKeyTypeError struct {
key1, key2 string
}

func (e MismatchKeyTypeError) Error() string {
return fmt.Sprintf("host key type mismatch: got %s, want %s", e.key1, e.key2)
}

func NewMismatchKeyTypeError(key1, key2 string) MismatchKeyTypeError {
return MismatchKeyTypeError{key1, key2}
}

func SSHConfigAuth(remoteHostKey, username, password, privateKeyPath string) (*ssh.ClientConfig, error) {
//nolint:dogsled // not required here.
hostKey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(remoteHostKey))
if err != nil {
return nil, fmt.Errorf("failed to parse server public key: %w", err)
}

hostKeyCallback := func(_ string, _ net.Addr, key ssh.PublicKey) error {
if key.Type() != hostKey.Type() {
return NewMismatchKeyTypeError(key.Type(), hostKey.Type())
}

if !bytes.Equal(key.Marshal(), hostKey.Marshal()) {
return ErrUntrustedKey
}

return nil
}

sshConfig := &ssh.ClientConfig{
User: username,
HostKeyCallback: hostKeyCallback,
}

if privateKeyPath != "" {
auth, err := authWithPrivateKey(privateKeyPath, password)
if err != nil {
return nil, err
}

sshConfig.Auth = []ssh.AuthMethod{auth}
return sshConfig, nil
}

sshConfig.Auth = []ssh.AuthMethod{ssh.Password(password)}
return sshConfig, nil
}

func authWithPrivateKey(privateKeyPath, password string) (ssh.AuthMethod, error) {
key, err := os.ReadFile(privateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read private key file: %w", err)
}

if password != "" {
signer, err := ssh.ParsePrivateKeyWithPassphrase(key, []byte(password))
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %w", err)
}
return ssh.PublicKeys(signer), nil
}

signer, err := ssh.ParsePrivateKey(key)
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %w", err)
}

return ssh.PublicKeys(signer), nil
}
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package config

import "fmt"
import (
"fmt"
)

var ErrEmptyAuthFields = fmt.Errorf("both %q and %q can not be empty", ConfigPassword, ConfigPrivateKeyPath)

Expand All @@ -26,7 +28,7 @@ type Config struct {
Address string `json:"address" validate:"required"`
// HostKey is the key used for host key callback validation.
HostKey string `json:"hostKey" validate:"required"`
// User is the SFTP user.
// User is the username of the SFTP user.
Username string `json:"username" validate:"required"`
// Password is the SFTP password (can be used as passphrase for private key).
Password string `json:"password"`
Expand Down
2 changes: 1 addition & 1 deletion config/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package sftp

import (
"github.com/conduitio-labs/conduit-connector-sftp/destination"
source "github.com/conduitio-labs/conduit-connector-sftp/source"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Connector combines all constructors for each plugin in one struct.
var Connector = sdk.Connector{
NewSpecification: Specification,
NewDestination: destination.NewDestination,
NewSource: source.NewSource,
NewDestination: nil,
}
Loading
Loading