From 53cbd81009a1b1bc1387f1a1f51ee8c09fcc732f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 28 Jul 2023 11:56:46 +0200 Subject: [PATCH] Separate Writer interface from BatchTx interfaces Signed-off-by: Marek Siarkowicz --- etcdutl/snapshot/v3_snapshot.go | 6 ++--- server/auth/store.go | 34 +++++++++++++++--------- server/etcdserver/cindex/cindex.go | 8 +++--- server/storage/backend/batch_tx.go | 20 +++++++++----- server/storage/backend/hooks.go | 6 ++--- server/storage/backend/hooks_test.go | 2 +- server/storage/hooks.go | 2 +- server/storage/mvcc/store.go | 4 +-- server/storage/schema/actions.go | 12 ++++----- server/storage/schema/actions_test.go | 6 ++--- server/storage/schema/alarm.go | 4 +-- server/storage/schema/auth_roles.go | 2 +- server/storage/schema/auth_roles_test.go | 20 +++++++------- server/storage/schema/auth_users_test.go | 20 +++++++------- server/storage/schema/cindex.go | 8 +++--- server/storage/schema/confstate.go | 2 +- server/storage/schema/lease.go | 8 +++--- server/storage/schema/lease_test.go | 8 +++--- server/storage/schema/migration.go | 4 +-- server/storage/schema/migration_test.go | 2 +- server/storage/schema/schema.go | 2 +- server/storage/schema/schema_test.go | 16 +++++------ server/storage/schema/version.go | 4 +-- 23 files changed, 108 insertions(+), 92 deletions(-) diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 60580006536..00969963d64 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -353,7 +353,7 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error { return nil } -func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision { +func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision { s.lg.Info( "bumping latest revision", zap.Int64("latest-revision", latest.main), @@ -370,7 +370,7 @@ func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amou return latest } -func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) { +func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) { s.lg.Info( "marking revision compacted", zap.Int64("revision", latest.main), @@ -379,7 +379,7 @@ func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revis mvcc.UnsafeSetScheduledCompact(tx, latest.main) } -func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) { +func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) { var latest revision err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) { rev := bytesToRev(k) diff --git a/server/auth/store.go b/server/auth/store.go index ab091b91356..f272112884d 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -205,18 +205,6 @@ type AuthBackend interface { GetAllRoles() []*authpb.Role } -type AuthBatchTx interface { - Lock() - Unlock() - UnsafeAuthReader - UnsafeSaveAuthEnabled(enabled bool) - UnsafeSaveAuthRevision(rev uint64) - UnsafePutUser(*authpb.User) - UnsafeDeleteUser(string) - UnsafePutRole(*authpb.Role) - UnsafeDeleteRole(string) -} - type AuthReadTx interface { RLock() RUnlock() @@ -232,6 +220,26 @@ type UnsafeAuthReader interface { UnsafeGetAllRoles() []*authpb.Role } +type AuthBatchTx interface { + Lock() + Unlock() + UnsafeAuthReadWriter +} + +type UnsafeAuthReadWriter interface { + UnsafeAuthReader + UnsafeAuthWriter +} + +type UnsafeAuthWriter interface { + UnsafeSaveAuthEnabled(enabled bool) + UnsafeSaveAuthRevision(rev uint64) + UnsafePutUser(*authpb.User) + UnsafeDeleteUser(string) + UnsafePutRole(*authpb.Role) + UnsafeDeleteRole(string) +} + type authStore struct { // atomic operations; need 64-bit align, or 32-bit tests will crash revision uint64 @@ -990,7 +998,7 @@ func hasRootRole(u *authpb.User) bool { return idx != len(u.Roles) && u.Roles[idx] == rootRole } -func (as *authStore) commitRevision(tx AuthBatchTx) { +func (as *authStore) commitRevision(tx UnsafeAuthWriter) { atomic.AddUint64(&as.revision, 1) tx.UnsafeSaveAuthRevision(as.Revision()) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 70646e19e8b..200008ff9b3 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -46,7 +46,7 @@ type ConsistentIndexer interface { // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. - UnsafeSave(tx backend.BatchTx) + UnsafeSave(tx backend.UnsafeReadWriter) // SetBackend set the available backend.BatchTx for ConsistentIndexer. SetBackend(be Backend) @@ -115,7 +115,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { atomic.StoreUint64(&ci.term, term) } -func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { +func (ci *consistentIndex) UnsafeSave(tx backend.UnsafeReadWriter) { index := atomic.LoadUint64(&ci.consistentIndex) term := atomic.LoadUint64(&ci.term) schema.UnsafeUpdateConsistentIndex(tx, index, term) @@ -166,8 +166,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint atomic.StoreUint64(&f.term, term) } -func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} -func (f *fakeConsistentIndex) SetBackend(_ Backend) {} +func (f *fakeConsistentIndex) UnsafeSave(_ backend.UnsafeReadWriter) {} +func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) { tx.LockOutsideApply() diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 3f8c131ad78..60be4ce6d74 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -46,18 +46,26 @@ type Bucket interface { type BatchTx interface { Lock() Unlock() - UnsafeReader - UnsafeCreateBucket(bucket Bucket) - UnsafeDeleteBucket(bucket Bucket) - UnsafePut(bucket Bucket, key []byte, value []byte) - UnsafeSeqPut(bucket Bucket, key []byte, value []byte) - UnsafeDelete(bucket Bucket, key []byte) // Commit commits a previous tx and begins a new writable one. Commit() // CommitAndStop commits the previous tx and does not create a new one. CommitAndStop() LockInsideApply() LockOutsideApply() + UnsafeReadWriter +} + +type UnsafeReadWriter interface { + UnsafeReader + UnsafeWriter +} + +type UnsafeWriter interface { + UnsafeCreateBucket(bucket Bucket) + UnsafeDeleteBucket(bucket Bucket) + UnsafePut(bucket Bucket, key []byte, value []byte) + UnsafeSeqPut(bucket Bucket, key []byte, value []byte) + UnsafeDelete(bucket Bucket, key []byte) } type batchTx struct { diff --git a/server/storage/backend/hooks.go b/server/storage/backend/hooks.go index 9750828ef7b..817d0c5eb50 100644 --- a/server/storage/backend/hooks.go +++ b/server/storage/backend/hooks.go @@ -14,20 +14,20 @@ package backend -type HookFunc func(tx BatchTx) +type HookFunc func(tx UnsafeReadWriter) // Hooks allow to add additional logic executed during transaction lifetime. type Hooks interface { // OnPreCommitUnsafe is executed before Commit of transactions. // The given transaction is already locked. - OnPreCommitUnsafe(tx BatchTx) + OnPreCommitUnsafe(tx UnsafeReadWriter) } type hooks struct { onPreCommitUnsafe HookFunc } -func (h hooks) OnPreCommitUnsafe(tx BatchTx) { +func (h hooks) OnPreCommitUnsafe(tx UnsafeReadWriter) { h.onPreCommitUnsafe(tx) } diff --git a/server/storage/backend/hooks_test.go b/server/storage/backend/hooks_test.go index b77efbba492..afc4e883351 100644 --- a/server/storage/backend/hooks_test.go +++ b/server/storage/backend/hooks_test.go @@ -113,7 +113,7 @@ func prepareBuckenAndKey(tx backend.BatchTx) { func newTestHooksBackend(t testing.TB, baseConfig backend.BackendConfig) backend.Backend { cfg := baseConfig - cfg.Hooks = backend.NewHooks(func(tx backend.BatchTx) { + cfg.Hooks = backend.NewHooks(func(tx backend.UnsafeReadWriter) { k, v := tx.UnsafeRange(bucket, key, nil, 1) t.Logf("OnPreCommit executed: %v %v", string(k[0]), string(v[0])) assert.Len(t, k, 1) diff --git a/server/storage/hooks.go b/server/storage/hooks.go index cf09e06b3a6..ffec71c0bca 100644 --- a/server/storage/hooks.go +++ b/server/storage/hooks.go @@ -41,7 +41,7 @@ func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendH return &BackendHooks{lg: lg, indexer: indexer} } -func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { +func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.UnsafeReadWriter) { bh.indexer.UnsafeSave(tx) bh.confStateLock.Lock() defer bh.confStateLock.Unlock() diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index 5bc6e6dca95..886375ec8c9 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -41,7 +41,7 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) { UnsafeSetScheduledCompact(tx, value) } -func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { +func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) { rbytes := newRevBytes() revToBytes(revision{main: value}, rbytes) tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes) @@ -53,7 +53,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) { UnsafeSetFinishedCompact(tx, value) } -func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) { +func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) { rbytes := newRevBytes() revToBytes(revision{main: value}, rbytes) tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes) diff --git a/server/storage/schema/actions.go b/server/storage/schema/actions.go index fb161560502..415ffcd792f 100644 --- a/server/storage/schema/actions.go +++ b/server/storage/schema/actions.go @@ -23,7 +23,7 @@ import ( type action interface { // unsafeDo executes the action and returns revert action, when executed // should restore the state from before. - unsafeDo(tx backend.BatchTx) (revert action, err error) + unsafeDo(tx backend.UnsafeReadWriter) (revert action, err error) } type setKeyAction struct { @@ -32,7 +32,7 @@ type setKeyAction struct { FieldValue []byte } -func (a setKeyAction) unsafeDo(tx backend.BatchTx) (action, error) { +func (a setKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) { revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName) tx.UnsafePut(a.Bucket, a.FieldName, a.FieldValue) return revert, nil @@ -43,13 +43,13 @@ type deleteKeyAction struct { FieldName []byte } -func (a deleteKeyAction) unsafeDo(tx backend.BatchTx) (action, error) { +func (a deleteKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) { revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName) tx.UnsafeDelete(a.Bucket, a.FieldName) return revert, nil } -func restoreFieldValueAction(tx backend.BatchTx, bucket backend.Bucket, fieldName []byte) action { +func restoreFieldValueAction(tx backend.UnsafeReader, bucket backend.Bucket, fieldName []byte) action { _, vs := tx.UnsafeRange(bucket, fieldName, nil, 1) if len(vs) == 1 { return &setKeyAction{ @@ -68,7 +68,7 @@ type ActionList []action // unsafeExecute executes actions one by one. If one of actions returns error, // it will revert them. -func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error { +func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error { var revertActions = make(ActionList, 0, len(as)) for _, a := range as { revert, err := a.unsafeDo(tx) @@ -84,7 +84,7 @@ func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error { // unsafeExecuteInReversedOrder executes actions in revered order. Will panic on // action error. Should be used when reverting. -func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.BatchTx) { +func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.UnsafeReadWriter) { for j := len(as) - 1; j >= 0; j-- { _, err := as[j].unsafeDo(tx) if err != nil { diff --git a/server/storage/schema/actions_test.go b/server/storage/schema/actions_test.go index 5a3ef28fd97..4e61a0dac9a 100644 --- a/server/storage/schema/actions_test.go +++ b/server/storage/schema/actions_test.go @@ -147,17 +147,17 @@ type brokenAction struct{} var errBrokenAction = fmt.Errorf("broken action error") -func (c brokenAction) unsafeDo(tx backend.BatchTx) (action, error) { +func (c brokenAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) { return nil, errBrokenAction } -func putKeyValues(tx backend.BatchTx, bucket backend.Bucket, kvs map[string]string) { +func putKeyValues(tx backend.UnsafeWriter, bucket backend.Bucket, kvs map[string]string) { for k, v := range kvs { tx.UnsafePut(bucket, []byte(k), []byte(v)) } } -func assertBucketState(t *testing.T, tx backend.BatchTx, bucket backend.Bucket, expect map[string]string) { +func assertBucketState(t *testing.T, tx backend.UnsafeReadWriter, bucket backend.Bucket, expect map[string]string) { t.Helper() got := map[string]string{} ks, vs := tx.UnsafeRange(bucket, []byte("\x00"), []byte("\xff"), 0) diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 39d43fa2e13..6e81d0f4671 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -47,7 +47,7 @@ func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) { s.mustUnsafePutAlarm(tx, alarm) } -func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) mustUnsafePutAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) { v, err := alarm.Marshal() if err != nil { s.lg.Panic("failed to marshal alarm member", zap.Error(err)) @@ -63,7 +63,7 @@ func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) { s.mustUnsafeDeleteAlarm(tx, alarm) } -func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) { +func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) { v, err := alarm.Marshal() if err != nil { s.lg.Panic("failed to marshal alarm member", zap.Error(err)) diff --git a/server/storage/schema/auth_roles.go b/server/storage/schema/auth_roles.go index 58e6b834c2b..6161a0885a9 100644 --- a/server/storage/schema/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -21,7 +21,7 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" ) -func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) { +func UnsafeCreateAuthRolesBucket(tx backend.UnsafeWriter) { tx.UnsafeCreateBucket(AuthRoles) } diff --git a/server/storage/schema/auth_roles_test.go b/server/storage/schema/auth_roles_test.go index 31c3ff60500..d7323c6f4a6 100644 --- a/server/storage/schema/auth_roles_test.go +++ b/server/storage/schema/auth_roles_test.go @@ -30,17 +30,17 @@ import ( func TestGetAllRoles(t *testing.T) { tcs := []struct { name string - setup func(tx auth.AuthBatchTx) + setup func(tx auth.UnsafeAuthWriter) want []*authpb.Role }{ { name: "Empty by default", - setup: func(tx auth.AuthBatchTx) {}, + setup: func(tx auth.UnsafeAuthWriter) {}, want: nil, }, { name: "Returns data put before", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("readKey"), KeyPermission: []*authpb.Permission{ @@ -67,7 +67,7 @@ func TestGetAllRoles(t *testing.T) { }, { name: "Skips deleted", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("role1"), }) @@ -80,7 +80,7 @@ func TestGetAllRoles(t *testing.T) { }, { name: "Returns data overriden by put", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("role1"), KeyPermission: []*authpb.Permission{ @@ -135,17 +135,17 @@ func TestGetAllRoles(t *testing.T) { func TestGetRole(t *testing.T) { tcs := []struct { name string - setup func(tx auth.AuthBatchTx) + setup func(tx auth.UnsafeAuthWriter) want *authpb.Role }{ { name: "Returns nil for missing", - setup: func(tx auth.AuthBatchTx) {}, + setup: func(tx auth.UnsafeAuthWriter) {}, want: nil, }, { name: "Returns data put before", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("role1"), KeyPermission: []*authpb.Permission{ @@ -170,7 +170,7 @@ func TestGetRole(t *testing.T) { }, { name: "Return nil for deleted", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("role1"), }) @@ -180,7 +180,7 @@ func TestGetRole(t *testing.T) { }, { name: "Returns data overriden by put", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutRole(&authpb.Role{ Name: []byte("role1"), KeyPermission: []*authpb.Permission{ diff --git a/server/storage/schema/auth_users_test.go b/server/storage/schema/auth_users_test.go index ed0d0f5b801..2261e57071b 100644 --- a/server/storage/schema/auth_users_test.go +++ b/server/storage/schema/auth_users_test.go @@ -30,17 +30,17 @@ import ( func TestGetAllUsers(t *testing.T) { tcs := []struct { name string - setup func(tx auth.AuthBatchTx) + setup func(tx auth.UnsafeAuthWriter) want []*authpb.User }{ { name: "Empty by default", - setup: func(tx auth.AuthBatchTx) {}, + setup: func(tx auth.UnsafeAuthWriter) {}, want: nil, }, { name: "Returns user put before", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), Password: []byte("alicePassword"), @@ -63,7 +63,7 @@ func TestGetAllUsers(t *testing.T) { }, { name: "Skips deleted user", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), }) @@ -76,7 +76,7 @@ func TestGetAllUsers(t *testing.T) { }, { name: "Returns data overriden by put", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), Password: []byte("oldPassword"), @@ -123,17 +123,17 @@ func TestGetAllUsers(t *testing.T) { func TestGetUser(t *testing.T) { tcs := []struct { name string - setup func(tx auth.AuthBatchTx) + setup func(tx auth.UnsafeAuthWriter) want *authpb.User }{ { name: "Returns nil for missing user", - setup: func(tx auth.AuthBatchTx) {}, + setup: func(tx auth.UnsafeAuthWriter) {}, want: nil, }, { name: "Returns data put before", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), Password: []byte("alicePassword"), @@ -154,7 +154,7 @@ func TestGetUser(t *testing.T) { }, { name: "Skips deleted", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), }) @@ -164,7 +164,7 @@ func TestGetUser(t *testing.T) { }, { name: "Returns data overriden by put", - setup: func(tx auth.AuthBatchTx) { + setup: func(tx auth.UnsafeAuthWriter) { tx.UnsafePutUser(&authpb.User{ Name: []byte("alice"), Password: []byte("oldPassword"), diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index b4588654426..cdf938d346c 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -23,7 +23,7 @@ import ( ) // UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet). -func UnsafeCreateMetaBucket(tx backend.BatchTx) { +func UnsafeCreateMetaBucket(tx backend.UnsafeWriter) { tx.UnsafeCreateBucket(Meta) } @@ -59,15 +59,15 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { return UnsafeReadConsistentIndex(tx) } -func UnsafeUpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) { +func UnsafeUpdateConsistentIndexForce(tx backend.UnsafeReadWriter, index uint64, term uint64) { unsafeUpdateConsistentIndex(tx, index, term, true) } -func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) { +func UnsafeUpdateConsistentIndex(tx backend.UnsafeReadWriter, index uint64, term uint64) { unsafeUpdateConsistentIndex(tx, index, term, false) } -func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, allowDecreasing bool) { +func unsafeUpdateConsistentIndex(tx backend.UnsafeReadWriter, index uint64, term uint64, allowDecreasing bool) { if index == 0 { // Never save 0 as it means that we didn't load the real index yet. return diff --git a/server/storage/schema/confstate.go b/server/storage/schema/confstate.go index bec01ff70d3..ac3a2cc1485 100644 --- a/server/storage/schema/confstate.go +++ b/server/storage/schema/confstate.go @@ -26,7 +26,7 @@ import ( // MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx). // confState in backend is persisted since etcd v3.5. -func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) { +func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.UnsafeWriter, confState *raftpb.ConfState) { confStateBytes, err := json.Marshal(confState) if err != nil { lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err)) diff --git a/server/storage/schema/lease.go b/server/storage/schema/lease.go index bb68f05eba9..3ffeec64015 100644 --- a/server/storage/schema/lease.go +++ b/server/storage/schema/lease.go @@ -22,7 +22,7 @@ import ( "go.etcd.io/etcd/server/v3/storage/backend" ) -func UnsafeCreateLeaseBucket(tx backend.BatchTx) { +func UnsafeCreateLeaseBucket(tx backend.UnsafeWriter) { tx.UnsafeCreateBucket(Lease) } @@ -43,7 +43,7 @@ func MustUnsafeGetAllLeases(tx backend.UnsafeReader) []*leasepb.Lease { return ls } -func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) { +func MustUnsafePutLease(tx backend.UnsafeWriter, lpb *leasepb.Lease) { key := leaseIdToBytes(lpb.ID) val, err := lpb.Marshal() @@ -53,11 +53,11 @@ func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) { tx.UnsafePut(Lease, key, val) } -func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) { +func UnsafeDeleteLease(tx backend.UnsafeWriter, lpb *leasepb.Lease) { tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID)) } -func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease { +func MustUnsafeGetLease(tx backend.UnsafeReader, leaseID int64) *leasepb.Lease { _, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0) if len(vs) != 1 { return nil diff --git a/server/storage/schema/lease_test.go b/server/storage/schema/lease_test.go index a06b1769e91..88a1cd7e063 100644 --- a/server/storage/schema/lease_test.go +++ b/server/storage/schema/lease_test.go @@ -30,17 +30,17 @@ import ( func TestLeaseBackend(t *testing.T) { tcs := []struct { name string - setup func(tx backend.BatchTx) + setup func(tx backend.UnsafeWriter) want []*leasepb.Lease }{ { name: "Empty by default", - setup: func(tx backend.BatchTx) {}, + setup: func(tx backend.UnsafeWriter) {}, want: []*leasepb.Lease{}, }, { name: "Returns data put before", - setup: func(tx backend.BatchTx) { + setup: func(tx backend.UnsafeWriter) { MustUnsafePutLease(tx, &leasepb.Lease{ ID: -1, TTL: 2, @@ -55,7 +55,7 @@ func TestLeaseBackend(t *testing.T) { }, { name: "Skips deleted", - setup: func(tx backend.BatchTx) { + setup: func(tx backend.UnsafeWriter) { MustUnsafePutLease(tx, &leasepb.Lease{ ID: -1, TTL: 2, diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 08062834a68..e62acc92c38 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -56,7 +56,7 @@ func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { return p.unsafeExecute(lg, tx) } -func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) (err error) { +func (p migrationPlan) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) (err error) { for _, s := range p { err = s.unsafeExecute(lg, tx) if err != nil { @@ -98,7 +98,7 @@ func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { } // unsafeExecute is non thread-safe version of execute. -func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error { +func (s migrationStep) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error { err := s.actions.unsafeExecute(lg, tx) if err != nil { return err diff --git a/server/storage/schema/migration_test.go b/server/storage/schema/migration_test.go index 128def4fc14..fccf2029689 100644 --- a/server/storage/schema/migration_test.go +++ b/server/storage/schema/migration_test.go @@ -221,7 +221,7 @@ type actionMock struct { err error } -func (a actionMock) unsafeDo(tx backend.BatchTx) (action, error) { +func (a actionMock) unsafeDo(tx backend.UnsafeReadWriter) (action, error) { a.recorder.actions = append(a.recorder.actions, a.name) return actionMock{ recorder: a.recorder, diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 4cdce54e0b3..890810511ae 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -62,7 +62,7 @@ func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Ver } // UnsafeMigrate is non thread-safe version of Migrate. -func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { +func UnsafeMigrate(lg *zap.Logger, tx backend.UnsafeReadWriter, w WALVersion, target semver.Version) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { return fmt.Errorf("cannot detect storage schema version: %v", err) diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index 6b0024ba747..3d9bd1065c3 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -37,7 +37,7 @@ func TestValidate(t *testing.T) { name string version semver.Version // Overrides which keys should be set (default based on version) - overrideKeys func(tx backend.BatchTx) + overrideKeys func(tx backend.UnsafeReadWriter) expectError bool expectErrorMsg string }{ @@ -50,19 +50,19 @@ func TestValidate(t *testing.T) { { name: `V3.5 schema without confstate and term fields is correct`, version: version.V3_5, - overrideKeys: func(tx backend.BatchTx) {}, + overrideKeys: func(tx backend.UnsafeReadWriter) {}, }, { name: `V3.5 schema without term field is correct`, version: version.V3_5, - overrideKeys: func(tx backend.BatchTx) { + overrideKeys: func(tx backend.UnsafeReadWriter) { MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) }, }, { name: `V3.5 schema with all fields is correct`, version: version.V3_5, - overrideKeys: func(tx backend.BatchTx) { + overrideKeys: func(tx backend.UnsafeReadWriter) { MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) UnsafeUpdateConsistentIndex(tx, 1, 1) }, @@ -101,7 +101,7 @@ func TestMigrate(t *testing.T) { name string version semver.Version // Overrides which keys should be set (default based on version) - overrideKeys func(tx backend.BatchTx) + overrideKeys func(tx backend.UnsafeReadWriter) targetVersion semver.Version walEntries []etcdserverpb.InternalRaftRequest @@ -114,7 +114,7 @@ func TestMigrate(t *testing.T) { { name: `Upgrading v3.5 to v3.6 should be rejected if confstate is not set`, version: version.V3_5, - overrideKeys: func(tx backend.BatchTx) {}, + overrideKeys: func(tx backend.UnsafeReadWriter) {}, targetVersion: version.V3_6, expectVersion: nil, expectError: true, @@ -123,7 +123,7 @@ func TestMigrate(t *testing.T) { { name: `Upgrading v3.5 to v3.6 should be rejected if term is not set`, version: version.V3_5, - overrideKeys: func(tx backend.BatchTx) { + overrideKeys: func(tx backend.UnsafeReadWriter) { MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{}) }, targetVersion: version.V3_6, @@ -294,7 +294,7 @@ func TestMigrateIsReversible(t *testing.T) { } } -func setupBackendData(t *testing.T, ver semver.Version, overrideKeys func(tx backend.BatchTx)) string { +func setupBackendData(t *testing.T, ver semver.Version, overrideKeys func(tx backend.UnsafeReadWriter)) string { t.Helper() be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) tx := be.BatchTx() diff --git a/server/storage/schema/version.go b/server/storage/schema/version.go index 2c2034fda56..83984e6b5ff 100644 --- a/server/storage/schema/version.go +++ b/server/storage/schema/version.go @@ -56,12 +56,12 @@ func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version { // UnsafeSetStorageVersion updates etcd storage version in backend. // Populated since v3.6 -func UnsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) { +func UnsafeSetStorageVersion(tx backend.UnsafeWriter, v *semver.Version) { sv := semver.Version{Major: v.Major, Minor: v.Minor} tx.UnsafePut(Meta, MetaStorageVersionName, []byte(sv.String())) } // UnsafeClearStorageVersion removes etcd storage version in backend. -func UnsafeClearStorageVersion(tx backend.BatchTx) { +func UnsafeClearStorageVersion(tx backend.UnsafeWriter) { tx.UnsafeDelete(Meta, MetaStorageVersionName) }