diff --git a/cmd/trillian_log_signer/main.go b/cmd/trillian_log_signer/main.go index 273099bbd6..5b86d81746 100644 --- a/cmd/trillian_log_signer/main.go +++ b/cmd/trillian_log_signer/main.go @@ -47,6 +47,7 @@ import ( "github.com/google/trillian/util/election" "github.com/google/trillian/util/election2" etcdelect "github.com/google/trillian/util/election2/etcd" + mysqlElection "github.com/google/trillian/util/election2/mysql" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "k8s.io/klog/v2" @@ -54,7 +55,7 @@ import ( // Register supported storage providers. _ "github.com/google/trillian/storage/cloudspanner" _ "github.com/google/trillian/storage/crdb" - _ "github.com/google/trillian/storage/mysql" + "github.com/google/trillian/storage/mysql" // Load quota providers _ "github.com/google/trillian/quota/crdbqm" @@ -71,6 +72,7 @@ var ( numSeqFlag = flag.Int("num_sequencers", 10, "Number of sequencer workers to run in parallel") sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing") forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs") + electionBackend = flag.String("election_backend", "etcd", "Election backend to use. One of: mysql, etcd, noop") etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under") lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks") @@ -143,13 +145,22 @@ func main() { instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid()) var electionFactory election2.Factory switch { - case *forceMaster: + case *forceMaster || *electionBackend == "noop": klog.Warning("**** Acting as master for all logs ****") electionFactory = election2.NoopFactory{} - case client != nil: + case client != nil && *electionBackend == "etcd": electionFactory = etcdelect.NewFactory(instanceID, client, *lockDir) + case *storageSystem == "mysql" && *electionBackend == "mysql": + db, err := mysql.GetDatabase() + if err != nil { + klog.Exit("Failed to get MySQL database when reuested: %v", err) + } + electionFactory, err = mysqlElection.NewFactory(instanceID, db) + if err != nil { + klog.Exitf("Failed to create MySQL election factory: %v", err) + } default: - klog.Exit("Either --force_master or --etcd_servers must be supplied") + klog.Exit("Either --force_master, --etcd_servers, or --election_backend=mysql must be supplied") } qm, err := quota.NewManager(*quotaSystem) diff --git a/go.mod b/go.mod index 4a039ce447..1c8612e38d 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/letsencrypt/pkcs11key/v4 v4.0.0 github.com/lib/pq v1.10.9 + github.com/mattn/go-sqlite3 v1.14.20 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 github.com/pseudomuto/protoc-gen-doc v1.5.1 diff --git a/go.sum b/go.sum index cf1d89c469..5aa0051432 100644 --- a/go.sum +++ b/go.sum @@ -480,6 +480,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0= +github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/miekg/pkcs11 v1.0.2/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= diff --git a/scripts/resetdb.sh b/scripts/resetdb.sh index f8fc90cbc7..ee877772ba 100755 --- a/scripts/resetdb.sh +++ b/scripts/resetdb.sh @@ -19,6 +19,7 @@ Accepts environment variables: (default: zaphod). - MYSQL_USER_HOST: The host that the Trillian user will connect from; use '%' as a wildcard (default: localhost). +- MYSQL_USE_ELECTION: If set to true, create election tables as well. EOF } @@ -36,6 +37,7 @@ collect_vars() { [ -z ${MYSQL_USER+x} ] && MYSQL_USER="test" [ -z ${MYSQL_PASSWORD+x} ] && MYSQL_PASSWORD="zaphod" [ -z ${MYSQL_USER_HOST+x} ] && MYSQL_USER_HOST="localhost" + FLAGS=() # handle flags @@ -85,6 +87,10 @@ main() { die "Error: Failed to grant '${MYSQL_USER}' user all privileges on '${MYSQL_DATABASE}'." mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/storage/mysql/schema/storage.sql || \ die "Error: Failed to create tables in '${MYSQL_DATABASE}' database." + if [[ "${MYSQL_USE_ELECTION}" = 'true' ]]; then + mysql "${FLAGS[@]}" -D ${MYSQL_DATABASE} < ${TRILLIAN_PATH}/util/election2/sql/election.sql || \ + die "Error: Failed to create election tables in '${MYSQL_DATABASE}' database." + fi echo "Reset Complete" fi } diff --git a/util/election2/mysql/election.go b/util/election2/mysql/election.go new file mode 100644 index 0000000000..7b0b5d6ef3 --- /dev/null +++ b/util/election2/mysql/election.go @@ -0,0 +1,280 @@ +// Copyright 2023 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mysql provides an implementation of leader election based on a SQL database. +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "sync" + "time" + + "github.com/google/trillian/util/election2" + "k8s.io/klog/v2" +) + +type leaderData struct { + currentLeader string + timestamp time.Time +} + +// Election is an implementation of election2.Election based on a SQL database. +type Election struct { + db *sql.DB + instanceID string + resourceID string + + currentLeader leaderData + leaderLock sync.Cond + + // If a channel is supplied with the cancel, it will be signalled when the election routine has exited. + cancel chan *chan error + electionInterval time.Duration +} + +var _ election2.Election = (*Election)(nil) + +// Await implements election2.Election +func (e *Election) Await(ctx context.Context) error { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.cancel == nil { + e.cancel = make(chan *chan error) + go e.becomeLeaderLoop(context.Background(), e.cancel) + } + if e.currentLeader.currentLeader == e.instanceID { + return nil + } + for e.currentLeader.currentLeader != e.instanceID { + e.leaderLock.Wait() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + klog.Infof("Waiting for leadership, %s is the leader at %s", e.currentLeader.currentLeader, e.currentLeader.timestamp) + } + } + klog.Infof("%s became leader for %s at %s", e.instanceID, e.resourceID, e.currentLeader.timestamp) + return nil +} + +// Close implements election2.Election +func (e *Election) Close(ctx context.Context) error { + if err := e.Resign(ctx); err != nil { + klog.Errorf("Failed to resign leadership: %v", err) + return err + } + return nil +} + +// Resign implements election2.Election +func (e *Election) Resign(ctx context.Context) error { + e.leaderLock.L.Lock() + closer := e.cancel + e.cancel = nil + e.leaderLock.L.Unlock() + if closer == nil { + return nil + } + // Stop trying to elect ourselves + done := make(chan error) + closer <- &done + return <-done +} + +// WithMastership implements election2.Election +func (e *Election) WithMastership(ctx context.Context) (context.Context, error) { + cctx, cancel := context.WithCancel(ctx) + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.currentLeader.currentLeader != e.instanceID { + // Not the leader, cancel + cancel() + return cctx, nil + } + + // Start a goroutine to cancel the context when we are no longer leader + go func() { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + for e.currentLeader.currentLeader == e.instanceID { + e.leaderLock.Wait() + } + select { + case <-ctx.Done(): + // Don't complain if our context already completed. + return + default: + cancel() + klog.Warningf("%s cancelled: lost leadership, %s is the leader at %s", e.resourceID, e.currentLeader.currentLeader, e.currentLeader.timestamp) + } + }() + + return cctx, nil +} + +// becomeLeaderLoop runs continuously to participate in elections until a message is sent on `cancel` +func (e *Election) becomeLeaderLoop(ctx context.Context, closer chan *chan error) { + for { + select { + case ch := <-closer: + err := e.tearDown() + klog.Infof("Election teardown for %s: %v", e.resourceID, err) + if ch != nil { + *ch <- err + } + return + default: + leader, err := e.tryBecomeLeader(ctx) + if err != nil { + klog.Errorf("Failed attempt to become leader for %s, retrying: %v", e.resourceID, err) + } else { + e.leaderLock.L.Lock() + if leader != e.currentLeader { + // Note: this code does not actually care _which_ instance was + // elected, it sends notifications on each leadership change. + e.currentLeader = leader + e.leaderLock.Broadcast() + } + e.leaderLock.L.Unlock() + } + time.Sleep(e.electionInterval) + } + } +} + +func (e *Election) tryBecomeLeader(ctx context.Context) (leaderData, error) { + leader := leaderData{} + tx, err := e.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return leader, fmt.Errorf("BeginTX: %w", err) + } + defer func() { + if err := tx.Rollback(); err != nil { + klog.Errorf("Rollback failed: %v", err) + } + }() + row := tx.QueryRow( + "SELECT leader, last_update FROM LeaderElection WHERE resource_id = ?", + e.resourceID) + if err := row.Scan(&leader.currentLeader, &leader.timestamp); err != nil { + return leader, fmt.Errorf("Select: %w", err) + } + + if leader.currentLeader != e.instanceID && leader.timestamp.Add(e.electionInterval*10).After(time.Now()) { + return leader, nil // Someone else won the election + } + + timestamp := time.Now() + _, err = tx.Exec( + "UPDATE LeaderElection SET leader = ?, last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", + e.instanceID, timestamp, e.resourceID, leader.currentLeader, leader.timestamp) + if err != nil { + return leader, fmt.Errorf("Update: %w", err) + } + + if err := tx.Commit(); err != nil { + return leader, fmt.Errorf("Commit failed: %w", err) + } + leader = leaderData{currentLeader: e.instanceID, timestamp: timestamp} + return leader, nil +} + +func (e *Election) tearDown() error { + e.leaderLock.L.Lock() + defer e.leaderLock.L.Unlock() + if e.currentLeader.currentLeader != e.instanceID { + return nil + } + e.currentLeader.currentLeader = "empty leader" + e.leaderLock.Broadcast() + + // Reset election time to epoch to allow a faster fail-over + res, err := e.db.Exec( + "UPDATE LeaderElection SET last_update = ? WHERE resource_id = ? AND leader = ? AND last_update = ?", + time.Time{}, e.resourceID, e.instanceID, e.currentLeader.timestamp) + if err != nil { + return fmt.Errorf("Update: %w", err) + } + if n, err := res.RowsAffected(); n != 1 || err != nil { + return fmt.Errorf("failed to resign leadership: %d, %w", n, err) + } + return nil +} + +func (e *Election) initializeLock(ctx context.Context) error { + var leader string + err := e.db.QueryRow( + "SELECT leader FROM LeaderElection WHERE resource_id = ?", + e.resourceID, + ).Scan(&leader) + if errors.Is(err, sql.ErrNoRows) { + _, err = e.db.Exec( + "INSERT INTO LeaderElection (resource_id, leader, last_update) VALUES (?, ?, ?)", + e.resourceID, "empty leader", time.Time{}, + ) + } + return err +} + +type SqlFactory struct { + db *sql.DB + instanceID string + opts []Option +} + +var _ election2.Factory = (*SqlFactory)(nil) + +type Option func(*Election) *Election + +func NewFactory(instanceID string, database *sql.DB, opts ...Option) (*SqlFactory, error) { + return &SqlFactory{db: database, instanceID: instanceID, opts: opts}, nil +} + +func WithElectionInterval(interval time.Duration) Option { + return func(f *Election) *Election { + f.electionInterval = interval + return f + } +} + +// NewElection implements election2.Factory. +func (f *SqlFactory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) { + // Ensure we have a database connection + if f.db == nil { + return nil, fmt.Errorf("no database connection") + } + if err := f.db.Ping(); err != nil { + return nil, err + } + e := &Election{ + db: f.db, + instanceID: f.instanceID, + resourceID: resourceID, + leaderLock: sync.Cond{L: &sync.Mutex{}}, + electionInterval: 1 * time.Second, + } + for _, opt := range f.opts { + e = opt(e) + } + if err := e.initializeLock(ctx); err != nil { + return nil, err + } + + return e, nil +} diff --git a/util/election2/mysql/election.sql b/util/election2/mysql/election.sql new file mode 100644 index 0000000000..2ef7a798ac --- /dev/null +++ b/util/election2/mysql/election.sql @@ -0,0 +1,17 @@ +-- MySQL / MariaDB version of the leader election schema + +-- We only have a single table called LeaderElection. It contains +-- a row holding the current leader for each resource, as well as the +-- timestamp that the election was acquired at (last_update). +-- +-- This is less an election than a mad scramble at the start, but once +-- a leader has won the election, they remain in power until they +-- resign or fail to update the last_update time for 10x the +-- electionInterval, which should be coordinated across participants. +-- This is extremely simple, and doesn't perform any sort of +-- load-shedding or fairness at this layer. +CREATE TABLE IF NOT EXISTS LeaderElection( + resource_id VARCHAR(50) PRIMARY KEY, + leader VARCHAR(300) NOT NULL, + last_update TIMESTAMP NOT NULL +); \ No newline at end of file diff --git a/util/election2/mysql/election_test.go b/util/election2/mysql/election_test.go new file mode 100644 index 0000000000..4c87347108 --- /dev/null +++ b/util/election2/mysql/election_test.go @@ -0,0 +1,188 @@ +// Copyright 2023 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mysql provides an implementation of leader election based on a SQL database. +package mysql + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + "github.com/google/trillian/util/election2/testonly" + _ "github.com/mattn/go-sqlite3" +) + +func TestOneElection(t *testing.T) { + db, err := initializeDB("sqllite3", "file::one-election?mode=memory") + if err != nil { + t.Fatalf("Unable to initialize database: %v", err) + } + + factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unable to open database: %v", err) + } + + ctx := context.Background() + el1, err := factory.NewElection(ctx, "5") + if err != nil { + t.Fatalf("NewElection(5): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(5): %v", err) + } + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await when holding lock(5): %v", err) + } + + if err := el1.Resign(ctx); err != nil { + t.Fatalf("Resign(5): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(5): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(5): %v", err) + } +} + +func TestTwoElections(t *testing.T) { + db, err := initializeDB("sqllite3", "file::two-election?mode=memory") + if err != nil { + t.Fatalf("Unable to initialize database: %v", err) + } + + factory, err := NewFactory("serv", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unable to open database: %v", err) + } + + ctx := context.Background() + el1, err := factory.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10): %v", err) + } + el2, err := factory.NewElection(ctx, "20") + if err != nil { + t.Fatalf("NewElection(20): %v", err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(10): %v", err) + } + if err := el2.Await(ctx); err != nil { + t.Fatalf("Await(20): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(10): %v", err) + } + + if err := el2.Close(ctx); err != nil { + t.Fatalf("Close(20): %v", err) + } +} + +func TestElectionTwoServers(t *testing.T) { + db, err := initializeDB("sqllite3", "file::two-election?mode=memory") + if err != nil { + t.Fatalf("Unable to initialize database: %v", err) + } + + factory1, err := NewFactory("s1", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unable to open database: %v", err) + } + factory2, err := NewFactory("s2", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("Unable to open database: %v", err) + } + + ctx := context.Background() + el1, err := factory1.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10): %v", err) + } + el2, err := factory2.NewElection(ctx, "10") + if err != nil { + t.Fatalf("NewElection(10, again): %t %v", err, err) + } + + if err := el1.Await(ctx); err != nil { + t.Fatalf("Await(el1): %v", err) + } + go func() { + time.Sleep(4 * time.Millisecond) + err := el1.Resign(ctx) + if err != nil { + t.Log(err) + } + }() + if err := el2.Await(ctx); err != nil { + t.Fatalf("Await(el2): %v", err) + } + + if err := el1.Close(ctx); err != nil { + t.Fatalf("Close(el1): %v", err) + } + if err := el2.Close(ctx); err != nil { + t.Fatalf("Close(el2): %v", err) + } +} + +func TestElection(t *testing.T) { + for _, nt := range testonly.Tests { + // Create a new DB and Factory for each test for better isolation. + db, err := initializeDB("sqllite3", fmt.Sprintf("file::%s?mode=memory", nt.Name)) + if err != nil { + t.Fatalf("Initialize DB: %v", err) + } + + fact, err := NewFactory("testID", db, WithElectionInterval(10*time.Millisecond)) + if err != nil { + t.Fatalf("NewFactory: %v", err) + } + t.Run(nt.Name, func(t *testing.T) { + nt.Run(t, fact) + }) + } +} + +func initializeDB(driver string, uri string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", uri) + if err != nil { + return nil, err + } + // Additional connections open a _new_, _empty_ database! + db.SetMaxOpenConns(1) + + tableDecl, err := os.ReadFile("election.sql") + if err != nil { + return nil, err + } + + _, err = db.Exec(string(tableDecl)) + if err != nil { + return nil, err + } + + return db, nil +}