From fc5e90b7c84709f9054fc1dfdf82612792af0b7b Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Fri, 18 Mar 2016 13:04:47 +0100 Subject: [PATCH] registry: remove units from etcd registry upon DestroyUnit() So far each command "fleetctl destroy unit" has removed job entries from the etcd registry, under /_coreos.com/fleet/job. But it has not removed the corresponding unit file, under /_coreos.com/fleet/unit. As a result, fleet left lots of garbage in the etcd registry, which users had to clean up manually. So this patch actually deletes the unit contents from the etcd registry when DestroyUnit() gets called. To avoid potential hash collisions, it first fetches the list of units from the registry to check whether there is any duplicated entry. fleetd deletes the unit from the registry only if no duplicated unit is found. Fixes: https://github.com/coreos/fleet/issues/1456 Fixes: https://github.com/coreos/fleet/issues/1290 Reference: https://github.com/coreos/fleet/pull/1291 --- registry/job.go | 123 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 1 deletion(-) diff --git a/registry/job.go b/registry/job.go index 152962df6..1d0d057f6 100644 --- a/registry/job.go +++ b/registry/job.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" "path" + "reflect" "sort" + "strings" etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client" @@ -293,6 +295,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc } +// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and +// instantiates and returns a representative *job.Unit, transitively fetching +// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(), +// this does not use node.Value itself as a hash key, but it uses the last part +// of node.Key for the hash key. 
+func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) { + var err error + var jm jobModel + + if err = unmarshal(node.Value, &jm); err != nil { + return nil, err + } + + parts := strings.Split(node.Key, "/") + if len(parts) == 0 { + log.Errorf("key '%v' doesn't have enough parts", node.Key) + return nil, nil + } + stringHash := parts[len(parts)-1] + + hashKey, err := unit.HashFromHexString(stringHash) + if err != nil { + log.Errorf("cannot convert key string into hash. %v", err) + return nil, nil + } + + var unit *unit.UnitFile + + unit = unitHashLookupFunc(hashKey) + if unit == nil { + log.Warningf("No Unit found in Registry for Job(%s)", jm.Name) + return nil, nil + } + + ju := &job.Unit{ + Name: jm.Name, + Unit: *unit, + } + return ju, nil + +} + // jobModel is used for serializing and deserializing Jobs stored in the Registry type jobModel struct { Name string @@ -306,7 +350,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { opts := &etcd.DeleteOptions{ Recursive: true, } - _, err := r.kAPI.Delete(r.ctx(), key, opts) + u, err := r.Unit(name) + if err != nil { + log.Warningf("r.Unit error, name=%s\n", name) + u = nil + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) if err != nil { if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { err = errors.New("job does not exist") @@ -316,9 +365,81 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { } // TODO(jonboulle): add unit reference counting and actually destroying Units + + // Delete unit from the etcd registry + if u != nil { + var unitMatch bool + // check if the unit is really valid. If not, return err. + key = r.hashedUnitPath(u.Unit.Hash()) + if unitMatch, err = r.checkUnitMatch(u); err != nil { + err = errors.New("cannot check for the unit is valid") + return err + } + if unitMatch { + err = errors.New("Invalid unit in the etcd registry. 
Skip deleting the unit.") + return err + } + _, err = r.kAPI.Delete(r.ctx(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = errors.New("cannot delete unit from the registry. Unit not found.") + } + return err + } + } return nil } +// checkUnitMatch() determines if the given unit is a really valid entry in the +// etcd registry, by querying the entries via RPC. +func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) { + key := path.Join(r.keyPrefix, unitPrefix) + opts := &etcd.GetOptions{ + Recursive: true, + } + res, err := r.kAPI.Get(r.ctx(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = nil + } + return false, err + } + + return r.checkUnitSiblings(unitDel, res.Node) +} + +// checkUnitSiblings() returns true if there's a duplicated entry already in +// the etcd registry. +func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) { + uhashKey := dir.Key + unitDelName := r.hashedUnitPath(unitDel.Unit.Hash()) + for _, uhashNode := range dir.Nodes { + newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash) + if err != nil { + log.Errorf("cannot get unit. err: %v", err) + return false, err + } + if newUnit == nil { + log.Debugf("unable to parse Unit in Registry at key %s", uhashKey) + continue + } + + if unitDelName == uhashNode.Key { + log.Debugf("skipping the entry itself.") + continue + } + + if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) { + // matched. so this unit has a duplicated entry, so return + log.Debugf("won't erase this key, as a duplicated entry is found.") + return true, nil + } + } + + log.Debugf("no matching entry, so it can be removed.") + return false, nil +} + // CreateUnit attempts to store a Unit and its associated unit file in the registry func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { if err := r.storeOrGetUnitFile(u.Unit); err != nil {