Skip to content


Use embedded ssm client
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshuaWilkes committed Jul 23, 2024
1 parent 87a3ae6 commit ab74ba0
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 209 deletions.
246 changes: 52 additions & 194 deletions cmd/cli/command/aws/rds/rds.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rds

import (
Expand All @@ -24,7 +23,6 @@ import (
awsConfig ""

Expand All @@ -39,7 +37,6 @@ import (
accessv1alpha1 ""
Expand Down Expand Up @@ -87,12 +84,6 @@ var proxyCommand = cli.Command{
return err

// ensure required CLI tools are installed
err = CheckDependencies()
if err != nil {
return err

target := c.String("target")
role := c.String("role")
client := access.NewFromConfig(cfg)
Expand Down Expand Up @@ -363,7 +354,7 @@ var proxyCommand = cli.Command{
mysqlPort := strconv.Itoa((c.Int("mysql-port")))
postgresPort := strconv.Itoa((c.Int("postgres-port")))

notifyCh := make(chan struct{})
notifyCh := make(chan struct{}, 10)

awscfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken)))
if err != nil {
Expand All @@ -372,12 +363,22 @@ var proxyCommand = cli.Command{
awscfg.Region = commandData.GrantOutput.Database.Region
ssmClient := ssm.NewFromConfig(awscfg)

var cmd *exec.Cmd
// listen for interrupt signals and forward them on
// listen for a context cancellation

// Set up a channel to receive OS signals
sigs := make(chan os.Signal, 1)
// Notify sigs on os.Interrupt (Ctrl+C)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)

var sessionOutput *ssm.StartSessionOutput
// in local dev you can skip using ssm and just use a local port forward instead
if os.Getenv("CF_DEV_PROXY") == "true" {
cmd = exec.Command("socat", fmt.Sprintf("TCP-LISTEN:%s,fork", commandData.SSMPortForwardLocalPort), fmt.Sprintf("TCP:", commandData.SSMPortForwardServerPort))
commandData.SSMPortForwardLocalPort = commandData.SSMPortForwardServerPort
go func() { notifyCh <- struct{}{} }()
} else {
documentName := "AWS-StartPortForwardingSession"
Expand All @@ -395,66 +396,49 @@ var proxyCommand = cli.Command{
if err != nil {
return err
eg.Go(func() error {
clientId := uuid.New().String()
ssmSession := session.Session{
StreamUrl: *sessionOutput.StreamUrl,
SessionId: *sessionOutput.SessionId,
TokenValue: *sessionOutput.TokenValue,
IsAwsCliUpgradeNeeded: false,
Endpoint: "localhost:" + commandData.SSMPortForwardLocalPort,
DataChannel: &datachannel.DataChannel{},
ClientId: clientId,


log := log.Logger(true, "session-manager-plugin")

clientId := uuid.New().String()
ssmSession := session.Session{
StreamUrl: *sessionOutput.StreamUrl,
SessionId: *sessionOutput.SessionId,
TokenValue: *sessionOutput.TokenValue,
IsAwsCliUpgradeNeeded: false,
Endpoint: "localhost:" + commandData.SSMPortForwardLocalPort,
DataChannel: &datachannel.DataChannel{},
ClientId: clientId,

clio.Debugw("running aws ssm command", "command", "aws "+strings.Join(formatSSMCommandArgs(commandData), " "))

si = spinner.New(spinner.CharSets[14], 100*time.Millisecond)
si.Suffix = " Starting database proxy..."
si.Writer = os.Stderr
defer si.Stop()

// cmd.Stderr = io.MultiWriter(NewNotifyingWriter(io.Discard, "Waiting for connections...", notifyCh), DebugWriter{})
// cmd.Stdout = io.MultiWriter(NewNotifyingWriter(io.Discard, "Waiting for connections...", notifyCh), DebugWriter{})
// cmd.Stdin = os.Stdin
// cmd.Env = PrepareAWSCLIEnv(creds, commandData)

// Start the command in a separate goroutine

//register the port session
portSession := portsession.PortSession{
Session: ssmSession,

clio.Info("executing session with ssm client")
si = spinner.New(spinner.CharSets[14], 100*time.Millisecond)
si.Suffix = " Starting database proxy..."
si.Writer = os.Stderr
defer si.Stop()

clio.Info("executing session with ssm client")

// registers the PortSession feature
_ = portsession.PortSession{}
// writes ssm session logs to clio.Debug while listening for the waiting for connectiosn phrase
// once we see that, we can start connecting

go func() {
err := ssmSession.TerminateSession(&SSMDebugLogger{
Writers: []io.Writer{DebugWriter{}},
if err != nil {

err = ssmSession.Execute(log)
if err != nil {
return err
return ssmSession.Execute(&SSMDebugLogger{
Writers: []io.Writer{
NewNotifyingWriter(DebugWriter{}, "Waiting for connections...", notifyCh),

// err = cmd.Start()
// if err != nil {
// return err
// }

// listen for interrupt signals and forward them on
// listen for a context cancellation

// Set up a channel to receive OS signals
sigs := make(chan os.Signal, 1)
// Notify sigs on os.Interrupt (Ctrl+C)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
select {
case <-notifyCh:
Expand Down Expand Up @@ -579,24 +563,6 @@ var proxyCommand = cli.Command{
case <-ctx.Done():
clio.Info("Shutting down database proxy...")
if err := cmd.Process.Signal(os.Interrupt); err != nil {
clio.Errorw("Error sending SIGTERM to AWS SSM process", zap.Error(err))
return nil

// Wait for the command to finish
eg.Go(func() error {
defer cancel()
err = cmd.Wait()
if err != nil {
if err.Error() == "exit status 130" {
return nil
return clierr.New(fmt.Errorf("AWS SSM port forward session closed with an error: %w", err).Error(),
clierr.Info("You can try re-running this command with the verbose flag to see detailed logs, 'cf --verbose aws rds proxy'"),
clierr.Infof("In rare cases, where the database proxy has been re-deployed while your grant was active, you will need to close your request in Common Fate and request access again 'cf access close request --id=%s' This is usually indicated by an error message containing '(TargetNotConnected) when calling the StartSession'", ensuredGrant.Grant.AccessRequestId))
return nil

Expand All @@ -618,100 +584,12 @@ func GrabUnusedPort() (string, error) {
return strconv.Itoa(port), nil

// DebugWriter is an io.Writer that writes messages using clio.Debug.
type DebugWriter struct{}

// Write implements the io.Writer interface for DebugWriter.
func (dw DebugWriter) Write(p []byte) (n int, err error) {
message := string(p)
return len(p), nil

type NotifyingWriter struct {
writer io.Writer
phrase string
notifyCh chan struct{}
buffer bytes.Buffer

func NewNotifyingWriter(writer io.Writer, phrase string, notifyCh chan struct{}) *NotifyingWriter {
return &NotifyingWriter{
writer: writer,
phrase: phrase,
notifyCh: notifyCh,

func (nw *NotifyingWriter) Write(p []byte) (n int, err error) {
// Write to the buffer first
// Check if the phrase is in the buffer
if strings.Contains(nw.buffer.String(), nw.phrase) {
// Notify the channel in a non-blocking way
select {
case nw.notifyCh <- struct{}{}:
// Clear the buffer up to the phrase
// Write to the underlying writer
return nw.writer.Write(p)

func PrepareAWSCLIEnv(creds aws.Credentials, commandData CommandData) []string {
return append(SanitisedEnv(), assume.EnvKeys(creds, commandData.GrantOutput.Database.Region)...)

// SanitisedEnv returns the environment variables excluding specific AWS keys.
// used so that existing aws creds in the terminal are not passed through to downstream programs like the AWS cli
func SanitisedEnv() []string {
// List of AWS keys to remove from the environment.
awsKeys := map[string]struct{}{

var cleanedEnv []string
for _, env := range os.Environ() {
// Split the environment variable into key and value
parts := strings.SplitN(env, "=", 2)
key := parts[0]

// If the key is not one of the AWS keys, include it in the cleaned environment
if _, found := awsKeys[key]; !found {
cleanedEnv = append(cleanedEnv, env)
return cleanedEnv

type CommandData struct {
GrantOutput AWSRDS
SSMPortForwardLocalPort string
SSMPortForwardServerPort string

func formatSSMCommandArgs(data CommandData) []string {
out := []string{
fmt.Sprintf("--target=%s", data.GrantOutput.SSMSessionTarget),
fmt.Sprintf(`{"portNumber":["%s"], "localPortNumber":["%s"]}`, data.SSMPortForwardServerPort, data.SSMPortForwardLocalPort),

return out

// CredentialProcessOutput represents the JSON output format of the credential process.
type CredentialProcessOutput struct {
Version int `json:"Version"`
Expand All @@ -738,26 +616,6 @@ func ParseCredentialProcessOutput(credentialProcessOutput string) (aws.Credentia
}, nil

func CheckDependencies() error {
_, err := exec.LookPath("granted")
if err != nil {
// The executable was not found in the PATH
if _, ok := err.(*exec.Error); ok {
return clierr.New("the required cli 'granted' was not found on your path", clierr.Info("Granted is required to access AWS via SSO, please follow the instructions here to install it"))
return err
_, err = exec.LookPath("aws")
if err != nil {
// The executable was not found in the PATH
if _, ok := err.(*exec.Error); ok {
return clierr.New("the required cli 'aws' was not found on your path", clierr.Info("The AWS cli is required to access dastabases via SSM Session Manager, please follow the instructions here to install it"))
return err
return nil

func GrantedCredentialProcess(ctx context.Context, commandData CommandData) (aws.Credentials, error) {
// the grant id is used for teh profile to avoid issues with the credential cache in granted credential-process, it also gets the benefit of this cache per grant
configFile := fmt.Sprintf(`[profile %s]
Expand Down

0 comments on commit ab74ba0

Please sign in to comment.