Skip to content

Commit

Permalink
Move the create, exist functionality to the K8s impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jrosinsk committed Jan 7, 2019
1 parent 0b3ae37 commit c902149
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 50 deletions.
20 changes: 10 additions & 10 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package lock

import (
"github.com/juju/errgo"
"errors"
)

var (
maskAny = errgo.MaskFunc(errgo.Any)
AlreadyLockedError = errgo.New("already locked")
NotLockedByMeError = errgo.New("not locked by me")
//maskAny = errgo.MaskFunc(errgo.Any)
AlreadyLockedError = errors.New("already locked")
NotLockedByMeError = errors.New("not locked by me")
)

// IsAlreadyLocked returns true if the given error is caused by a AlreadyLockedError error.
func IsAlreadyLocked(err error) bool {
return errgo.Cause(err) == AlreadyLockedError
}
//func IsAlreadyLocked(err error) bool {
// return errgo.Cause(err) == AlreadyLockedError
//}

// IsNotLockedByMe returns true if the given error is caused by a NotLockedByMeError error.
func IsNotLockedByMe(err error) bool {
return errgo.Cause(err) == NotLockedByMeError
}
//func IsNotLockedByMe(err error) bool {
// return errgo.Cause(err) == NotLockedByMeError
//}
56 changes: 16 additions & 40 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"encoding/json"
"fmt"
"time"

"github.com/juju/errgo"
)

// KubeLock is used to provide a distributed lock using Kubernetes annotation data.
Expand All @@ -32,38 +30,30 @@ type KubeLock interface {

// NewKubeLock creates a new KubeLock.
// The lock will not be aquired.
func NewKubeLock(annotationKey, ownerID string, ttl time.Duration, metaCreate MetaCreator, metaExists MetaExists, metaGet MetaGetter, metaUpdate MetaUpdater) (KubeLock, error) {
func NewKubeLock(annotationKey, ownerID string, ttl time.Duration, metaGet MetaGetter, metaUpdate MetaUpdater) (KubeLock, error) {
if annotationKey == "" {
annotationKey = defaultAnnotationKey
}
if ownerID == "" {
id := make([]byte, 16)
if _, err := rand.Read(id); err != nil {
return nil, maskAny(err)
return nil, err
}
ownerID = base64.StdEncoding.EncodeToString(id)
}
if ttl == 0 {
ttl = defaultTTL
}
if metaCreate == nil {
return nil, maskAny(fmt.Errorf("metaCreate cannot be nil"))
}
if metaExists == nil {
return nil, maskAny(fmt.Errorf("metaExists cannot be nil"))
}
if metaGet == nil {
return nil, maskAny(fmt.Errorf("metaGet cannot be nil"))
return nil, fmt.Errorf("metaGet cannot be nil")
}
if metaUpdate == nil {
return nil, maskAny(fmt.Errorf("metaUpdate cannot be nil"))
return nil, fmt.Errorf("metaUpdate cannot be nil")
}
return &kubeLock{
annotationKey: annotationKey,
ownerID: ownerID,
ttl: ttl,
createMeta: metaCreate,
existsMeta: metaExists,
getMeta: metaGet,
updateMeta: metaUpdate,
}, nil
Expand All @@ -78,8 +68,6 @@ type kubeLock struct {
annotationKey string
ownerID string
ttl time.Duration
createMeta MetaCreator
existsMeta MetaExists
getMeta MetaGetter
updateMeta MetaUpdater
}
Expand All @@ -88,26 +76,18 @@ type LockData struct {
Owner string `json:"owner"`
ExpiresAt time.Time `json:"expires_at"`
}
type MetaCreator func() (item interface{}, err error)
type MetaExists func() bool

type MetaGetter func() (annotations map[string]string, resourceVersion string, item interface{}, err error)
type MetaUpdater func(annotations map[string]string, resourceVersion string, item interface{}) error

// Acquire tries to acquire the lock.
// If the lock is already held by us, the lock will be updated.
// If successfull it returns nil, otherwise it returns an error.
func (l *kubeLock) Acquire() error {
//Verify the Resource Exists
if l.existsMeta() == false {
_, err :=l.createMeta()
if err != nil {
return maskAny(err)
}
}
// Get current state
ann, rv, extra, err := l.getMeta()
if err != nil {
return maskAny(err)
return err
}

// Get lock data
Expand All @@ -117,13 +97,13 @@ func (l *kubeLock) Acquire() error {
if lockDataRaw, ok := ann[l.annotationKey]; ok && lockDataRaw != "" {
var lockData LockData
if err := json.Unmarshal([]byte(lockDataRaw), &lockData); err != nil {
return maskAny(err)
return err
}
if lockData.Owner != l.ownerID {
// Lock is owned by someone else
if time.Now().Before(lockData.ExpiresAt) {
// Lock is held and not expired
return maskAny(errgo.WithCausef(nil, AlreadyLockedError, "locked by %s", lockData.Owner))
return fmt.Errorf( "locked by %s", lockData.Owner, AlreadyLockedError)
}
}
}
Expand All @@ -132,11 +112,11 @@ func (l *kubeLock) Acquire() error {
expiredAt := time.Now().Add(l.ttl)
lockDataRaw, err := json.Marshal(LockData{Owner: l.ownerID, ExpiresAt: expiredAt})
if err != nil {
return maskAny(err)
return err
}
ann[l.annotationKey] = string(lockDataRaw)
if err := l.updateMeta(ann, rv, extra); err != nil {
return maskAny(err)
return err
}

// Update successfull, we've acquired the lock
Expand All @@ -147,14 +127,10 @@ func (l *kubeLock) Acquire() error {
// If the lock is already held by us, the lock will be released.
// If successfull it returns nil, otherwise it returns an error.
func (l *kubeLock) Release() error {
//Verify the Resource Exists
if l.existsMeta() == false {
return nil
}
// Get current state
ann, rv, extra, err := l.getMeta()
if err != nil {
return maskAny(err)
return err
}

// Get lock data
Expand All @@ -164,11 +140,11 @@ func (l *kubeLock) Release() error {
if lockDataRaw, ok := ann[l.annotationKey]; ok && lockDataRaw != "" {
var lockData LockData
if err := json.Unmarshal([]byte(lockDataRaw), &lockData); err != nil {
return maskAny(err)
return err
}
if lockData.Owner != l.ownerID {
// Lock is owned by someone else
return maskAny(errgo.WithCausef(nil, NotLockedByMeError, "locked by %s", lockData.Owner))
return fmt.Errorf("locked by %s", lockData.Owner, NotLockedByMeError)
}
} else if ok && lockDataRaw == "" {
// Lock is not locked, we consider that a successfull release also.
Expand All @@ -178,7 +154,7 @@ func (l *kubeLock) Release() error {
// Try to release lock it now
ann[l.annotationKey] = ""
if err := l.updateMeta(ann, rv, extra); err != nil {
return maskAny(err)
return err
}

// Update successfull, we've released the lock
Expand All @@ -191,14 +167,14 @@ func (l *kubeLock) CurrentOwner() (string, error) {
// Get current state
ann, _, _, err := l.getMeta()
if err != nil {
return "", maskAny(err)
return "", err
}

// Get lock data
if lockDataRaw, ok := ann[l.annotationKey]; ok && lockDataRaw != "" {
var lockData LockData
if err := json.Unmarshal([]byte(lockDataRaw), &lockData); err != nil {
return "", maskAny(err)
return "", err
}
return lockData.Owner, nil
}
Expand Down

0 comments on commit c902149

Please sign in to comment.