Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use sink name and command name for consule lock key #31

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions helper/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ type Runner interface {
}

func NewManager(r Runner) *Manager {
lockName := os.Getenv("LOCK_NAME")
return &Manager{
runner: r,
logger: log.WithField("type", r.Name()),
stopCh: make(chan interface{}),
voluntarilyReleaseLockCh: make(chan interface{}),
lockName: lockName,
}
}

Expand All @@ -37,6 +39,7 @@ type Manager struct {
logger *log.Entry // logger for the consul connection struct
stopCh chan interface{} // internal channel used to stop all go-routines when gracefully shutting down
voluntarilyReleaseLockCh chan interface{}
lockName string
}

// cleanup will do cleanup tasks when the reconciler is shutting down
Expand All @@ -50,6 +53,13 @@ func (m *Manager) cleanup() {
m.logger.Debugf("Cleanup complete")
}

func (m *Manager) getLockName() string {
if m.lockName != "" {
return m.lockName
}
return m.runner.Name()
}

// continuouslyAcquireConsulLeadership waits to acquire the lock to the Consul KV key.
// it will run until the stopCh is closed
func (m *Manager) continuouslyAcquireConsulLeadership() error {
Expand All @@ -76,7 +86,7 @@ func (m *Manager) continuouslyAcquireConsulLeadership() error {

// Read the Last Change Time from Consul KV, so we don't re-process tasks over and over on restart
func (m *Manager) restoreLastChangeTime() interface{} {
kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()), nil)
kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()), nil)
if err != nil {
return 0
}
Expand All @@ -102,8 +112,8 @@ func (m *Manager) restoreLastChangeTime() interface{} {
func (m *Manager) acquireConsulLeadership() error {
var err error
m.lock, err = m.client.LockOpts(&consulapi.LockOptions{
Key: fmt.Sprintf("nomad-firehose/%s.lock", m.runner.Name()),
SessionName: fmt.Sprintf("nomad-firehose-%s", m.runner.Name()),
Key: fmt.Sprintf("nomad-firehose/%s.lock", m.getLockName()),
SessionName: fmt.Sprintf("nomad-firehose-%s", m.getLockName()),
MonitorRetries: 10,
MonitorRetryTime: 5 * time.Second,
})
Expand Down Expand Up @@ -161,7 +171,7 @@ func (m *Manager) acquireConsulLeadership() error {

m.logger.Infof("Writing lastChangedTime to KV: %s", r)
kv := &consulapi.KVPair{
Key: fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()),
Key: fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()),
Value: []byte(r),
}
_, err := m.client.KV().Put(kv, nil)
Expand Down