diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index 2c9ab9a80f3..e791a241e2b 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -21,7 +21,7 @@ import ( "go.etcd.io/etcd/pkg/v3/adt" ) -func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions { +func getMergedPerms(tx UnsafeAuthReader, userName string) *unifiedRangePermissions { user := tx.UnsafeGetUser(userName) if user == nil { return nil @@ -127,7 +127,7 @@ func (as *authStore) isRangeOpPermitted(userName string, key, rangeEnd []byte, p return checkKeyInterval(as.lg, rangePerm, key, rangeEnd, permtyp) } -func (as *authStore) refreshRangePermCache(tx AuthReadTx) { +func (as *authStore) refreshRangePermCache(tx UnsafeAuthReader) { // Note that every authentication configuration update calls this method and it invalidates the entire // rangePermCache and reconstruct it based on information of users and roles stored in the backend. // This can be a costly operation. diff --git a/server/auth/store.go b/server/auth/store.go index f9d60f1416e..ab091b91356 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -206,7 +206,9 @@ type AuthBackend interface { } type AuthBatchTx interface { - AuthReadTx + Lock() + Unlock() + UnsafeAuthReader UnsafeSaveAuthEnabled(enabled bool) UnsafeSaveAuthRevision(rev uint64) UnsafePutUser(*authpb.User) @@ -216,14 +218,18 @@ type AuthBatchTx interface { } type AuthReadTx interface { + RLock() + RUnlock() + UnsafeAuthReader +} + +type UnsafeAuthReader interface { UnsafeReadAuthEnabled() bool UnsafeReadAuthRevision() uint64 UnsafeGetUser(string) *authpb.User UnsafeGetRole(string) *authpb.Role UnsafeGetAllUsers() []*authpb.User UnsafeGetAllRoles() []*authpb.Role - Lock() - Unlock() } type authStore struct { @@ -354,8 +360,8 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { tx := as.be.ReadTx() - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() user = tx.UnsafeGetUser(username) if user == nil { @@ -382,13 +388,13 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be AuthBackend) { as.be = be tx := be.ReadTx() - tx.Lock() + tx.RLock() enabled := tx.UnsafeReadAuthEnabled() as.setRevision(tx.UnsafeReadAuthRevision()) as.refreshRangePermCache(tx) - tx.Unlock() + tx.RUnlock() as.enabledMu.Lock() as.enabled = enabled @@ -864,8 +870,8 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE } tx := as.be.ReadTx() - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() user := tx.UnsafeGetUser(userName) if user == nil { @@ -906,8 +912,8 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { } tx := as.be.ReadTx() - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() u := tx.UnsafeGetUser(authInfo.Username) if u == nil { diff --git a/server/auth/store_mock_test.go b/server/auth/store_mock_test.go index a8d7fcf81ac..764100fa3cb 100644 --- a/server/auth/store_mock_test.go +++ b/server/auth/store_mock_test.go @@ -106,6 +106,12 @@ func (t txMock) Lock() { func (t txMock) Unlock() { } +func (t txMock) RLock() { +} + +func (t txMock) RUnlock() { +} + func (t txMock) UnsafeSaveAuthEnabled(enabled bool) { t.be.enabled = enabled } diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 2fdba37bcf8..3f8c131ad78 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -44,9 +44,9 @@ type Bucket interface { } type BatchTx interface { - ReadTx Lock() Unlock() + UnsafeReader UnsafeCreateBucket(bucket Bucket) UnsafeDeleteBucket(bucket Bucket) UnsafePut(bucket Bucket, key []byte, value []byte) @@ -103,18 +103,6 @@ func (t *batchTx) Unlock() { t.Mutex.Unlock() } -// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not -// have appropriate semantics in BatchTx interface. Therefore should not be called. -// TODO: might want to decouple ReadTx and BatchTx - -func (t *batchTx) RLock() { - panic("unexpected RLock") -} - -func (t *batchTx) RUnlock() { - panic("unexpected RUnlock") -} - func (t *batchTx) UnsafeCreateBucket(bucket Bucket) { _, err := t.tx.CreateBucket(bucket.Name()) if err != nil && err != bolt.ErrBucketExists { diff --git a/server/storage/backend/read_tx.go b/server/storage/backend/read_tx.go index 8b4e48e83c2..4ca2621411c 100644 --- a/server/storage/backend/read_tx.go +++ b/server/storage/backend/read_tx.go @@ -28,7 +28,10 @@ import ( type ReadTx interface { RLock() RUnlock() + UnsafeReader +} +type UnsafeReader interface { UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error } diff --git a/server/storage/mvcc/hash.go b/server/storage/mvcc/hash.go index 385d0c97966..cf9ffe3c085 100644 --- a/server/storage/mvcc/hash.go +++ b/server/storage/mvcc/hash.go @@ -30,7 +30,7 @@ const ( hashStorageMaxSize = 10 ) -func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { +func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { h := newKVHasher(compactRevision, revision, keep) err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error { h.WriteKeyValue(k, v) diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index b93fcbe64da..d35d7fbaed0 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -28,8 +28,13 @@ import ( ) type storeTxnRead struct { - s *store + storeTxnCommon tx backend.ReadTx +} + +type storeTxnCommon struct { + s *store + tx backend.UnsafeReader firstRev int64 rev int64 @@ -54,17 +59,17 @@ func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead { tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() - return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace}) + return newMetricsTxnRead(&storeTxnRead{storeTxnCommon{s, tx, firstRev, rev, trace}, tx}) } -func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } -func (tr *storeTxnRead) Rev() int64 { return tr.rev } +func (tr *storeTxnCommon) FirstRev() int64 { return tr.firstRev } +func (tr *storeTxnCommon) Rev() int64 { return tr.rev } -func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { +func (tr *storeTxnCommon) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { return tr.rangeKeys(ctx, key, end, tr.Rev(), ro) } -func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { +func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { rev := ro.Rev if rev > curRev { return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev @@ -132,7 +137,7 @@ func (tr *storeTxnRead) End() { } type storeTxnWrite struct { - storeTxnRead + storeTxnCommon tx backend.BatchTx // beginRev is the revision where the txn begins; it will write to the next revision. beginRev int64 @@ -144,10 +149,10 @@ func (s *store) Write(trace *traceutil.Trace) TxnWrite { tx := s.b.BatchTx() tx.LockInsideApply() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, - tx: tx, - beginRev: s.currentRev, - changes: make([]mvccpb.KeyValue, 0, 4), + storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace}, + tx: tx, + beginRev: s.currentRev, + changes: make([]mvccpb.KeyValue, 0, 4), } return newMetricsTxnWrite(tw) } @@ -217,7 +222,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { d, err := kv.Marshal() if err != nil { - tw.storeTxnRead.s.lg.Fatal( + tw.storeTxnCommon.s.lg.Fatal( "failed to marshal mvccpb.KeyValue", zap.Error(err), ) @@ -240,7 +245,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - tw.storeTxnRead.s.lg.Error( + tw.storeTxnCommon.s.lg.Error( "failed to detach old lease from a key", zap.Error(err), ) @@ -278,13 +283,13 @@ func (tw *storeTxnWrite) delete(key []byte) { idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) - ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) + ibytes = appendMarkTombstone(tw.storeTxnCommon.s.lg, ibytes) kv := mvccpb.KeyValue{Key: key} d, err := kv.Marshal() if err != nil { - tw.storeTxnRead.s.lg.Fatal( + tw.storeTxnCommon.s.lg.Fatal( "failed to marshal mvccpb.KeyValue", zap.Error(err), ) @@ -293,7 +298,7 @@ func (tw *storeTxnWrite) delete(key []byte) { tw.tx.UnsafeSeqPut(schema.Key, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { - tw.storeTxnRead.s.lg.Fatal( + tw.storeTxnCommon.s.lg.Fatal( "failed to tombstone an existing key", zap.String("key", string(key)), zap.Error(err), @@ -307,7 +312,7 @@ func (tw *storeTxnWrite) delete(key []byte) { if leaseID != lease.NoLease { err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) if err != nil { - tw.storeTxnRead.s.lg.Error( + tw.storeTxnCommon.s.lg.Error( "failed to detach old lease from a key", zap.Error(err), ) diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index a002ada7177..5bc6e6dca95 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -19,7 +19,7 @@ import ( "go.etcd.io/etcd/server/v3/storage/schema" ) -func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) { +func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) { _, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { return bytesToRev(finishedCompactBytes[0]).main, true @@ -27,7 +27,7 @@ func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found b return 0, false } -func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) { +func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) { _, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { return bytesToRev(scheduledCompactBytes[0]).main, true diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 66468a57e30..39d43fa2e13 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -79,7 +79,7 @@ func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) { return s.unsafeGetAllAlarms(tx) } -func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) { +func (s *alarmBackend) unsafeGetAllAlarms(tx backend.UnsafeReader) ([]*etcdserverpb.AlarmMember, error) { var ms []*etcdserverpb.AlarmMember err := tx.UnsafeForEach(Alarm, func(k, v []byte) error { var m etcdserverpb.AlarmMember diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 2375f066654..3bff8412c9f 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -96,13 +96,11 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) { } func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeReadAuthEnabled() + return unsafeReadAuthEnabled(atx.tx) } func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeReadAuthRevision() + return unsafeReadAuthRevision(atx.tx) } func (atx *authBatchTx) Lock() { @@ -117,7 +115,11 @@ func (atx *authBatchTx) Unlock() { } func (atx *authReadTx) UnsafeReadAuthEnabled() bool { - _, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) + return unsafeReadAuthEnabled(atx.tx) +} + +func unsafeReadAuthEnabled(tx backend.UnsafeReader) bool { + _, vs := tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) if len(vs) == 1 { if bytes.Equal(vs[0], authEnabled) { return true @@ -127,7 +129,11 @@ func (atx *authReadTx) UnsafeReadAuthEnabled() bool { } func (atx *authReadTx) UnsafeReadAuthRevision() uint64 { - _, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) + return unsafeReadAuthRevision(atx.tx) +} + +func unsafeReadAuthRevision(tx backend.UnsafeReader) uint64 { + _, vs := tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase return 0 @@ -135,10 +141,10 @@ func (atx *authReadTx) UnsafeReadAuthRevision() uint64 { return binary.BigEndian.Uint64(vs[0]) } -func (atx *authReadTx) Lock() { +func (atx *authReadTx) RLock() { atx.tx.RLock() } -func (atx *authReadTx) Unlock() { +func (atx *authReadTx) RUnlock() { atx.tx.RUnlock() } diff --git a/server/storage/schema/auth_roles.go b/server/storage/schema/auth_roles.go index e0fea6d4f3f..58e6b834c2b 100644 --- a/server/storage/schema/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -26,15 +26,14 @@ func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) { } func (abe *authBackend) GetRole(roleName string) *authpb.Role { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := abe.ReadTx() + tx.RLock() + defer tx.RUnlock() return tx.UnsafeGetRole(roleName) } func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeGetRole(roleName) + return unsafeGetRole(atx.lg, atx.tx, roleName) } func (abe *authBackend) GetAllRoles() []*authpb.Role { @@ -45,8 +44,7 @@ func (abe *authBackend) GetAllRoles() []*authpb.Role { } func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeGetAllRoles() + return unsafeGetAllRoles(atx.lg, atx.tx) } func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { @@ -67,7 +65,11 @@ func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { } func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role { - _, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) + return unsafeGetRole(atx.lg, atx.tx, roleName) +} + +func unsafeGetRole(lg *zap.Logger, tx backend.UnsafeReader, roleName string) *authpb.Role { + _, vs := tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) if len(vs) == 0 { return nil } @@ -75,13 +77,17 @@ func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role { role := &authpb.Role{} err := role.Unmarshal(vs[0]) if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) + lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) } return role } func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role { - _, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) + return unsafeGetAllRoles(atx.lg, atx.tx) +} + +func unsafeGetAllRoles(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.Role { + _, vs := tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil } @@ -91,7 +97,7 @@ func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role { role := &authpb.Role{} err := role.Unmarshal(vs[i]) if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) + lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) } roles[i] = role } diff --git a/server/storage/schema/auth_users.go b/server/storage/schema/auth_users.go index 762eaf30179..6ea9dd98a41 100644 --- a/server/storage/schema/auth_users.go +++ b/server/storage/schema/auth_users.go @@ -17,24 +17,24 @@ package schema import ( "go.uber.org/zap" + "go.etcd.io/etcd/server/v3/storage/backend" + "go.etcd.io/etcd/api/v3/authpb" ) func (abe *authBackend) GetUser(username string) *authpb.User { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := abe.ReadTx() + tx.RLock() + defer tx.RUnlock() return tx.UnsafeGetUser(username) } func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeGetUser(username) + return unsafeGetUser(atx.lg, atx.tx, username) } func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { - arx := &authReadTx{tx: atx.tx, lg: atx.lg} - return arx.UnsafeGetAllUsers() + return unsafeGetAllUsers(atx.lg, atx.tx) } func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { @@ -50,7 +50,11 @@ func (atx *authBatchTx) UnsafeDeleteUser(username string) { } func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { - _, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0) + return unsafeGetUser(atx.lg, atx.tx, username) +} + +func unsafeGetUser(lg *zap.Logger, tx backend.UnsafeReader, username string) *authpb.User { + _, vs := tx.UnsafeRange(AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil } @@ -58,7 +62,7 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { user := &authpb.User{} err := user.Unmarshal(vs[0]) if err != nil { - atx.lg.Panic( + lg.Panic( "failed to unmarshal 'authpb.User'", zap.String("user-name", username), zap.Error(err), @@ -68,20 +72,24 @@ func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { } func (abe *authBackend) GetAllUsers() []*authpb.User { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := abe.ReadTx() + tx.RLock() + defer tx.RUnlock() return tx.UnsafeGetAllUsers() } func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User { + return unsafeGetAllUsers(atx.lg, atx.tx) +} + +func unsafeGetAllUsers(lg *zap.Logger, tx backend.UnsafeReader) []*authpb.User { var vs [][]byte - err := atx.tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error { + err := tx.UnsafeForEach(AuthUsers, func(k []byte, v []byte) error { vs = append(vs, v) return nil }) if err != nil { - atx.lg.Panic("failed to get users", + lg.Panic("failed to get users", zap.Error(err)) } if len(vs) == 0 { @@ -93,7 +101,7 @@ func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User { user := &authpb.User{} err := user.Unmarshal(vs[i]) if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) + lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) } users[i] = user } diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index a2d15b2788c..b4588654426 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -37,7 +37,7 @@ func CreateMetaBucket(tx backend.BatchTx) { // UnsafeReadConsistentIndex loads consistent index & term from given transaction. // returns 0,0 if the data are not found. // Term is persisted since v3.5. -func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { +func UnsafeReadConsistentIndex(tx backend.UnsafeReader) (uint64, uint64) { _, vs := tx.UnsafeRange(Meta, MetaConsistentIndexKeyName, nil, 0) if len(vs) == 0 { return 0, 0 diff --git a/server/storage/schema/confstate.go b/server/storage/schema/confstate.go index ead2e527d68..bec01ff70d3 100644 --- a/server/storage/schema/confstate.go +++ b/server/storage/schema/confstate.go @@ -37,7 +37,7 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt // UnsafeConfStateFromBackend retrieves ConfState from the backend. // Returns nil if confState in backend is not persisted (e.g. backend writen by