diff --git a/cmd/helpers_render.go b/cmd/helpers_render.go index 45f886e..bc11594 100644 --- a/cmd/helpers_render.go +++ b/cmd/helpers_render.go @@ -20,12 +20,17 @@ func renderOutput(out interface{}, columns ...string) error { } func printOutput(out interface{}, columns ...string) error { + return printOutputWithDefaultFormat("y", out, columns...) +} + +func printOutputWithDefaultFormat(defaultFormat string, out interface{}, columns ...string) error { format := viper.GetString(ArgGlobalOutput) if format == "" { - format = "y" + format = defaultFormat } + formatKey := strings.ToLower(format[0:1]) switch formatKey { diff --git a/cmd/kube.go b/cmd/kube.go index cb21de9..07f8d68 100644 --- a/cmd/kube.go +++ b/cmd/kube.go @@ -52,6 +52,7 @@ func init() { // kubeCmd represents the kube command var kubeCmd = &cobra.Command{ Use: "kube {kube-layout}", + Aliases: []string{"k"}, Args: cobra.ExactArgs(1), Short: "Group of commands wrapping kubectl.", Long: `You must have the cluster set in kubectl.`, diff --git a/cmd/kube_port_forward.go b/cmd/kube_port_forward.go new file mode 100644 index 0000000..661326d --- /dev/null +++ b/cmd/kube_port_forward.go @@ -0,0 +1,284 @@ +package cmd + +import ( + "fmt" + "github.com/naveego/bosun/pkg/cli" + "github.com/naveego/bosun/pkg/kube/portforward" + "github.com/naveego/bosun/pkg/yaml" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "io/ioutil" + "os" + "os/signal" + "path/filepath" + "sort" +) + +// kubeCmd represents the kube command +var kubePortForwardCmd = addCommand(kubeCmd, &cobra.Command{ + Use: "port-forward", + Aliases: []string{"pf"}, + Short: "Group of commands for managing port forwarding.", +}) + +var kubePortForwardDaemon = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "daemon", + Short: "Runs the port-forwarding daemon", + RunE: func(cmd *cobra.Command, args []string) error { + + dir := filepath.Join(filepath.Dir(viper.GetString(ArgBosunConfigFile)), "port-forwards") + if len(args) > 0 { + dir = args[0] + } + + daemon, err := portforward.NewDaemon(dir) + if err != nil { + return err + } + + err = daemon.Start() + if err != nil { + return err + } + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + <-signalChan + fmt.Println("Received an interrupt, stopping services...") + + return daemon.Stop() + }, +}) + +var kubePortForwardState = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "state", + Aliases: []string{"show","list", "ls"}, + Short: "Reports on state of port forwards", + RunE: func(cmd *cobra.Command, args []string) error { + var err error + + controller, err := getKubePortForwardController(args) + if err != nil { + return err + } + + state, err := controller.GetState() + if err != nil { + return err + } + + if state.Error != "" { + return errors.New(state.Error) + } + + + return printOutputWithDefaultFormat("table", state) + }, +}) + +var kubePortForwardStart = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "start [name]", + Short: "Starts a port forward task", + RunE: func(cmd *cobra.Command, args []string) error { + var err error + + controller, err := getKubePortForwardController(args) + if err != nil { + return err + } + + + var name string + switch len(args){ + case 1: + name = args[0] + default: + state, stateErr := controller.GetState() + if stateErr != nil { + return stateErr + } + + var names []string + for n, s := range state.Ports { + if !s.Config.Active { + names = append(names, n) + } + } + sort.Strings(names) + if len(names) == 0{ + return errors.New("all port-forwards are running") + } + + name = cli.RequestChoice("Choose a port-forward to start", names...) + } + + err = controller.StartPortForward(name) + return err + }, +}) + +var kubePortForwardStop = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "stop [name]", + Short: "Stops a port forward task", + RunE: func(cmd *cobra.Command, args []string) error { + var err error + + controller, err := getKubePortForwardController(args) + if err != nil { + return err + } + + + var name string + switch len(args){ + case 1: + name = args[0] + default: + state, stateErr := controller.GetState() + if stateErr != nil { + return stateErr + } + + var names []string + for n, s := range state.Ports { + if s.Config.Active { + names = append(names, n) + } + } + sort.Strings(names) + if len(names) == 0{ + return errors.New("no port-forwards running") + } + + name = cli.RequestChoice("Choose a port-forward to stop", names...) + } + + err = controller.StopPortForward(name) + return err + }, +}) + +func getKubePortForwardController(args []string) (*portforward.Controller, error) { + + dir := filepath.Join(filepath.Dir(viper.GetString(ArgBosunConfigFile)), "port-forwards") + + controller, err := portforward.NewController(dir) + return controller, err +} + +var kubePortForwardAdd = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "add {name} [args...]", + Args: cobra.MinimumNArgs(1), + Short: "Adds a port-forward to the daemon", + RunE: func(cmd *cobra.Command, args []string) error { + var err error + + controller, err := getKubePortForwardController(args) + if err != nil { + return err + } + + name := args[0] + + var portForwardConfig portforward.PortForwardConfig + + if len(args) > 1 { + portForwardConfig.Args = args[1:] + } else { + tempFile, _ := ioutil.TempFile(os.TempDir(), "port-forward-*.yaml") + + b, _ := yaml.Marshal(portForwardConfig) + _, err = tempFile.Write(b) + if err != nil { + return err + } + + err = tempFile.Close() + if err != nil { + return err + } + + err = cli.Edit(tempFile.Name()) + if err != nil { + return err + } + + err = yaml.LoadYaml(tempFile.Name(), &portForwardConfig) + if err != nil { + return err + } + } + + err = controller.AddPortForward(name, portForwardConfig) + + return err + }, +}) + +var kubePortForwardEdit = addCommand(kubePortForwardCmd, &cobra.Command{ + Use: "edit [name]", + Short: "Edit a port-forward", + RunE: func(cmd *cobra.Command, args []string) error { + + controller, err := getKubePortForwardController(args) + if err != nil { + return err + } + + + var name string + switch len(args){ + case 1: + name = args[0] + default: + state, stateErr := controller.GetState() + if stateErr != nil { + return stateErr + } + + var names []string + for n, s := range state.Ports { + if s.Config.Active { + names = append(names, n) + } + } + sort.Strings(names) + if len(names) == 0{ + return errors.New("no port-forwards found") + } + + name = cli.RequestChoice("Choose a port-forward to edit", names...) + } + + portForwardConfig, err := controller.GetPortForwardConfig(name) + + tempFile, _ := ioutil.TempFile(os.TempDir(), "port-forward-*.yaml") + + b, _ := yaml.Marshal(portForwardConfig) + _, err = tempFile.Write(b) + if err != nil { + return err + } + + err = tempFile.Close() + if err != nil { + return err + } + + err = cli.Edit(tempFile.Name()) + if err != nil { + return err + } + + err = yaml.LoadYaml(tempFile.Name(), &portForwardConfig) + if err != nil { + return err + } + + err = controller.AddPortForward(name, *portForwardConfig) + + return err + }, +}) diff --git a/cmd/root.go b/cmd/root.go index 510297b..89bbda7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -144,7 +144,7 @@ func init() { } rootCmd.PersistentFlags().String(ArgBosunConfigFile, bosunConfigFile, "Config file for Bosun. You can also set BOSUN_CONFIG.") - rootCmd.PersistentFlags().StringP(ArgGlobalOutput, "o", "yaml", "Output format. Options are `table`, `json`, or `yaml`. Only respected by a some commands.") + rootCmd.PersistentFlags().StringP(ArgGlobalOutput, "o", "", "Output format. Options are `table`, `json`, or `yaml`. Only respected by a some commands.") rootCmd.PersistentFlags().Bool(ArgGlobalVerbose, false, "Enable verbose logging.") rootCmd.PersistentFlags().Bool(ArgGlobalTrace, false, "Enable trace logging.") _ = rootCmd.PersistentFlags().MarkHidden(ArgGlobalTrace) diff --git a/go.mod b/go.mod index 84f953c..d1e9382 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect github.com/fatih/color v1.7.0 github.com/fatih/structs v1.1.0 // indirect + github.com/fsnotify/fsnotify v1.4.7 github.com/fullsailor/pkcs7 v0.0.0-20180613152042-8306686428a5 // indirect github.com/gammazero/deque v0.0.0-20190130191400-2afb3858e9c7 // indirect github.com/gammazero/workerpool v0.0.0-20181230203049-86a96b5d5d92 // indirect @@ -54,6 +55,7 @@ require ( github.com/go-stomp/stomp v2.0.2+incompatible // indirect github.com/go-test/deep v1.0.1 // indirect github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b // indirect + github.com/gofrs/flock v0.8.0 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/lint v0.0.0-20181217174547-8f45f776aaf1 // indirect @@ -126,7 +128,7 @@ require ( github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect github.com/pquerna/otp v1.1.0 // indirect github.com/prometheus/common v0.2.0 - github.com/rapidloop/mybot v0.0.0-20160205033900-2777401b233f + github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/rs/xid v1.2.1 github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 // indirect github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect @@ -143,6 +145,7 @@ require ( github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/vbauerster/mpb/v4 v4.7.0 + github.com/x-cray/logrus-prefixed-formatter v0.5.2 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect github.com/xdg/stringprep v1.0.0 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect @@ -159,6 +162,7 @@ require ( gopkg.in/inconshreveable/go-update.v0 v0.0.0-20150814200126-d8b0b1d421aa gopkg.in/ldap.v2 v2.5.1 // indirect gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce + gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/ory-am/dockertest.v2 v2.2.3 // indirect gopkg.in/square/go-jose.v2 v2.3.0 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 @@ -171,11 +175,6 @@ require ( sigs.k8s.io/controller-runtime v0.4.0 ) -replace ( - gopkg.in/russross/blackfriday.v2 v2.0.0 => github.com/russross/blackfriday/v2 v2.0.1 - gopkg.in/russross/blackfriday.v2 v2.0.1 => github.com/russross/blackfriday/v2 v2.0.1 -) - replace ( k8s.io/api => k8s.io/api v0.0.0-20190222213804-5cb15d344471 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190221213512-86fb29eff628 diff --git a/go.sum b/go.sum index 1fa4500..f30cf79 100644 --- a/go.sum +++ b/go.sum @@ -99,9 +99,8 @@ github.com/briankassouf/jose v0.9.1/go.mod h1:HQhVmdUf7dBNwIIdBTivnCDxcf6IZY3/zr github.com/cenkalti/backoff v2.0.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.1.0+incompatible h1:FIRvWBZrzS4YC7NT5cOuZjexzFvIr+Dbi6aD1cZaNBk= github.com/cenkalti/backoff v2.1.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/census-instrumentation/opencensus-proto v0.0.3-0.20181214143942-ba49f56771b8 h1:vvX9PxDdQSEmcYSMjZpKbCRRTyHrvfr08Ue9/zk7HV4= github.com/census-instrumentation/opencensus-proto v0.0.3-0.20181214143942-ba49f56771b8/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/census-instrumentation/opencensus-proto v0.1.0-0.20181214143942-ba49f56771b8 h1:gUqsFVdUKoRHNg8fkFd8gB5OOEa/g5EwlAHznb4zjbI= -github.com/census-instrumentation/opencensus-proto v0.1.0-0.20181214143942-ba49f56771b8/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f h1:gJzxrodnNd/CtPXjO3WYiakyNzHg3rtAi7rO74ejHYU= github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE= github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= @@ -249,6 +248,8 @@ github.com/go-test/deep v1.0.1 h1:UQhStjbkDClarlmv0am7OXXO4/GaPdCGiUiMTvi28sg= github.com/go-test/deep v1.0.1/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b h1:dnUw9Ih14dCKzbtZxm+pwQRYIb+9ypiwtZgsCQN4zmg= github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0= +github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY= +github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= @@ -619,9 +620,9 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/rapidloop/mybot v0.0.0-20160205033900-2777401b233f h1:u9hv28Wkzcbaj05S80mtHAnq7VgbokvExTdZTeNWtDk= -github.com/rapidloop/mybot v0.0.0-20160205033900-2777401b233f/go.mod h1:gddSkKc21DuMnqNMf0sKNYerJbfVLREBvJwR1SBCeZc= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= +github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo= +github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= @@ -720,6 +721,8 @@ github.com/ulikunitz/xz v0.5.5 h1:pFrO0lVpTBXLpYw+pnLj6TbvHuyjXMfjGeCwSqCVwok= github.com/ulikunitz/xz v0.5.5/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8= github.com/vbauerster/mpb/v4 v4.7.0 h1:Et+zVewxG6qmfBf4Ez+nDhLbCSh6WhZrUPHg9a6e+hw= github.com/vbauerster/mpb/v4 v4.7.0/go.mod h1:ugxYn2kSUrY10WK5CWDUZvQxjdwKFN9K3Ja3/z6p4X0= +github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= +github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= @@ -926,6 +929,7 @@ gopkg.in/ldap.v2 v2.5.1 h1:wiu0okdNfjlBzg6UWvd1Hn8Y+Ux17/u/4nlk4CQr6tU= gopkg.in/ldap.v2 v2.5.1/go.mod h1:oI0cpe/D7HRtBQl8aTg+ZmzFUAvu4lsv3eLXMLGFxWk= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/ory-am/dockertest.v2 v2.2.3 h1:vSYvP7tvyfAm9merq0gHmcI4yk5nkPpfXmoBCnSP3/4= gopkg.in/ory-am/dockertest.v2 v2.2.3/go.mod h1:kDHEsan1UcKFYH1c28sDmqnmeqIpB4Nj682gSNhYDYM= diff --git a/pkg/bosun/app_manifest.go b/pkg/bosun/app_manifest.go index 9d7c031..c008f6e 100644 --- a/pkg/bosun/app_manifest.go +++ b/pkg/bosun/app_manifest.go @@ -23,7 +23,7 @@ type AppMetadata struct { PinnedReleaseVersion *semver.Version `yaml:"pinnedReleaseVersion,omitempty"` Hashes AppHashes `yaml:"hashes"` Branch string `yaml:"branch" json:"branch"` - Tag string `yaml:"tag" json:"tag"` + Tag string `yaml:"tag,omitempty" json:"tag,omitempty"` } func (a *AppMetadata) RepoRef() issues.RepoRef { diff --git a/pkg/bosun/platform.go b/pkg/bosun/platform.go index 66b4f8d..4107a33 100644 --- a/pkg/bosun/platform.go +++ b/pkg/bosun/platform.go @@ -328,7 +328,7 @@ func (p *Platform) SwitchToReleaseBranch(ctx BosunContext, branch string) error } if localRepo.IsDirty() { - return errors.Errorf("repo at %s is dirty, commit or stash your changes before adding it to the release") + return errors.Errorf("repo at %s is dirty, commit or stash your changes before adding it to the release", localRepo.Path) } log.Debug("Checking if release branch exists...") diff --git a/pkg/bosun/release_manifest.go b/pkg/bosun/release_manifest.go index 19acb7c..0cdbaa8 100644 --- a/pkg/bosun/release_manifest.go +++ b/pkg/bosun/release_manifest.go @@ -211,6 +211,7 @@ func (r *ReleaseManifest) BumpForRelease(ctx BosunContext, app *App, fromBranch, r.init() r.MarkDirty() + var err error name := app.Name appConfig := app.AppConfig @@ -218,7 +219,11 @@ func (r *ReleaseManifest) BumpForRelease(ctx BosunContext, app *App, fromBranch, if appConfig.BranchForRelease { log := ctx.Log().WithField("app", appConfig.Name) if !app.IsRepoCloned() { - return nil, errors.New("repo is not cloned but must be branched for release; what is going on?") + + app, err = ctx.Bosun.workspaceAppProvider.GetApp(name) + if err != nil { + return nil, errors.New("app to bump %q could not be acquired from workspace provider") + } } localRepo := app.Repo.LocalRepo @@ -542,10 +547,6 @@ func (r *ReleaseManifest) updateAppHashes(manifest *AppManifest) error { func (r *ReleaseManifest) PrepareAppForRelease(ctx BosunContext, app *App, bump semver.Bump, branch string) (*AppManifest, error) { - if _, ok := r.AppMetadata[app.Name]; ok { - return nil, errors.Errorf("app %q is already part of the release, use an update command to update it", app.Name) - } - if branch == "" { branch = app.AppConfig.Branching.Develop } @@ -565,7 +566,7 @@ func (r *ReleaseManifest) PrepareAppForRelease(ctx BosunContext, app *App, bump return nil, errors.Wrapf(err, "get latest version of manifest from app") } return appManifest, err - } else if r.Slot == SlotStable { + } else if r.Slot == SlotUnstable { appManifest, err := app.GetManifestFromBranch(ctx, branch, true) if err != nil { diff --git a/pkg/kube/portforward/config.go b/pkg/kube/portforward/config.go new file mode 100644 index 0000000..35ddbdb --- /dev/null +++ b/pkg/kube/portforward/config.go @@ -0,0 +1,58 @@ +package portforward + +import ( + "fmt" + "strings" +) + +type DaemonConfig struct { + Ports map[string]*PortForwardConfig +} + +type PortForwardConfig struct { + Active bool `yaml:"active" json:"active"` + LocalPort int `yaml:"localPort" json:"localPort,omitempty"` + KubeConfig string `yaml:"kubeConfig" json:"kubeConfig,omitempty"` + KubeContext string `yaml:"kubeContext" json:"kubeContext,omitempty"` + TargetType string `yaml:"targetType" json:"targetType,omitempty"` + TargetName string `yaml:"targetName" json:"targetName,omitempty"` + TargetPort int `yaml:"targetPort" json:"targetPort,omitempty"` + Namespace string `yaml:"namespace" json:"namespace,omitempty"` + Args []string `yaml:"args" json:"args,omitempty"` +} + + +func (p PortForwardConfig) String() string { + return strings.Join(p.ToArgs(), " ") +} + + +func (p PortForwardConfig) ToArgs() []string { + args := p.Args + + if len(args) == 0 { + args = []string{"port-forward"} + + if p.Namespace != "" { + args = append(args, "--namespace", p.Namespace) + } + + if p.KubeContext != "" { + args = append(args, "--context", p.KubeContext) + } + + if p.KubeConfig != "" { + args = append(args, "--kubeconfig", p.KubeConfig) + } + + if p.TargetType != "" && p.TargetName != "" { + args = append(args, p.TargetType+"/"+p.TargetName) + } else { + args = append(args, p.TargetName) + } + + args = append(args, fmt.Sprintf("%d:%d", p.LocalPort, p.TargetPort)) + } + + return args +} diff --git a/pkg/kube/portforward/controller.go b/pkg/kube/portforward/controller.go new file mode 100644 index 0000000..41e5668 --- /dev/null +++ b/pkg/kube/portforward/controller.go @@ -0,0 +1,128 @@ +package portforward + +import ( + "github.com/gofrs/flock" + "github.com/naveego/bosun/pkg/yaml" + "github.com/pkg/errors" + "os" + "path/filepath" +) + +type Controller struct { + dir string + configPath string + statePath string +} + +func NewController(dir string) (*Controller, error) { + + err := os.MkdirAll(dir, 0700) + if err != nil { + return nil, err + } + + fileLock := flock.New(filepath.Join(dir, lockFileName)) + + locked, err := fileLock.TryLock() + if err != nil { + return nil, errors.Wrap(err, "error checking file lock") + } + + if locked { + _ = fileLock.Unlock() + return nil, errors.Errorf("port-forward daemon does not seem to be running, you can start it with `bosun kube port-forward daemon %s`", dir) + } + + return &Controller{ + dir: dir, + configPath: filepath.Join(dir, configFileName), + statePath: filepath.Join(dir, stateFileName), + }, nil +} + +func (c *Controller) GetState() (DaemonState, error) { + var state DaemonState + + err := yaml.LoadYaml(c.statePath, &state) + + return state, err +} + +func (c *Controller) AddPortForward(name string, portForwardConfig PortForwardConfig) error { + return c.updateConfig(func(config *DaemonConfig) error{ + config.Ports[name] = &portForwardConfig + return nil + }) +} + +func (c *Controller) RemovePortForward(name string) error { + return c.updateConfig(func(config *DaemonConfig) error { + delete(config.Ports, name) + return nil + }) +} + +func (c *Controller) StartPortForward(name string) error { + return c.updateConfig(func(config *DaemonConfig) error { + if portForwardConfig, ok := config.Ports[name]; ok { + portForwardConfig.Active = true + return nil + } + return errors.Errorf("no port-forward named %s", name) + }) +} + +func (c *Controller) StopPortForward(name string) error { + return c.updateConfig(func(config *DaemonConfig) error { + if portForwardConfig, ok := config.Ports[name]; ok { + portForwardConfig.Active = false + return nil + } + return errors.Errorf("no port-forward named %s", name) + }) +} + +func (c *Controller) updateConfig(mutator func(config *DaemonConfig) error) error { + var daemonConfig DaemonConfig + + err := yaml.LoadYaml(c.configPath, &daemonConfig) + if err != nil { + return err + } + + if daemonConfig.Ports == nil { + daemonConfig.Ports = map[string]*PortForwardConfig{} + } + + err = mutator(&daemonConfig) + if err != nil { + return err + } + + return yaml.SaveYaml(c.configPath, daemonConfig) +} + +func (c *Controller) GetConfig() (DaemonConfig, error){ + var daemonConfig DaemonConfig + + err := yaml.LoadYaml(c.configPath, &daemonConfig) + if err != nil { + return daemonConfig, err + } + return daemonConfig, nil +} + +func (c *Controller) GetPortForwardConfig(name string) (*PortForwardConfig, error) { + var daemonConfig DaemonConfig + + err := yaml.LoadYaml(c.configPath, &daemonConfig) + if err != nil { + return nil, err + } + + if pfc, ok := daemonConfig.Ports[name]; ok { + return pfc, nil + } + + return nil, errors.Errorf("no port-forward found with name %q", name) +} diff --git a/pkg/kube/portforward/daemon.go b/pkg/kube/portforward/daemon.go new file mode 100644 index 0000000..d381748 --- /dev/null +++ b/pkg/kube/portforward/daemon.go @@ -0,0 +1,299 @@ +package portforward + +import ( + "fmt" + "github.com/fsnotify/fsnotify" + "github.com/gofrs/flock" + "github.com/naveego/bosun/pkg/yaml" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/x-cray/logrus-prefixed-formatter" + "gopkg.in/natefinch/lumberjack.v2" + "gopkg.in/tomb.v2" + "reflect" + "sync" + + "github.com/rifflock/lfshook" + "io/ioutil" + "os" + "path/filepath" + "time" +) + +const ( + lockFileName = "bosun.lock" + configFileName = "config.yaml" + errorFileName = "err.log" + stateFileName = "state.yaml" +) + +func NewDaemon(dir string) (*PortForwardDaemon, error) { + + err := os.MkdirAll(dir, 0700) + if err != nil { + return nil, err + } + + fileLock := flock.New(filepath.Join(dir, lockFileName)) + locked, err := fileLock.TryLock() + if err != nil { + return nil, errors.Wrap(err, "error checking file lock") + } + + if !locked { + return nil, errors.Errorf("port-forward daemon appears to already be running for dir %s", dir) + } + + path := filepath.Join(dir, fmt.Sprintf("daemon.log")) + writer := &lumberjack.Logger{ + Filename: path, + MaxSize: 1, + MaxBackups: 2, + } + + logger := logrus.New() + + logger.AddHook(lfshook.NewHook( + lfshook.WriterMap{ + logrus.DebugLevel: writer, + logrus.InfoLevel: writer, + logrus.WarnLevel: writer, + logrus.ErrorLevel: writer, + logrus.PanicLevel: writer, + }, + &prefixed.TextFormatter{ + TimestampFormat: time.RFC3339Nano, + FullTimestamp: true, + DisableUppercase: true, + ForceFormatting: true, + }, + )) + + return &PortForwardDaemon{ + dir: dir, + children: map[string]*portForwardTask{}, + log: logrus.NewEntry(logger), + configPath: filepath.Join(dir, configFileName), + fileLock: fileLock, + }, nil +} + +type PortForwardDaemon struct { + dir string + + state DaemonState + children map[string]*portForwardTask + log *logrus.Entry + configPath string + config DaemonConfig + t *tomb.Tomb + fileLock *flock.Flock + mu sync.Mutex +} + +func (p *PortForwardDaemon) Start() error { + + p.t = &tomb.Tomb{} + + p.log.Infof("Using config at %s", p.configPath) + + if _, err := os.Stat(p.configPath); os.IsNotExist(err) { + if err = ioutil.WriteFile(p.configPath, []byte{}, 0600); err != nil { + return errors.Wrap(err, "could not create config file") + } + } + + p.setErrorState(nil) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return errors.Wrap(err, "create watcher") + } + + p.reloadConfig() + + p.t.Go(func() (err error) { + + defer func() { err = p.recordPanic() }() + + for { + select { + case <-p.t.Dying(): + return nil + case _, ok := <-watcher.Events: + if !ok { + return nil + } + p.log.Info("Got config file event.") + p.reloadConfig() + case watcherErr, ok := <-watcher.Errors: + if !ok { + return nil + } + p.setErrorState(watcherErr) + } + } + }) + + err = watcher.Add(p.configPath) + return errors.Wrap(err, "add config path to watcher") + +} + +func (p *PortForwardDaemon) recordPanic() (err error) { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + p.updateState(func(state *DaemonState) { + state.OK = false + state.Error = err.Error() + }) + p.log.WithError(err).Error("Panic detected, will probably shut down.") + } + + return +} + +func (p *PortForwardDaemon) updateState(mutator func(state *DaemonState)) { + + p.mu.Lock() + defer p.mu.Unlock() + + if p.state.Ports == nil { + p.state.Ports = map[string]PortForwardState{} + } + + mutator(&p.state) + if err := yaml.SaveYaml(filepath.Join(p.dir, stateFileName), p.state); err != nil { + p.log.WithError(err).Error("Could not save state.") + } +} + +func (p *PortForwardDaemon) updatePFState(name string, mutator func(state *PortForwardState)) { + p.updateState(func(state *DaemonState) { + pfstate := state.Ports[name] + mutator(&pfstate) + state.Ports[name] = pfstate + }) +} + +func (p *PortForwardDaemon) setErrorState(err error) { + if err == nil { + p.updateState(func(state *DaemonState) { + state.OK = true + state.Error = "" + }) + + } else { + p.log.WithError(err).Error("Error occurred.") + p.updateState(func(state *DaemonState) { + state.OK = false + state.Error = err.Error() + + }) + } +} + +func (p *PortForwardDaemon) reloadConfig() { + p.log.Info("Loading config file...") + + actual := p.config + + var desired DaemonConfig + + err := yaml.LoadYaml(p.configPath, &desired) + if err != nil { + p.setErrorState(err) + return + } + + if reflect.DeepEqual(desired, actual) { + p.log.Info("No change detected in config.") + return + } + + p.log.Info("Config appears to have changed, reconciling...") + + for name, desiredPortConfig := range desired.Ports { + av := p.config.Ports[name] + if err = p.reconcilePortForward(name, av, desiredPortConfig); err != nil { + p.updatePFState(name, func(state *PortForwardState) { + state.Error = err.Error() + state.Config = desiredPortConfig + }) + } else { + p.updatePFState(name, func(state *PortForwardState) { + state.Config = desiredPortConfig + }) + } + } + // Reconcile ports which were desired but aren't any more + for name, actualPortConfig := range actual.Ports { + if _, ok := desired.Ports[name]; !ok { + if err = p.reconcilePortForward(name, actualPortConfig, nil); err != nil { + p.updatePFState(name, func(state *PortForwardState) { + state.Error = err.Error() + state.Config = actualPortConfig + }) + } else { + p.updatePFState(name, func(state *PortForwardState) { + state.Config = actualPortConfig + }) + } + } + } + + p.log.Info("Config changes reconciled.") + p.config = desired +} + +func (p *PortForwardDaemon) reconcilePortForward(name string, actual *PortForwardConfig, desired *PortForwardConfig) error { + + p.log.Infof("Reconciling port forward config %s", name) + + if reflect.DeepEqual(actual, desired) { + p.log.Infof("Port forward %q unchanged.", name) + return nil + } + + var err error + task, taskExists := p.children[name] + + if taskExists { + p.log.Info("Removing port forward %s", name) + + task.Stop() + + delete(p.children, name) + p.updateState(func(state *DaemonState) { + delete(state.Ports, name) + }) + taskExists = false + } + + if desired.Active { + p.log.Infof("Activating port forward %s", name) + task, err = newPortForward(p, name, *desired) + p.children[name] = task + if err != nil { + return err + } + task.Start() + } else { + p.log.Infof("Port forward %s is not desired to be active", name) + } + + return nil +} + +func (p *PortForwardDaemon) Stop() error { + + if p.t != nil { + p.t.Kill(nil) + } + + _ = p.fileLock.Unlock() + + <-p.t.Dead() + + return p.t.Err() +} diff --git a/pkg/kube/portforward/state.go b/pkg/kube/portforward/state.go new file mode 100644 index 0000000..6638a2a --- /dev/null +++ b/pkg/kube/portforward/state.go @@ -0,0 +1,30 @@ +package portforward + +import "fmt" + +type DaemonState struct { + OK bool `yaml:"ok"` + Error string `yaml:"error,omitempty"` + Ports map[string]PortForwardState `yaml:"ports"` +} + +func (d DaemonState) Headers() []string { + return []string{"Name", "Active", "State", "Config", "Error"} +} + +func (d DaemonState) Rows() [][]string { + + var out [][]string + for name, state := range d.Ports{ + out = append(out, []string{name, fmt.Sprint(state.Config.Active), state.State, state.Config.String(), state.Error}) + } + + return out +} + +type PortForwardState struct { + State string `yaml:"state,omitempty"` + Error string `yaml:"error,omitempty"` + Config *PortForwardConfig `yaml:"config"` + PID int `yaml:"pid,omitempty"` +} \ No newline at end of file diff --git a/pkg/kube/portforward/task.go b/pkg/kube/portforward/task.go new file mode 100644 index 0000000..31de014 --- /dev/null +++ b/pkg/kube/portforward/task.go @@ -0,0 +1,202 @@ +package portforward + +import ( + "bufio" + "fmt" + "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "io" + "os/exec" + "syscall" + "time" +) + +type portForwardTask struct { + config PortForwardConfig + log *logrus.Entry + name string + daemon *PortForwardDaemon + t *tomb.Tomb + cmd *exec.Cmd +} + +func newPortForward(p *PortForwardDaemon, name string, config PortForwardConfig) (*portForwardTask, error) { + + return &portForwardTask{ + name: name, + config: config, + daemon: p, + log: p.log.WithField("task", name), + }, nil + +} + +func (t *portForwardTask) updateState(mutator func(state *PortForwardState)) { + t.daemon.updatePFState(t.name, mutator) +} + +func (t *portForwardTask) Start() { + + if t.t != nil && t.t.Alive() { + t.log.Warn("Already started.") + } + + ctx := t.daemon.t.Context(nil) + t.t, _ = tomb.WithContext(ctx) + + t.t.Go(func() (err error) { + + defer func() { err = t.daemon.recordPanic() }() + + maxTimeout := 60 * time.Second + minTimeout := 1 * time.Second + currentTimeout := minTimeout + + for t.t.Alive() { + + lastAttempt := time.Now() + + args := t.config.Args + + if len(args) == 0 { + args = []string{"port-forward"} + + if t.config.Namespace != "" { + args = append(args, "--namespace", t.config.Namespace) + } + + if t.config.KubeContext != "" { + args = append(args, "--context", t.config.KubeContext) + } + + if t.config.KubeConfig != "" { + args = append(args, "--kubeconfig", t.config.KubeConfig) + } + + if t.config.TargetType != "" && t.config.TargetName != "" { + args = append(args, t.config.TargetType+"/"+t.config.TargetName) + } else { + args = append(args, t.config.TargetName) + } + } + + args = append(args, fmt.Sprintf("%d:%d", t.config.LocalPort, t.config.TargetPort)) + + t.log.Infof("Starting command with args %v", args) + + cmd := exec.Command("kubectl", args...) + + stderr, _ := cmd.StderrPipe() + go pipeToLog(fmt.Sprintf("STDERR: %s: ", t.name), t.log, stderr) + + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pdeathsig: syscall.SIGKILL, + } + + cmdErr := cmd.Start() + if cmdErr != nil { + t.log.WithError(cmdErr).Errorf("Command with args %v couldn't start.", args) + t.updateState(func(state *PortForwardState) { + state.State = "StartFailed" + state.Error = cmdErr.Error() + }) + } else { + + doneCh := make(chan struct{}) + + t.log.Infof("Command with args %v started.", args) + go func() { + waitErr := cmd.Wait() + t.log.WithError(waitErr).Infof("Command with args %v stopped.", args) + close(doneCh) + }() + + t.updateState(func(state *PortForwardState) { + state.State = "Running" + state.PID = cmd.Process.Pid + state.Error = "" + }) + + select { + case <-t.t.Dying(): + t.log.Infof("Task stop requested, stopping command...") + _ = cmd.Process.Kill() + + select { + case <-time.After(5 * time.Second): + t.log.Warnf("Task stop requested, but command with pid %d didn't stop, you may need to stop it.", cmd.Process.Pid) + case <-doneCh: + t.updateState(func(state *PortForwardState) { + state.State = "Stopped" + state.Error = "" + state.PID = 0 + }) + } + case <-doneCh: + t.log.Infof("Task stopped unexpectedly, will restart.") + } + } + + if t.t.Alive() { + + attemptDuration := time.Now().Sub(lastAttempt) + + if attemptDuration > maxTimeout { + t.log.Infof("Attempt lasted %v, which is longer than max timeout %s, resetting timeout to %s", attemptDuration, maxTimeout, minTimeout) + currentTimeout = minTimeout + } else { + currentTimeout = currentTimeout * 2 + if currentTimeout > maxTimeout { + currentTimeout = maxTimeout + } + t.log.Infof("Doubling timeout to %v", currentTimeout) + } + } + + select { + case <-t.t.Dying(): + t.log.Infof("Task stopped as requested.") + t.updateState(func(state *PortForwardState) { + state.State = "Stopped" + state.Error = "" + }) + return nil + case <-time.After(currentTimeout): + } + } + + t.updateState(func(state *PortForwardState) { + state.State = "Stopped" + state.Error = "" + }) + return nil + }) + + // Keep parent tomb alive until our tomb is dead. + t.daemon.t.Go(func() error { + <-t.t.Dead() + t.log.Infof("Task goroutine is dead.") + return nil + }) +} + +func (t *portForwardTask) Stop() { + + if t.t != nil { + t.t.Kill(nil) + <-t.t.Dead() + } + + t.t = nil +} + +func pipeToLog(prefix string, log *logrus.Entry, reader io.Reader) { + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + log.Info(prefix + scanner.Text()) + } +} + +