Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFTP Backend #16

Merged
merged 8 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Straw

Straw is a streaming I/O abstraction, intended to provide a nice way to interact with various stream (blob) storage backends.

Currently it supports local filesystem and aws s3 as storage options.
Currently it supports local filesystem aws s3, and sftp as storage options.

It is not intended to be a general purpose VFS style API, instead focussing on cleanly and portably supporting streaming reads and streaming writes of entire objects.

Expand Down
169 changes: 169 additions & 0 deletions straw_sftp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package straw

import (
"errors"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
)

var _ StreamStore = &SFTPStreamStore{}

type SFTPStreamStore struct {
sshClient *ssh.Client
sftpClient *sftp.Client
}

func NewSFTPStreamStore(urlString string) (*SFTPStreamStore, error) {
u, err := url.Parse(urlString)
if err != nil {
return nil, err
}

pass, passSet := u.User.Password()
if u.User.Username() == "" || !passSet {
return nil, errors.New("username and password are required in the url")
}

config := &ssh.ClientConfig{
User: u.User.Username(),
Auth: []ssh.AuthMethod{
ssh.Password(pass),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // TODO: allow passing host key as url param. ssh.FixedHostKey(hostKey),
}

client, err := ssh.Dial("tcp", u.Host, config)
if err != nil {
return nil, err
}

sclient, err := sftp.NewClient(client)
if err != nil {
client.Close()
return nil, err
}

ss := &SFTPStreamStore{client, sclient}

return ss, nil
}

func (s *SFTPStreamStore) Lstat(filename string) (os.FileInfo, error) {
return s.sftpClient.Lstat(filename)
}

func (s *SFTPStreamStore) Stat(filename string) (os.FileInfo, error) {
return s.sftpClient.Stat(filename)
}

func (s *SFTPStreamStore) Mkdir(path string, mode os.FileMode) error {
err := s.sftpClient.Mkdir(path)
if err != nil && strings.Contains(err.Error(), ": file exists") {
d, _ := filepath.Split(path)
return fmt.Errorf("%s file exists", d)
mjgarton marked this conversation as resolved.
Show resolved Hide resolved
}
return err
}

func (s *SFTPStreamStore) OpenReadCloser(name string) (StrawReader, error) {
sr, err := s.sftpClient.Open(name)
if err != nil {
return nil, err
}
fi, err := sr.Stat()
if err != nil {
sr.Close()
return nil, err
}
if fi.IsDir() {
sr.Close()
return nil, fmt.Errorf("%s is a directory", name)
}
return &sftpReader{f: sr}, nil
}

func (s *SFTPStreamStore) Remove(name string) error {
err := s.sftpClient.Remove(name)
if err != nil && strings.Contains(err.Error(), ": directory not empty") {
return fmt.Errorf("%s directory not empty", name)
}
return err
}

func (s *SFTPStreamStore) CreateWriteCloser(name string) (StrawWriter, error) {
fi, err := s.Stat(name)
if err == nil && fi.IsDir() {
return nil, fmt.Errorf("%s is a directory", name)
}

sw, err := s.sftpClient.Create(name)
if err != nil && strings.Contains(err.Error(), ": not a directory") {
d, _ := filepath.Split(name)
return nil, fmt.Errorf("%s not a directory", d)
}
return sw, nil
}

func (s *SFTPStreamStore) Readdir(name string) ([]os.FileInfo, error) {
fi, err := s.sftpClient.ReadDir(name)
if err != nil {
return nil, err
}
sort.Slice(fi, func(i, j int) bool { return fi[i].Name() < fi[j].Name() })
mjgarton marked this conversation as resolved.
Show resolved Hide resolved
return fi, nil
}

type sftpReader struct {
lk sync.Mutex
f *sftp.File
}

func (r *sftpReader) Close() error {
r.lk.Lock()
defer r.lk.Unlock()
return r.f.Close()
}

func (r *sftpReader) Read(buf []byte) (int, error) {
r.lk.Lock()
defer r.lk.Unlock()
return r.f.Read(buf)
}

func (r *sftpReader) ReadAt(buf []byte, offset int64) (int, error) {
r.lk.Lock()
defer r.lk.Unlock()
// get current offset
oldOffset, err := r.f.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
// seek to offset
off, err := r.f.Seek(offset, io.SeekStart)
if err != nil {
return 0, err
}
if off != offset {
panic("bug in offset handling")
}
j, err := io.ReadFull(r.f, buf)
if err == io.ErrUnexpectedEOF {
err = io.EOF
}

// put the original offset back
if _, err := r.f.Seek(oldOffset, io.SeekStart); err != nil {
return j, err
}

return j, err
}
110 changes: 110 additions & 0 deletions straw_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package straw

import (
"crypto/ed25519"
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"reflect"
"strings"
"testing"

"github.com/pkg/sftp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
)

type fsTester struct {
Expand Down Expand Up @@ -615,6 +620,111 @@ func TestS3FS(t *testing.T) {
testFS(t, "s3fs", func() StreamStore { return &TestLogStreamStore{t, s3fs} }, "/")
}

func TestSFTPFS(t *testing.T) {
go startSFTPServer()

sftpfs, err := NewSFTPStreamStore("sftp://test:tiger@localhost:9922/")
if err != nil {
t.Fatal(err)
}
dir, err := ioutil.TempDir("", "straw_sftp_test")
if err != nil {
t.Fatal(err)
}
testFS(t, "sftpfs", func() StreamStore { return &TestLogStreamStore{t, sftpfs} }, dir)
}

func startSFTPServer() {
config := &ssh.ServerConfig{
PasswordCallback: func(c ssh.ConnMetadata, pass []byte) (*ssh.Permissions, error) {
log.Printf("Login: %s\n", c.User())
if c.User() == "test" && string(pass) == "tiger" {
return nil, nil
}
return nil, fmt.Errorf("password rejected for %q", c.User())
},
}

_, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
log.Fatal(err)
}

private, err := ssh.NewSignerFromKey(priv)
if err != nil {
log.Fatal(err)
}

config.AddHostKey(private)

listener, err := net.Listen("tcp", "0.0.0.0:9922")
if err != nil {
log.Fatal("failed to listen for connection", err)
}
log.Printf("Listening on %v\n", listener.Addr())

nConn, err := listener.Accept()
if err != nil {
log.Fatal("failed to accept incoming connection", err)
}

_, chans, reqs, err := ssh.NewServerConn(nConn, config)
if err != nil {
log.Fatal("failed to handshake", err)
}
log.Printf("SSH server established\n")

go ssh.DiscardRequests(reqs)

for newChannel := range chans {
log.Printf("Incoming channel: %s\n", newChannel.ChannelType())
if newChannel.ChannelType() != "session" {
newChannel.Reject(ssh.UnknownChannelType, "unknown channel type")
log.Printf("Unknown channel type: %s\n", newChannel.ChannelType())
continue
}
channel, requests, err := newChannel.Accept()
if err != nil {
log.Fatal("could not accept channel.", err)
}
log.Printf("Channel accepted\n")

go func(in <-chan *ssh.Request) {
for req := range in {
log.Printf("Request: %v\n", req.Type)
ok := false
switch req.Type {
case "subsystem":
log.Printf("Subsystem: %s\n", req.Payload[4:])
if string(req.Payload[4:]) == "sftp" {
ok = true
}
}
log.Printf(" - accepted: %v\n", ok)
req.Reply(ok, nil)
}
}(requests)

serverOptions := []sftp.ServerOption{
sftp.WithDebug(log.Writer()),
}

server, err := sftp.NewServer(
channel,
serverOptions...,
)
if err != nil {
log.Fatal(err)
}
if err := server.Serve(); err == io.EOF {
server.Close()
log.Print("sftp client exited session.")
} else if err != nil {
log.Fatal("sftp server completed with error:", err)
}
}
}

func testFS(t *testing.T, name string, fsProvider func() StreamStore, rootDir string) {
tester := &fsTester{name, nil, fsProvider, rootDir}

Expand Down