From 55b2135372a50ba5e8f0a9d43e218d2025ed3e95 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Fri, 15 May 2015 13:32:32 -0400 Subject: [PATCH 1/4] Get rid of Connect - One less thing to call. It doesn't need to be called outside the package, and can be setup in the constructor - Remove unused fields in config.Store --- cmd/commander/main.go | 3 +-- config/backend.go | 6 +++--- config/memory.go | 4 ++-- config/redis.go | 8 +++----- config/store.go | 23 ++++++++--------------- galaxy.go | 3 +-- 6 files changed, 18 insertions(+), 29 deletions(-) diff --git a/cmd/commander/main.go b/cmd/commander/main.go index 9d370ae..9056ecb 100644 --- a/cmd/commander/main.go +++ b/cmd/commander/main.go @@ -50,8 +50,7 @@ func initOrDie() { log.Fatalf("ERROR: Registry URL not specified. Use '-registry redis://127.0.0.1:6379' or set 'GALAXY_REGISTRY_URL'") } - configStore = config.NewStore(config.DefaultTTL) - configStore.Connect(registryURL) + configStore = config.NewStore(config.DefaultTTL, registryURL) serviceRuntime = runtime.NewServiceRuntime(configStore, dns, hostIP) diff --git a/config/backend.go b/config/backend.go index 43b0826..0b5c9be 100644 --- a/config/backend.go +++ b/config/backend.go @@ -29,9 +29,6 @@ type Backend interface { Subscribe(key string) chan string Notify(key, value string) (int, error) - Connect() - Reconnect() - // TODO: still merging these backends // these are brought in from the RegistryBackend // Keys @@ -43,4 +40,7 @@ type Backend interface { // Maps Set(key, field string, value string) (string, error) Get(key, field string) (string, error) + + connect() + reconnect() } diff --git a/config/memory.go b/config/memory.go index 4aff8f1..d013298 100644 --- a/config/memory.go +++ b/config/memory.go @@ -199,10 +199,10 @@ func (r *MemoryBackend) ListEnvs() ([]string, error) { return p, nil } -func (r *MemoryBackend) Connect() { +func (r *MemoryBackend) connect() { } -func (r *MemoryBackend) Reconnect() { +func (r *MemoryBackend) reconnect() { } func (r *MemoryBackend) Keys(key string) ([]string, error) { diff --git a/config/redis.go b/config/redis.go index f6067e8..dbcba33 100644 --- a/config/redis.go +++ b/config/redis.go @@ -327,7 +327,7 @@ func (r *RedisBackend) testOnBorrow(c redis.Conn, t time.Time) error { return err } -func (r *RedisBackend) Connect() { +func (r *RedisBackend) connect() { r.redisPool = redis.Pool{ MaxIdle: 1, IdleTimeout: 120 * time.Second, @@ -336,10 +336,8 @@ func (r *RedisBackend) Connect() { } } -func (r *RedisBackend) Reconnect() { - r.redisPool.Close() - r.Connect() -} +// not needed with a redis.Pool +func (r *RedisBackend) reconnect() {} func (r *RedisBackend) Keys(key string) ([]string, error) { conn := r.redisPool.Get() diff --git a/config/store.go b/config/store.go index fdb6908..7ff2b25 100644 --- a/config/store.go +++ b/config/store.go @@ -24,26 +24,17 @@ type HostInfo struct { } type Store struct { - Backend Backend - Hostname string - TTL uint64 - OutputBuffer *utils.OutputBuffer - pollCh chan bool - registryURL string + Backend Backend + TTL uint64 + pollCh chan bool } -func NewStore(ttl uint64) *Store { - return &Store{ +func NewStore(ttl uint64, registryURL string) *Store { + s := &Store{ TTL: ttl, pollCh: make(chan bool), } -} - -// Build the Redis Pool -func (s *Store) Connect(registryURL string) { - - s.registryURL = registryURL u, err := url.Parse(registryURL) if err != nil { log.Fatalf("ERROR: Unable to parse %s", err) @@ -53,10 +44,12 @@ func (s *Store) Connect(registryURL string) { s.Backend = &RedisBackend{ RedisHost: u.Host, } - s.Backend.Connect() + s.Backend.connect() } else { log.Fatalf("ERROR: Unsupported registry backend: %s", u) } + + return s } func (s *Store) PoolExists(env, pool string) (bool, error) { diff --git a/galaxy.go b/galaxy.go index 96ed538..f4ccdbf 100644 --- a/galaxy.go +++ b/galaxy.go @@ -34,8 +34,7 @@ var config struct { } func initStore(c *cli.Context) { - configStore = gconfig.NewStore(uint64(c.Int("ttl"))) - configStore.Connect(utils.GalaxyRedisHost(c)) + configStore = gconfig.NewStore(uint64(c.Int("ttl")), utils.GalaxyRedisHost(c)) } // ensure the registry as a redis host, but only once From 8a867f22a17af31487afa0eaf839c215f963b496 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Fri, 15 May 2015 16:55:05 -0400 Subject: [PATCH 2/4] Fix panic from malformed env variable - can't index -1 in go --- config/store.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/config/store.go b/config/store.go index 7ff2b25..9628775 100644 --- a/config/store.go +++ b/config/store.go @@ -449,8 +449,10 @@ func (s *Store) EnvFor(container *docker.Container) map[string]string { env := map[string]string{} for _, item := range container.Config.Env { sep := strings.Index(item, "=") - k := item[0:sep] - v := item[sep+1:] + if sep < 0 { + continue + } + k, v := item[0:sep], item[sep+1:] env[k] = v } return env From 1c64291d3d9ebf8b94a67f202a7af16874c3c7f3 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Mon, 18 May 2015 10:36:42 -0400 Subject: [PATCH 3/4] Move ServiceRegistration into its own file - Separate service registration, so we can begin to move non-service related info back to the app config. - newServiceRegistartion doesn't need to be a method of *Store. --- cmd/commander/main.go | 1 + config/registration.go | 94 ++++++++++++++++++++++++++++++++++++++++++ config/store.go | 88 +-------------------------------------- discovery/discovery.go | 1 + 4 files changed, 97 insertions(+), 87 deletions(-) create mode 100644 config/registration.go diff --git a/cmd/commander/main.go b/cmd/commander/main.go index 9056ecb..2f9cf28 100644 --- a/cmd/commander/main.go +++ b/cmd/commander/main.go @@ -666,6 +666,7 @@ func main() { } break case "app:status": + // FIXME: undocumented statusFs := flag.NewFlagSet("app:status", flag.ExitOnError) statusFs.Usage = func() { diff --git a/config/registration.go b/config/registration.go new file mode 100644 index 0000000..fcbf6e7 --- /dev/null +++ b/config/registration.go @@ -0,0 +1,94 @@ +package config + +import ( + "fmt" + "sort" + "time" + + "github.com/fsouza/go-dockerclient" +) + +func newServiceRegistration(container *docker.Container, hostIP, galaxyPort string) *ServiceRegistration { + //FIXME: We're using the first found port and assuming it's tcp. + //How should we handle a service that exposes multiple ports + //as well as tcp vs udp ports. + var externalPort, internalPort string + + // sort the port bindings by internal port number so multiple ports are assigned deterministically + // (docker.Port is a string with a Port method) + cPorts := container.NetworkSettings.Ports + allPorts := []string{} + for p, _ := range cPorts { + allPorts = append(allPorts, string(p)) + } + sort.Strings(allPorts) + + for _, k := range allPorts { + v := cPorts[docker.Port(k)] + if len(v) > 0 { + externalPort = v[0].HostPort + internalPort = docker.Port(k).Port() + // Look for a match to GALAXY_PORT if we have multiple ports to + // choose from. (don't require this, or we may break existing services) + if len(allPorts) > 1 && internalPort == galaxyPort { + break + } + } + } + + serviceRegistration := ServiceRegistration{ + ContainerName: container.Name, + ContainerID: container.ID, + StartedAt: container.Created, + Image: container.Config.Image, + Port: galaxyPort, + } + + if externalPort != "" && internalPort != "" { + serviceRegistration.ExternalIP = hostIP + serviceRegistration.InternalIP = container.NetworkSettings.IPAddress + serviceRegistration.ExternalPort = externalPort + serviceRegistration.InternalPort = internalPort + } + return &serviceRegistration +} + +type ServiceRegistration struct { + Name string `json:"NAME,omitempty"` + ExternalIP string `json:"EXTERNAL_IP,omitempty"` + ExternalPort string `json:"EXTERNAL_PORT,omitempty"` + InternalIP string `json:"INTERNAL_IP,omitempty"` + InternalPort string `json:"INTERNAL_PORT,omitempty"` + ContainerID string `json:"CONTAINER_ID"` + ContainerName string `json:"CONTAINER_NAME"` + Image string `json:"IMAGE,omitempty"` + ImageId string `json:"IMAGE_ID,omitempty"` + StartedAt time.Time `json:"STARTED_AT"` + Expires time.Time `json:"-"` + Path string `json:"-"` + VirtualHosts []string `json:"VIRTUAL_HOSTS"` + Port string `json:"PORT"` + ErrorPages map[string]string `json:"ERROR_PAGES,omitempty"` +} + +func (s *ServiceRegistration) Equals(other ServiceRegistration) bool { + return s.ExternalIP == other.ExternalIP && + s.ExternalPort == other.ExternalPort && + s.InternalIP == other.InternalIP && + s.InternalPort == other.InternalPort +} + +func (s *ServiceRegistration) addr(ip, port string) string { + if ip != "" && port != "" { + return fmt.Sprint(ip, ":", port) + } + return "" + +} +func (s *ServiceRegistration) ExternalAddr() string { + return s.addr(s.ExternalIP, s.ExternalPort) +} + +func (s *ServiceRegistration) InternalAddr() string { + return s.addr(s.InternalIP, s.InternalPort) +} diff --git a/config/store.go b/config/store.go index 9628775..552247f 100644 --- a/config/store.go +++ b/config/store.go @@ -6,7 +6,6 @@ import ( "fmt" "net/url" "path" - "sort" "strings" "time" @@ -236,51 +235,6 @@ func (s *Store) DeleteHost(env, pool string, host HostInfo) error { return s.Backend.DeleteHost(env, pool, host) } -func (s *Store) newServiceRegistration(container *docker.Container, hostIP, galaxyPort string) *ServiceRegistration { - //FIXME: We're using the first found port and assuming it's tcp. - //How should we handle a service that exposes multiple ports - //as well as tcp vs udp ports. - var externalPort, internalPort string - - // sort the port bindings by internal port number so multiple ports are assigned deterministically - // (docker.Port is a string with a Port method) - cPorts := container.NetworkSettings.Ports - allPorts := []string{} - for p, _ := range cPorts { - allPorts = append(allPorts, string(p)) - } - sort.Strings(allPorts) - - for _, k := range allPorts { - v := cPorts[docker.Port(k)] - if len(v) > 0 { - externalPort = v[0].HostPort - internalPort = docker.Port(k).Port() - // Look for a match to GALAXY_PORT if we have multiple ports to - // choose from. (don't require this, or we may break existing services) - if len(allPorts) > 1 && internalPort == galaxyPort { - break - } - } - } - - serviceRegistration := ServiceRegistration{ - ContainerName: container.Name, - ContainerID: container.ID, - StartedAt: container.Created, - Image: container.Config.Image, - Port: galaxyPort, - } - - if externalPort != "" && internalPort != "" { - serviceRegistration.ExternalIP = hostIP - serviceRegistration.InternalIP = container.NetworkSettings.IPAddress - serviceRegistration.ExternalPort = externalPort - serviceRegistration.InternalPort = internalPort - } - return &serviceRegistration -} - func (s *Store) RegisterService(env, pool, hostIP string, container *docker.Container) (*ServiceRegistration, error) { environment := s.EnvFor(container) @@ -291,7 +245,7 @@ func (s *Store) RegisterService(env, pool, hostIP string, container *docker.Cont registrationPath := path.Join(env, pool, "hosts", hostIP, name, container.ID[0:12]) - serviceRegistration := s.newServiceRegistration(container, hostIP, environment["GALAXY_PORT"]) + serviceRegistration := newServiceRegistration(container, hostIP, environment["GALAXY_PORT"]) serviceRegistration.Name = name serviceRegistration.ImageId = container.Config.Image @@ -457,43 +411,3 @@ func (s *Store) EnvFor(container *docker.Container) map[string]string { } return env } - -type ServiceRegistration struct { - Name string `json:"NAME,omitempty"` - ExternalIP string `json:"EXTERNAL_IP,omitempty"` - ExternalPort string `json:"EXTERNAL_PORT,omitempty"` - InternalIP string `json:"INTERNAL_IP,omitempty"` - InternalPort string `json:"INTERNAL_PORT,omitempty"` - ContainerID string `json:"CONTAINER_ID"` - ContainerName string `json:"CONTAINER_NAME"` - Image string `json:"IMAGE,omitempty"` - ImageId string `json:"IMAGE_ID,omitempty"` - StartedAt time.Time `json:"STARTED_AT"` - Expires time.Time `json:"-"` - Path string `json:"-"` - VirtualHosts []string `json:"VIRTUAL_HOSTS"` - Port string `json:"PORT"` - ErrorPages map[string]string `json:"ERROR_PAGES,omitempty"` -} - -func (s *ServiceRegistration) Equals(other ServiceRegistration) bool { - return s.ExternalIP == other.ExternalIP && - s.ExternalPort == other.ExternalPort && - s.InternalIP == other.InternalIP && - s.InternalPort == other.InternalPort -} - -func (s *ServiceRegistration) addr(ip, port string) string { - if ip != "" && port != "" { - return fmt.Sprint(ip, ":", port) - } - return "" - -} -func (s *ServiceRegistration) ExternalAddr() string { - return s.addr(s.ExternalIP, s.ExternalPort) -} - -func (s *ServiceRegistration) InternalAddr() string { - return s.addr(s.InternalIP, s.InternalPort) -} diff --git a/discovery/discovery.go b/discovery/discovery.go index 5313ec9..590fa40 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -21,6 +21,7 @@ func Status(serviceRuntime *runtime.ServiceRuntime, configStore *config.Store, e panic(err) } + //FIXME: addresses, port, and expires missing in output columns := []string{ "APP | CONTAINER ID | IMAGE | EXTERNAL | INTERNAL | PORT | CREATED | EXPIRES"} From b8d7290efd758f54070bc8b40250f4af784a4189 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Mon, 18 May 2015 11:23:08 -0400 Subject: [PATCH 4/4] more config cleanup - rename some more Store receivers from "r" to "s" - Move the global restartChan into Store --- config/notify.go | 59 +++++++++++++++++++++++------------------------- config/store.go | 12 ++++++---- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/config/notify.go b/config/notify.go index 65d491d..cf5a38c 100644 --- a/config/notify.go +++ b/config/notify.go @@ -13,18 +13,16 @@ type ConfigChange struct { Error error } -var restartChan chan *ConfigChange - -func (r *Store) CheckForChangesNow() { - r.pollCh <- true +func (s *Store) CheckForChangesNow() { + s.pollCh <- true } -func (r *Store) checkForChanges(env string) { +func (s *Store) checkForChanges(env string) { lastVersion := make(map[string]int64) for { - appCfg, err := r.ListApps(env) + appCfg, err := s.ListApps(env) if err != nil { - restartChan <- &ConfigChange{ + s.restartChan <- &ConfigChange{ Error: err, } time.Sleep(5 * time.Second) @@ -39,10 +37,10 @@ func (r *Store) checkForChanges(env string) { } for { - <-r.pollCh - appCfg, err := r.ListApps(env) + <-s.pollCh + appCfg, err := s.ListApps(env) if err != nil { - restartChan <- &ConfigChange{ + s.restartChan <- &ConfigChange{ Error: err, } continue @@ -53,7 +51,7 @@ func (r *Store) checkForChanges(env string) { log.Printf("%s changed from %d to %d", changedConfig.Name, lastVersion[changedConfig.Name()], changedConfig.ID()) lastVersion[changedConfig.Name()] = changedConfig.ID() - restartChan <- &ConfigChange{ + s.restartChan <- &ConfigChange{ AppConfig: changeCopy, } } @@ -61,7 +59,7 @@ func (r *Store) checkForChanges(env string) { } } -func (r *Store) checkForChangePeriodically(stop chan struct{}) { +func (s *Store) checkForChangePeriodically(stop chan struct{}) { // TODO: default polling interval ticker := time.NewTicker(10 * time.Second) for { @@ -70,66 +68,65 @@ func (r *Store) checkForChangePeriodically(stop chan struct{}) { ticker.Stop() return case <-ticker.C: - r.CheckForChangesNow() + s.CheckForChangesNow() } } } -func (r *Store) restartApp(app, env string) { - appCfg, err := r.GetApp(app, env) +func (s *Store) restartApp(app, env string) { + appCfg, err := s.GetApp(app, env) if err != nil { - restartChan <- &ConfigChange{ + s.restartChan <- &ConfigChange{ Error: err, } return } - restartChan <- &ConfigChange{ + s.restartChan <- &ConfigChange{ Restart: true, AppConfig: appCfg, } } -func (r *Store) NotifyRestart(app, env string) error { +func (s *Store) NotifyRestart(app, env string) error { // TODO: received count ignored, use it somehow? - _, err := r.Backend.Notify(fmt.Sprintf("galaxy-%s", env), fmt.Sprintf("restart %s", app)) + _, err := s.Backend.Notify(fmt.Sprintf("galaxy-%s", env), fmt.Sprintf("restart %s", app)) if err != nil { return err } return nil } -func (r *Store) NotifyEnvChanged(env string) error { +func (s *Store) NotifyEnvChanged(env string) error { // TODO: received count ignored, use it somehow? - _, err := r.Backend.Notify(fmt.Sprintf("galaxy-%s", env), "config") + _, err := s.Backend.Notify(fmt.Sprintf("galaxy-%s", env), "config") if err != nil { return err } return nil } -func (r *Store) subscribeChanges(env string) { +func (s *Store) subscribeChanges(env string) { - msgs := r.Backend.Subscribe(fmt.Sprintf("galaxy-%s", env)) + msgs := s.Backend.Subscribe(fmt.Sprintf("galaxy-%s", env)) for { msg := <-msgs if msg == "config" { - r.CheckForChangesNow() + s.CheckForChangesNow() } else if strings.HasPrefix(msg, "restart") { parts := strings.Split(msg, " ") app := parts[1] - r.restartApp(app, env) + s.restartApp(app, env) } else { log.Printf("Ignoring notification: %s\n", msg) } } } -func (r *Store) Watch(env string, stop chan struct{}) chan *ConfigChange { - restartChan = make(chan *ConfigChange, 10) - go r.checkForChanges(env) - go r.checkForChangePeriodically(stop) - go r.subscribeChanges(env) - return restartChan +func (s *Store) Watch(env string, stop chan struct{}) chan *ConfigChange { + go s.checkForChanges(env) + go s.checkForChangePeriodically(stop) + go s.subscribeChanges(env) + return s.restartChan } diff --git a/config/store.go b/config/store.go index 552247f..c52ce20 100644 --- a/config/store.go +++ b/config/store.go @@ -23,15 +23,17 @@ type HostInfo struct { } type Store struct { - Backend Backend - TTL uint64 - pollCh chan bool + Backend Backend + TTL uint64 + pollCh chan bool + restartChan chan *ConfigChange } func NewStore(ttl uint64, registryURL string) *Store { s := &Store{ - TTL: ttl, - pollCh: make(chan bool), + TTL: ttl, + pollCh: make(chan bool), + restartChan: make(chan *ConfigChange, 10), } u, err := url.Parse(registryURL)