diff --git a/registry/job.go b/registry/job.go index 8eeda2280..f79dbbbe5 100644 --- a/registry/job.go +++ b/registry/job.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" "path" + "reflect" "sort" + "strings" etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client" @@ -293,6 +295,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc } +// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and +// instantiates and returns a representative *job.Unit, transitively fetching +// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(), +// this does not use not.Value itself as a hash key, but it uses the last part +// of node.Key for the hash key. +func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) { + var err error + var jm jobModel + + if err = unmarshal(node.Value, &jm); err != nil { + return nil, err + } + + parts := strings.Split(node.Key, "/") + if len(parts) == 0 { + log.Errorf("key '%v' doesn't have enough parts", node.Key) + return nil, nil + } + stringHash := parts[len(parts)-1] + + hashKey, err := unit.HashFromHexString(stringHash) + if err != nil { + log.Errorf("cannot convert key string into hash. %v", err) + return nil, nil + } + + var unit *unit.UnitFile + + unit = unitHashLookupFunc(hashKey) + if unit == nil { + log.Warningf("No Unit found in Registry for Job(%s)", jm.Name) + return nil, nil + } + + ju := &job.Unit{ + Name: jm.Name, + Unit: *unit, + } + return ju, nil + +} + // jobModel is used for serializing and deserializing Jobs stored in the Registry type jobModel struct { Name string @@ -306,7 +350,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { opts := &etcd.DeleteOptions{ Recursive: true, } - _, err := r.kAPI.Delete(r.ctx(), key, opts) + u, err := r.Unit(name) + if err != nil { + log.Warningf("r.Unit error, name=%s\n", name) + u = nil + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) if err != nil { if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { err = errors.New("job does not exist") @@ -316,9 +365,81 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { } // TODO(jonboulle): add unit reference counting and actually destroying Units + + // Delete unit from the etcd registry + if u != nil { + var unitMatch bool + // check if the unit is really valid. If not, return err. + key = r.hashedUnitPath(u.Unit.Hash()) + if unitMatch, err = r.checkUnitMatch(u); err != nil { + err = errors.New("cannot check for the unit is valid") + return err + } + if unitMatch { + err = errors.New("Invalid unit in the etcd registry. Skip deleting the unit.") + return err + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = errors.New("cannot delete unit from the registry. Unit not found.") + } + return err + } + } return nil } +// checkUnitMatch() determines if the given unit is a really valid entry in the +// etcd registry, by querying the entries via RPC. +func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) { + key := path.Join(r.keyPrefix, unitPrefix) + opts := &etcd.GetOptions{ + Recursive: true, + } + res, err := r.kAPI.Get(r.ctx(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = nil + } + return false, err + } + + return r.checkUnitSiblings(unitDel, res.Node) +} + +// checkUnitSiblings() returns true if there's a duplicated entry already in +// the etcd registry. +func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) { + uhashKey := dir.Key + unitDelName := r.hashedUnitPath(unitDel.Unit.Hash()) + for _, uhashNode := range dir.Nodes { + newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash) + if err != nil { + log.Errorf("cannot get unit. err: %v", err) + return false, err + } + if newUnit == nil { + log.Debugf("unable to parse Unit in Registry at key %s", uhashKey) + continue + } + + if unitDelName == uhashNode.Key { + log.Debugf("skipping the entry itself.") + continue + } + + if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) { + // matched. so this unit has a duplicated entry, so return + log.Debugf("won't erase this key, as a duplicated entry is found.") + return true, nil + } + } + + log.Debugf("no matching entry, so it can be removed.") + return false, nil +} + // CreateUnit attempts to store a Unit and its associated unit file in the registry func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { if err := r.storeOrGetUnitFile(u.Unit); err != nil {