Skip to content

Commit

Permalink
Merge pull request #248 from litl/jbardin-config
Browse files Browse the repository at this point in the history
More refactoring
  • Loading branch information
James Bardin committed May 20, 2015
2 parents 6cb0cce + b8d7290 commit d78ec3c
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 151 deletions.
4 changes: 2 additions & 2 deletions cmd/commander/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func initOrDie() {
log.Fatalf("ERROR: Registry URL not specified. Use '-registry redis://127.0.0.1:6379' or set 'GALAXY_REGISTRY_URL'")
}

configStore = config.NewStore(config.DefaultTTL)
configStore.Connect(registryURL)
configStore = config.NewStore(config.DefaultTTL, registryURL)

serviceRuntime = runtime.NewServiceRuntime(configStore, dns, hostIP)

Expand Down Expand Up @@ -667,6 +666,7 @@ func main() {
}
break
case "app:status":
// FIXME: undocumented

statusFs := flag.NewFlagSet("app:status", flag.ExitOnError)
statusFs.Usage = func() {
Expand Down
6 changes: 3 additions & 3 deletions config/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ type Backend interface {
Subscribe(key string) chan string
Notify(key, value string) (int, error)

Connect()
Reconnect()

// TODO: still merging these backends
// these are brought in from the RegistryBackend
// Keys
Expand All @@ -43,4 +40,7 @@ type Backend interface {
// Maps
Set(key, field string, value string) (string, error)
Get(key, field string) (string, error)

connect()
reconnect()
}
4 changes: 2 additions & 2 deletions config/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (r *MemoryBackend) ListEnvs() ([]string, error) {
return p, nil
}

func (r *MemoryBackend) Connect() {
func (r *MemoryBackend) connect() {
}

func (r *MemoryBackend) Reconnect() {
func (r *MemoryBackend) reconnect() {
}

func (r *MemoryBackend) Keys(key string) ([]string, error) {
Expand Down
59 changes: 28 additions & 31 deletions config/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@ type ConfigChange struct {
Error error
}

var restartChan chan *ConfigChange

func (r *Store) CheckForChangesNow() {
r.pollCh <- true
func (s *Store) CheckForChangesNow() {
s.pollCh <- true
}

func (r *Store) checkForChanges(env string) {
func (s *Store) checkForChanges(env string) {
lastVersion := make(map[string]int64)
for {
appCfg, err := r.ListApps(env)
appCfg, err := s.ListApps(env)
if err != nil {
restartChan <- &ConfigChange{
s.restartChan <- &ConfigChange{
Error: err,
}
time.Sleep(5 * time.Second)
Expand All @@ -39,10 +37,10 @@ func (r *Store) checkForChanges(env string) {
}

for {
<-r.pollCh
appCfg, err := r.ListApps(env)
<-s.pollCh
appCfg, err := s.ListApps(env)
if err != nil {
restartChan <- &ConfigChange{
s.restartChan <- &ConfigChange{
Error: err,
}
continue
Expand All @@ -53,15 +51,15 @@ func (r *Store) checkForChanges(env string) {
log.Printf("%s changed from %d to %d", changedConfig.Name,
lastVersion[changedConfig.Name()], changedConfig.ID())
lastVersion[changedConfig.Name()] = changedConfig.ID()
restartChan <- &ConfigChange{
s.restartChan <- &ConfigChange{
AppConfig: changeCopy,
}
}
}
}
}

func (r *Store) checkForChangePeriodically(stop chan struct{}) {
func (s *Store) checkForChangePeriodically(stop chan struct{}) {
// TODO: default polling interval
ticker := time.NewTicker(10 * time.Second)
for {
Expand All @@ -70,66 +68,65 @@ func (r *Store) checkForChangePeriodically(stop chan struct{}) {
ticker.Stop()
return
case <-ticker.C:
r.CheckForChangesNow()
s.CheckForChangesNow()
}
}
}

func (r *Store) restartApp(app, env string) {
appCfg, err := r.GetApp(app, env)
func (s *Store) restartApp(app, env string) {
appCfg, err := s.GetApp(app, env)
if err != nil {
restartChan <- &ConfigChange{
s.restartChan <- &ConfigChange{
Error: err,
}
return
}

restartChan <- &ConfigChange{
s.restartChan <- &ConfigChange{
Restart: true,
AppConfig: appCfg,
}
}

func (r *Store) NotifyRestart(app, env string) error {
func (s *Store) NotifyRestart(app, env string) error {
// TODO: received count ignored, use it somehow?
_, err := r.Backend.Notify(fmt.Sprintf("galaxy-%s", env), fmt.Sprintf("restart %s", app))
_, err := s.Backend.Notify(fmt.Sprintf("galaxy-%s", env), fmt.Sprintf("restart %s", app))
if err != nil {
return err
}
return nil
}

func (r *Store) NotifyEnvChanged(env string) error {
func (s *Store) NotifyEnvChanged(env string) error {
// TODO: received count ignored, use it somehow?
_, err := r.Backend.Notify(fmt.Sprintf("galaxy-%s", env), "config")
_, err := s.Backend.Notify(fmt.Sprintf("galaxy-%s", env), "config")
if err != nil {
return err
}
return nil
}

func (r *Store) subscribeChanges(env string) {
func (s *Store) subscribeChanges(env string) {

msgs := r.Backend.Subscribe(fmt.Sprintf("galaxy-%s", env))
msgs := s.Backend.Subscribe(fmt.Sprintf("galaxy-%s", env))
for {

msg := <-msgs
if msg == "config" {
r.CheckForChangesNow()
s.CheckForChangesNow()
} else if strings.HasPrefix(msg, "restart") {
parts := strings.Split(msg, " ")
app := parts[1]
r.restartApp(app, env)
s.restartApp(app, env)
} else {
log.Printf("Ignoring notification: %s\n", msg)
}
}
}

func (r *Store) Watch(env string, stop chan struct{}) chan *ConfigChange {
restartChan = make(chan *ConfigChange, 10)
go r.checkForChanges(env)
go r.checkForChangePeriodically(stop)
go r.subscribeChanges(env)
return restartChan
func (s *Store) Watch(env string, stop chan struct{}) chan *ConfigChange {
go s.checkForChanges(env)
go s.checkForChangePeriodically(stop)
go s.subscribeChanges(env)
return s.restartChan
}
8 changes: 3 additions & 5 deletions config/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (r *RedisBackend) testOnBorrow(c redis.Conn, t time.Time) error {
return err
}

func (r *RedisBackend) Connect() {
func (r *RedisBackend) connect() {
r.redisPool = redis.Pool{
MaxIdle: 1,
IdleTimeout: 120 * time.Second,
Expand All @@ -336,10 +336,8 @@ func (r *RedisBackend) Connect() {
}
}

func (r *RedisBackend) Reconnect() {
r.redisPool.Close()
r.Connect()
}
// not needed with a redis.Pool
func (r *RedisBackend) reconnect() {}

func (r *RedisBackend) Keys(key string) ([]string, error) {
conn := r.redisPool.Get()
Expand Down
94 changes: 94 additions & 0 deletions config/registration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package config

import (
"fmt"
"sort"
"time"

"github.com/fsouza/go-dockerclient"
)

func newServiceRegistration(container *docker.Container, hostIP, galaxyPort string) *ServiceRegistration {
//FIXME: We're using the first found port and assuming it's tcp.
//How should we handle a service that exposes multiple ports
//as well as tcp vs udp ports.
var externalPort, internalPort string

// sort the port bindings by internal port number so multiple ports are assigned deterministically
// (docker.Port is a string with a Port method)
cPorts := container.NetworkSettings.Ports
allPorts := []string{}
for p, _ := range cPorts {
allPorts = append(allPorts, string(p))
}
sort.Strings(allPorts)

for _, k := range allPorts {
v := cPorts[docker.Port(k)]
if len(v) > 0 {
externalPort = v[0].HostPort
internalPort = docker.Port(k).Port()
// Look for a match to GALAXY_PORT if we have multiple ports to
// choose from. (don't require this, or we may break existing services)
if len(allPorts) > 1 && internalPort == galaxyPort {
break
}
}
}

serviceRegistration := ServiceRegistration{
ContainerName: container.Name,
ContainerID: container.ID,
StartedAt: container.Created,
Image: container.Config.Image,
Port: galaxyPort,
}

if externalPort != "" && internalPort != "" {
serviceRegistration.ExternalIP = hostIP
serviceRegistration.InternalIP = container.NetworkSettings.IPAddress
serviceRegistration.ExternalPort = externalPort
serviceRegistration.InternalPort = internalPort
}
return &serviceRegistration
}

type ServiceRegistration struct {
Name string `json:"NAME,omitempty"`
ExternalIP string `json:"EXTERNAL_IP,omitempty"`
ExternalPort string `json:"EXTERNAL_PORT,omitempty"`
InternalIP string `json:"INTERNAL_IP,omitempty"`
InternalPort string `json:"INTERNAL_PORT,omitempty"`
ContainerID string `json:"CONTAINER_ID"`
ContainerName string `json:"CONTAINER_NAME"`
Image string `json:"IMAGE,omitempty"`
ImageId string `json:"IMAGE_ID,omitempty"`
StartedAt time.Time `json:"STARTED_AT"`
Expires time.Time `json:"-"`
Path string `json:"-"`
VirtualHosts []string `json:"VIRTUAL_HOSTS"`
Port string `json:"PORT"`
ErrorPages map[string]string `json:"ERROR_PAGES,omitempty"`
}

func (s *ServiceRegistration) Equals(other ServiceRegistration) bool {
return s.ExternalIP == other.ExternalIP &&
s.ExternalPort == other.ExternalPort &&
s.InternalIP == other.InternalIP &&
s.InternalPort == other.InternalPort
}

func (s *ServiceRegistration) addr(ip, port string) string {
if ip != "" && port != "" {
return fmt.Sprint(ip, ":", port)
}
return ""

}
func (s *ServiceRegistration) ExternalAddr() string {
return s.addr(s.ExternalIP, s.ExternalPort)
}

func (s *ServiceRegistration) InternalAddr() string {
return s.addr(s.InternalIP, s.InternalPort)
}
Loading

0 comments on commit d78ec3c

Please sign in to comment.