Skip to content

Commit

Permalink
registry: remove units from etcd registry upon DestroyUnit()
Browse files Browse the repository at this point in the history
So far each command "fleetctl destroy unit" has removed job entries from
the etcd registry, under /_coreos.com/fleet/job. But it has not removed
its unit file, under /_coreos.com/fleet/unit. As a result, fleet left
lots of garbages in the etcd registry, so users had to manually clean
them up.

So this patch gets unit contents deleted actually from etcd registry
when DestroyUnit() gets called. To avoid potential hash collisions,
it first fetches a list of units from registry, to check there's any
duplicated entry. Only if no duplicated unit is found, fleetd actually
deletes the unit from registry.

Fixes: coreos#1456
Fixes: coreos#1290
Reference: coreos#1291
  • Loading branch information
Dongsu Park committed Mar 18, 2016
1 parent 2e03edd commit 0be1141
Showing 1 changed file with 122 additions and 1 deletion.
123 changes: 122 additions & 1 deletion registry/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"errors"
"fmt"
"path"
"reflect"
"sort"
"strings"

etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client"

Expand Down Expand Up @@ -293,6 +295,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc

}

// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and
// instantiates and returns a representative *job.Unit, transitively fetching
// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(),
// this does not use not.Value itself as a hash key, but it uses the last part
// of node.Key for the hash key.
func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
var err error
var jm jobModel

if err = unmarshal(node.Value, &jm); err != nil {
return nil, err
}

parts := strings.Split(node.Key, "/")
if len(parts) == 0 {
log.Errorf("key '%v' doesn't have enough parts", node.Key)
return nil, nil
}
stringHash := parts[len(parts)-1]

hashKey, err := unit.HashFromHexString(stringHash)
if err != nil {
log.Errorf("cannot convert key string into hash. %v", err)
return nil, nil
}

var unit *unit.UnitFile

unit = unitHashLookupFunc(hashKey)
if unit == nil {
log.Warningf("No Unit found in Registry for Job(%s)", jm.Name)
return nil, nil
}

ju := &job.Unit{
Name: jm.Name,
Unit: *unit,
}
return ju, nil

}

// jobModel is used for serializing and deserializing Jobs stored in the Registry
type jobModel struct {
Name string
Expand All @@ -306,7 +350,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
opts := &etcd.DeleteOptions{
Recursive: true,
}
_, err := r.kAPI.Delete(r.ctx(), key, opts)
u, err := r.Unit(name)
if err != nil {
log.Warningf("r.Unit error, name=%s\n", name)
u = nil
}
_, err = r.kAPI.Delete(r.ctx(), key, opts)
if err != nil {
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
err = errors.New("job does not exist")
Expand All @@ -316,9 +365,81 @@ func (r *EtcdRegistry) DestroyUnit(name string) error {
}

// TODO(jonboulle): add unit reference counting and actually destroying Units

// Delete unit from the etcd registry
if u != nil {
var unitMatch bool
// check if the unit is really valid. If not, return err.
key = r.hashedUnitPath(u.Unit.Hash())
if unitMatch, err = r.checkUnitMatch(u); err != nil {
err = errors.New("cannot check for the unit is valid")
return err
}
if unitMatch {
err = errors.New("Invalid unit in the etcd registry. Skip deleting the unit.")
return err
}
_, err = r.kAPI.Delete(r.ctx(), key, opts)
if err != nil {
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
err = errors.New("cannot delete unit from the registry. Unit not found.")
}
return err
}
}
return nil
}

// checkUnitMatch() determines if the given unit is a really valid entry in the
// etcd registry, by querying the entries via RPC.
func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) {
key := path.Join(r.keyPrefix, unitPrefix)
opts := &etcd.GetOptions{
Recursive: true,
}
res, err := r.kAPI.Get(r.ctx(), key, opts)
if err != nil {
if isEtcdError(err, etcd.ErrorCodeKeyNotFound) {
err = nil
}
return false, err
}

return r.checkUnitSiblings(unitDel, res.Node)
}

// checkUnitSiblings() returns true if there's a duplicated entry already in
// the etcd registry.
func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) {
uhashKey := dir.Key
unitDelName := r.hashedUnitPath(unitDel.Unit.Hash())
for _, uhashNode := range dir.Nodes {
newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash)
if err != nil {
log.Errorf("cannot get unit. err: %v", err)
return false, err
}
if newUnit == nil {
log.Debugf("unable to parse Unit in Registry at key %s", uhashKey)
continue
}

if unitDelName == uhashNode.Key {
log.Debugf("skipping the entry itself.")
continue
}

if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) {
// matched. so this unit has a duplicated entry, so return
log.Debugf("won't erase this key, as a duplicated entry is found.")
return true, nil
}
}

log.Debugf("no matching entry, so it can be removed.")
return false, nil
}

// CreateUnit attempts to store a Unit and its associated unit file in the registry
func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) {
if err := r.storeOrGetUnitFile(u.Unit); err != nil {
Expand Down

0 comments on commit 0be1141

Please sign in to comment.