Skip to content

Commit

Permalink
feat: add ssh tunneling support
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Mar 21, 2024
1 parent 87f2a09 commit 0847a04
Show file tree
Hide file tree
Showing 25 changed files with 509 additions and 13 deletions.
13 changes: 9 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,24 @@ go 1.22.0
require (
cloud.google.com/go v0.112.1
cloud.google.com/go/bigquery v1.59.1
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/databricks/databricks-sql-go v1.5.3
github.com/dlclark/regexp2 v1.11.0
github.com/gliderlabs/ssh v0.3.7
github.com/go-sql-driver/mysql v1.7.1
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/ory/dockertest/v3 v3.10.0
github.com/rudderlabs/rudder-go-kit v0.23.2
github.com/rudderlabs/sql-tunnels v0.1.6
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/snowflakedb/gosnowflake v1.7.2
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.17.1
github.com/tidwall/sjson v1.2.5
github.com/trinodb/trino-go-client v0.313.0
golang.org/x/crypto v0.21.0
google.golang.org/api v0.170.0
)

Expand All @@ -33,7 +39,8 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/apache/thrift v0.17.0 // indirect
Expand Down Expand Up @@ -78,7 +85,6 @@ require (
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.2 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
Expand All @@ -101,12 +107,12 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/tidwall/gjson v1.17.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand All @@ -119,7 +125,6 @@ require (
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.22.0 // indirect
Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI=
github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/DiJbg=
github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go-v2 v1.17.7 h1:CLSjnhJSTSogvqUGhIC6LqFKATMRexcxLZ0i/Nzk9Eg=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=
Expand Down Expand Up @@ -137,6 +141,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/gliderlabs/ssh v0.3.7 h1:iV3Bqi942d9huXnzEF2Mt+CY9gLu8DNM4Obd+8bODRE=
github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
Expand Down Expand Up @@ -268,6 +274,8 @@ github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf
github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
Expand All @@ -286,6 +294,8 @@ github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/rudderlabs/rudder-go-kit v0.23.2 h1:leRqvPJhWEru+jC6qYTNoWIJfKFAbwJ4GbOtlRyQQKY=
github.com/rudderlabs/rudder-go-kit v0.23.2/go.mod h1:NlvfItvDEjR26akv5D/ZMH1A3F8DZKo4ifRwLvq/u60=
github.com/rudderlabs/sql-tunnels v0.1.6 h1:v2KA2cq8ZV5LXRJQpqigq1Q4V64oDL+XlfckW/0K2/4=
github.com/rudderlabs/sql-tunnels v0.1.6/go.mod h1:Jew/XwojzFTK4KTDj/wvChI7iZCgAKYyKjFK1nnHlNM=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
17 changes: 14 additions & 3 deletions sqlconnect/internal/base/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package base

import (
"database/sql"
"errors"
"fmt"

"github.com/samber/lo"

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
)

func NewDB(db *sql.DB, opts ...Option) *DB {
func NewDB(db *sql.DB, tunnelCloser func() error, opts ...Option) *DB {
d := &DB{
DB: db,
Dialect: dialect{},
DB: db,
Dialect: dialect{},
tunnelCloser: tunnelCloser,
columnTypeMapper: func(c ColumnType) string {
return c.DatabaseTypeName()
},
Expand Down Expand Up @@ -76,12 +78,21 @@ func NewDB(db *sql.DB, opts ...Option) *DB {
type DB struct {
*sql.DB
sqlconnect.Dialect
tunnelCloser func() error // closer for the ssh tunnel to be called on close

columnTypeMapper func(ColumnType) string // map from database type to rudder type
jsonRowMapper func(databaseTypeName string, value any) any
sqlCommands SQLCommands
}

// Close closes the db and the tunnel
func (d *DB) Close() error {
return errors.Join(
d.DB.Close(), // first close the db
d.tunnelCloser(), // then close the tunnel
)
}

type ColumnType interface {
DatabaseTypeName() string
DecimalSize() (precision, scale int64, ok bool)
Expand Down
2 changes: 2 additions & 0 deletions sqlconnect/internal/bigquery/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rudderlabs/sqlconnect-go/sqlconnect"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/base"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/bigquery/driver"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/sshtunnel"
)

const (
Expand All @@ -32,6 +33,7 @@ func NewDB(configJSON json.RawMessage) (*DB, error) {
return &DB{
DB: base.NewDB(
db,
sshtunnel.NoTunnelCloser,
base.WithDialect(dialect{}),
base.WithColumnTypeMapper(getColumnTypeMapper(config)),
base.WithJsonRowMapper(getJonRowMapper(config)),
Expand Down
16 changes: 14 additions & 2 deletions sqlconnect/internal/databricks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package databricks
import (
"encoding/json"
"time"

"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/sshtunnel"
)

type Config struct {
Expand All @@ -12,6 +14,8 @@ type Config struct {
Token string `json:"token"`
Catalog string `json:"catalog"`

TunnelInfo *sshtunnel.Config `json:"tunnel_info,omitempty"`

RetryAttempts int `json:"retryAttempts"`
MinRetryWaitTime time.Duration `json:"minRetryWaitTime"`
MaxRetryWaitTime time.Duration `json:"maxRetryWaitTime"`
Expand All @@ -20,10 +24,18 @@ type Config struct {
UseLegacyMappings bool `json:"useLegacyMappings"`
}

func (c *Config) Parse(configJson json.RawMessage) error {
err := json.Unmarshal(configJson, c)
func (c *Config) Parse(input json.RawMessage) error {
err := json.Unmarshal(input, c)
if err != nil {
return err
}
if c.TunnelInfo == nil { // if tunnel info is not provided as a separate json object, try to parse it from the input
if c.TunnelInfo, err = sshtunnel.ParseInlineConfig(input); err != nil {
return err

Check warning on line 34 in sqlconnect/internal/databricks/config.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/databricks/config.go#L34

Added line #L34 was not covered by tests
}
}
if c.Catalog == "" {
c.Catalog = "hive_metastore" // default catalog
}
return nil
}
23 changes: 22 additions & 1 deletion sqlconnect/internal/databricks/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rudderlabs/sqlconnect-go/sqlconnect"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/base"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/sshtunnel"
)

const (
Expand All @@ -25,7 +26,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
return nil, err
}

connector, err := databricks.NewConnector(
opts := newOpts(
databricks.WithAccessToken(config.Token),
databricks.WithServerHostname(config.Host),
databricks.WithPort(config.Port),
Expand All @@ -38,6 +39,18 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
),
databricks.WithUserAgentEntry("Rudderstack"),
)
tunnelCloser := sshtunnel.NoTunnelCloser
if config.TunnelInfo != nil {
tunnel, err := sshtunnel.NewSocks5Tunnel(*config.TunnelInfo)
if err != nil {
return nil, err

Check warning on line 46 in sqlconnect/internal/databricks/db.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/databricks/db.go#L46

Added line #L46 was not covered by tests
}
tunnelCloser = tunnel.Close
// Use a custom http transport in the client to route the connection through the tunnel's socks5 proxy
opts = append(opts, databricks.WithTransport(sshtunnel.Socks5HTTPTransport(tunnel.Host(), tunnel.Port())))
}

connector, err := databricks.NewConnector(opts...)
if err != nil {
return nil, err
}
Expand All @@ -53,6 +66,7 @@ func NewDB(configJson json.RawMessage) (*DB, error) {
return &DB{
DB: base.NewDB(
db,
tunnelCloser,
base.WithDialect(dialect{}),
base.WithColumnTypeMapper(getColumnTypeMapper(config)),
base.WithJsonRowMapper(getJonRowMapper(config)),
Expand Down Expand Up @@ -130,3 +144,10 @@ func getJonRowMapper(config Config) func(databaseTypeName string, value any) any
}
return jsonRowMapper
}

// This is required because databricks connection option types are unexported...
func newOpts[T any](args ...T) []T {
var slice []T
slice = append(slice, args...)
return slice
}
3 changes: 3 additions & 0 deletions sqlconnect/internal/databricks/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ func TestDatabricksDB(t *testing.T) {

integrationtest.TestDatabaseScenarios(t, databricks.DatabaseType, []byte(configJSON), strings.ToLower, integrationtest.Options{LegacySupport: true})
})
integrationtest.TestDatabaseScenarios(t, databricks.DatabaseType, []byte(configJSON), strings.ToLower, integrationtest.Options{LegacySupport: true})

integrationtest.TestSshTunnelScenarios(t, databricks.DatabaseType, []byte(configJSON))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package integrationtest

import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"testing"

sshx "github.com/gliderlabs/ssh"
"github.com/stretchr/testify/require"
"github.com/tidwall/sjson"
"golang.org/x/crypto/ssh"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
tunnelhelper "github.com/rudderlabs/sql-tunnels/tunnel/testhelper"
"github.com/rudderlabs/sqlconnect-go/sqlconnect"
"github.com/rudderlabs/sqlconnect-go/sqlconnect/internal/sshtunnel"
)

func TestSshTunnelScenarios(t *testing.T, warehouse string, configJSON json.RawMessage) {
sshPort, err := kithelper.GetFreePort()
require.NoError(t, err, "it should be able to get a free port")
server, privateKey := newSshServer(t, sshPort)
t.Cleanup(func() {
err := server.Close()
require.NoError(t, err, "it should be able to close the ssh server")
})
tunnelConfig := sshtunnel.Config{
User: "root",
Host: "127.0.0.1",
Port: strconv.Itoa(sshPort),
PrivateKey: string(privateKey),
}

configJSON, err = sjson.SetBytes(configJSON, "tunnel_info", tunnelConfig)
require.NoError(t, err, "it should be able to set the tunnel info in the config")

t.Run("ssh tunnel", func(t *testing.T) {
db, err := sqlconnect.NewDB(warehouse, configJSON)
require.NoError(t, err, "it should be able to create a new DB")
defer func() { _ = db.Close() }()
err = db.Ping()
require.NoError(t, err, "it should be able to ping the db")
_, err = db.ListSchemas(context.Background())
require.NoError(t, err, "it should be able to list schemas")
require.GreaterOrEqual(t, server.connections, 1, "ssh server should have received at least 1 connection")
})
}

type testsshserver struct {
*sshx.Server

connections int
}

func (s *testsshserver) DirectTCPIPHandler(srv *sshx.Server, conn *ssh.ServerConn, newChan ssh.NewChannel, ctx sshx.Context) {
s.connections++
sshx.DirectTCPIPHandler(srv, conn, newChan, ctx)
}

func newSshServer(t *testing.T, port int) (server *testsshserver, privateKey []byte) {
t.Helper()
server = &testsshserver{}
var publicKey []byte
privateKey, publicKey = tunnelhelper.SSHKeyPairs(t)
pkey, _, _, _, err := sshx.ParseAuthorizedKey(publicKey)
require.NoError(t, err)

server.Server = &sshx.Server{
LocalPortForwardingCallback: sshx.LocalPortForwardingCallback(func(ctx sshx.Context, dhost string, dport uint32) bool {
return true
}),
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Handler: sshx.Handler(func(s sshx.Session) {
_, _ = io.WriteString(s, "Remote forwarding available...\n")
select {}

Check warning on line 78 in sqlconnect/internal/integration_test/sshtunnel_integration_test_scenario.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/integration_test/sshtunnel_integration_test_scenario.go#L76-L78

Added lines #L76 - L78 were not covered by tests
}),
ReversePortForwardingCallback: sshx.ReversePortForwardingCallback(func(ctx sshx.Context, host string, port uint32) bool {
return true
}),

Check warning on line 82 in sqlconnect/internal/integration_test/sshtunnel_integration_test_scenario.go

View check run for this annotation

Codecov / codecov/patch

sqlconnect/internal/integration_test/sshtunnel_integration_test_scenario.go#L80-L82

Added lines #L80 - L82 were not covered by tests
PublicKeyHandler: func(ctx sshx.Context, key sshx.PublicKey) bool {
return sshx.KeysEqual(key, pkey)
},
ChannelHandlers: map[string]sshx.ChannelHandler{
"direct-tcpip": server.DirectTCPIPHandler,
"session": sshx.DefaultSessionHandler,
},
}
go func() {
err := server.ListenAndServe()
require.Equal(t, sshx.ErrServerClosed, err)
}()

return
}
Loading

0 comments on commit 0847a04

Please sign in to comment.