Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL election module #3318

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/letsencrypt/pkcs11key/v4 v4.0.0
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.20
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/pseudomuto/protoc-gen-doc v1.5.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0=
github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/miekg/pkcs11 v1.0.2/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
Expand Down
269 changes: 269 additions & 0 deletions util/election2/sql/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Copyright 2023 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sql provides an implementation of leader election based on a SQL database.
package sql

import (
"context"
"database/sql"
"fmt"
"sync"
"time"

"github.com/google/trillian/util/election2"
"k8s.io/klog/v2"
)

// leaderData is a snapshot of one row of the leader_election table: who holds
// the leadership for a resource and when that claim was last written.
type leaderData struct {
	// currentLeader is the instance ID stored in the "leader" column.
	currentLeader string
	// timestamp is the value of the "last_update" column for that claim.
	timestamp time.Time
}

// Election is an implementation of election2.Election based on a SQL database.
//
// Leadership is recorded in a row of the leader_election table keyed by
// resourceID. A background loop (started by Await, stopped by Resign)
// periodically tries to claim or refresh that row.
type Election struct {
	// db is the database holding the leader_election table.
	db *sql.DB
	// instanceID identifies this participant; it is the value written to the
	// "leader" column when this instance wins.
	instanceID string
	// resourceID selects the leader_election row this election is about.
	resourceID string

	// currentLeader is the most recently observed leadership record. It is
	// read and written while holding leaderLock.L.
	currentLeader leaderData
	// leaderLock is broadcast whenever currentLeader changes.
	leaderLock sync.Cond

	// cancel is non-nil while the election loop is running. Sending on it stops
	// the loop.
	// If a channel is supplied with the cancel, it will be signalled when the election routine has exited.
	cancel chan *chan error
	// electionInterval is the pause between election attempts; a claim by
	// another instance is treated as live for ten times this interval.
	electionInterval time.Duration
}

// Compile-time check that *Election satisfies election2.Election.
var _ election2.Election = (*Election)(nil)

// Await implements election2.Election.
//
// It starts the background election loop on first use, then blocks until this
// instance is recorded as the leader. It returns nil once leadership is held,
// or ctx.Err() if ctx is done before that happens.
func (e *Election) Await(ctx context.Context) error {
	e.leaderLock.L.Lock()
	defer e.leaderLock.L.Unlock()
	if e.cancel == nil {
		e.cancel = make(chan *chan error)
		// The loop outlives this call, so it is not tied to ctx; it is stopped
		// through e.cancel by Resign.
		go e.becomeLeaderLoop(context.Background(), e.cancel)
	}
	if e.currentLeader.currentLeader == e.instanceID {
		return nil
	}

	// sync.Cond cannot wait on a context, so wake all waiters when ctx ends.
	// Broadcasting under the lock guarantees the wake-up cannot slip in
	// between our ctx check and the call to Wait below.
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		select {
		case <-ctx.Done():
			e.leaderLock.L.Lock()
			e.leaderLock.Broadcast()
			e.leaderLock.L.Unlock()
		case <-stop:
		}
	}()

	for e.currentLeader.currentLeader != e.instanceID {
		if err := ctx.Err(); err != nil {
			return err
		}
		klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp)
		e.leaderLock.Wait()
	}
	klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp)
	return nil
}

// Close implements election2.Election.
//
// It resigns from the election, logging and returning any error that Resign
// reports.
func (e *Election) Close(ctx context.Context) error {
	err := e.Resign(ctx)
	if err != nil {
		klog.Errorf("Failed to resign leadership: %v", err)
	}
	return err
}

// Resign implements election2.Election.
//
// It detaches the stop channel of the running election loop (if any), asks the
// loop to exit, and returns the teardown result the loop reports back. When no
// loop is running it is a no-op.
func (e *Election) Resign(ctx context.Context) error {
	// Take ownership of the stop channel under the lock so that only one
	// caller ever signals a given loop.
	e.leaderLock.L.Lock()
	stop := e.cancel
	e.cancel = nil
	e.leaderLock.L.Unlock()

	if stop != nil {
		// Stop trying to elect ourselves and wait for the loop's answer.
		reply := make(chan error)
		stop <- &reply
		return <-reply
	}
	return nil
}

// WithMastership implements election2.Election.
//
// It returns a context derived from ctx. If this instance is not currently the
// leader the returned context is already cancelled; otherwise it is cancelled
// as soon as leadership is lost or ctx is done. The returned error is always
// nil.
func (e *Election) WithMastership(ctx context.Context) (context.Context, error) {
	cctx, cancel := context.WithCancel(ctx)
	e.leaderLock.L.Lock()
	defer e.leaderLock.L.Unlock()
	if e.currentLeader.currentLeader != e.instanceID {
		// Not the leader, cancel
		cancel()
		return cctx, nil
	}

	// Start a goroutine to cancel the context when we are no longer leader.
	go func() {
		// sync.Cond cannot wait on a context; without this waker the goroutine
		// would stay parked after ctx ends until the next leadership change.
		stop := make(chan struct{})
		defer close(stop)
		go func() {
			select {
			case <-ctx.Done():
				e.leaderLock.L.Lock()
				e.leaderLock.Broadcast()
				e.leaderLock.L.Unlock()
			case <-stop:
			}
		}()

		e.leaderLock.L.Lock()
		defer e.leaderLock.L.Unlock()
		for e.currentLeader.currentLeader == e.instanceID && ctx.Err() == nil {
			e.leaderLock.Wait()
		}
		// Always release the derived context's resources.
		cancel()
		// Don't complain if our context already completed.
		if ctx.Err() == nil {
			klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp)
		}
	}()

	return cctx, nil
}

// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel`
func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) {
for {
select {
case ch := <-closer:
err := e.tearDown()
klog.Infof("Election teardown for %s: %v", e.resourceID, err)
if ch != nil {
*ch <- err
}
return
default:
leader, err := e.tryBecomeLeader(ctx)
if err != nil {
klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err)
} else {
e.leaderLock.L.Lock()
if leader != e.currentLeader {
// Note: this code does not actually care _which_ instance was
// elected, it sends notifications on each leadership cahnge.
evankanderson marked this conversation as resolved.
Show resolved Hide resolved
e.currentLeader = leader
e.leaderLock.Broadcast()
}
e.leaderLock.L.Unlock()
}
time.Sleep(e.electionInterval)
}
}
}

func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) {
leader := leaderData{}
tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return leader, fmt.Errorf("BeginTX: %w", err)
}
defer tx.Rollback()

Check failure on line 166 in util/election2/sql/election.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `tx.Rollback` is not checked (errcheck)
row := tx.QueryRow(
"SELECT leader, last_update FROM leader_election WHERE resource_id = $1",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the $<num> format be problematic in mysql?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, switched to ?, which probably makes postgres-likes unhappy, but since sqlite is happy with both and there's already a MySQL backend for trillian, that makes sense.

e.resourceID)
if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil {
return leader, fmt.Errorf("Select: %w", err)
}

if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) {
return leader, nil // Someone else won the election
}

timestamp := time.Now()
_, err = tx.Exec(
"UPDATE leader_election SET leader = $1, last_update = $2 WHERE resource_id = $3 AND leader = $4 AND last_update = $5",
e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp)
if err != nil {
return leader, fmt.Errorf("Update: %w", err)
}

if err := tx.Commit(); err != nil {
return leader, fmt.Errorf("Commit failed: %w", err)
}
leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp}
return leader, nil
}

// tearDown releases leadership if this instance currently holds it.
//
// It marks the local record as "empty leader", wakes all waiters, and resets
// the row's last_update to the zero time so another instance can take over
// without waiting for the claim to age out. It returns nil when this instance
// is not the leader, and an error if the update fails or does not touch
// exactly one row.
func (e *Election) tearDown() error {
	e.leaderLock.L.Lock()
	defer e.leaderLock.L.Unlock()
	if e.currentLeader.currentLeader != e.instanceID {
		return nil
	}
	e.currentLeader.currentLeader = "empty leader"
	e.leaderLock.Broadcast()

	// Reset election time to epoch to allow a faster fail-over
	res, err := e.db.Exec(
		"UPDATE leader_election SET last_update = $1 WHERE resource_id = $2 AND leader = $3 AND last_update = $4",
		time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp)
	if err != nil {
		return fmt.Errorf("Update: %w", err)
	}
	// Report the two failure modes separately: wrapping a nil error with %w
	// would render as "%!w(<nil>)".
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to resign leadership: %w", err)
	}
	if n != 1 {
		return fmt.Errorf("failed to resign leadership: %d rows updated, want 1", n)
	}
	return nil
}

func (e *Election) initializeLock(ctx context.Context) error {
insert, err := e.db.Prepare("INSERT INTO leader_election (resource_id, leader, last_update) VALUES ($1, $2, $3)")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't we need to handle the case where more than one replica executes an initialization at the same time? Or is the idea is to crash loop, restart and continue?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder to actually handle the error in NewElection. The idea is that leader_election has a primary-key index on resource_id, so attempting a second INSERT with the same resource_id will simply fail with a conflict. But we should handle other SQL errors if there's a good way to do so.

Note that we also don't have a good way to clean up resource_id rows after they've been created... that's probably not a problem, as resource_ids are probably pretty stable, but still...

Copy link
Author

@evankanderson evankanderson Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, fixing the lint errors and adding a test to cover this case, I realize that "INSERT had a conflict" is not a standardized error, so I need to do the query first to look for sql.ErrNoRows, which is standardized. ¯\_(ツ)_/¯

if err != nil {
return err
}
defer insert.Close()

Check failure on line 220 in util/election2/sql/election.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `insert.Close` is not checked (errcheck)

_, err = insert.Exec(e.resourceID, "empty leader", time.Time{})
return err
}

type SqlFactory struct {
db *sql.DB
instanceID string
opts []Option
}

var _ election2.Factory = (*SqlFactory)(nil)

type Option func(*Election) *Election

func NewFactory(instanceID string, database *sql.DB, opts... Option) (*SqlFactory, error) {
return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil
}

func WithElectionInterval(interval time.Duration) Option {
return func(f *Election) *Election {
f.electionInterval = interval
return f
}
}

// NewElection implements election2.Factory.
func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) {
// Ensure we have a database connection
if f.db == nil {
return nil, fmt.Errorf("no database connection")
}
if err := f.db.Ping(); err != nil {
return nil, err
}
e := &Election{
db: f.db,
instanceID: f.instanceID,
resourceID: resourceID,
leaderLock: sync.Cond{L: &sync.Mutex{}},
electionInterval: 1 * time.Second,
}
for _, opt := range f.opts {
e = opt(e)
}
e.initializeLock(ctx)

Check failure on line 266 in util/election2/sql/election.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `e.initializeLock` is not checked (errcheck)

return e, nil
}
Loading
Loading