Skip to content

Commit

Permalink
Merge pull request #16325 from serathius/reader-writer
Browse files Browse the repository at this point in the history
Separate Writer interface from BatchTx interfaces
  • Loading branch information
serathius authored Jul 31, 2023
2 parents 41ab23c + 53cbd81 commit 9637b07
Show file tree
Hide file tree
Showing 23 changed files with 108 additions and 92 deletions.
6 changes: 3 additions & 3 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
return nil
}

func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision {
func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision {
s.lg.Info(
"bumping latest revision",
zap.Int64("latest-revision", latest.main),
Expand All @@ -370,7 +370,7 @@ func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amou
return latest
}

func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) {
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) {
s.lg.Info(
"marking revision compacted",
zap.Int64("revision", latest.main),
Expand All @@ -379,7 +379,7 @@ func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revis
mvcc.UnsafeSetScheduledCompact(tx, latest.main)
}

func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) {
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) {
var latest revision
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
rev := bytesToRev(k)
Expand Down
34 changes: 21 additions & 13 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,6 @@ type AuthBackend interface {
GetAllRoles() []*authpb.Role
}

type AuthBatchTx interface {
Lock()
Unlock()
UnsafeAuthReader
UnsafeSaveAuthEnabled(enabled bool)
UnsafeSaveAuthRevision(rev uint64)
UnsafePutUser(*authpb.User)
UnsafeDeleteUser(string)
UnsafePutRole(*authpb.Role)
UnsafeDeleteRole(string)
}

type AuthReadTx interface {
RLock()
RUnlock()
Expand All @@ -232,6 +220,26 @@ type UnsafeAuthReader interface {
UnsafeGetAllRoles() []*authpb.Role
}

type AuthBatchTx interface {
Lock()
Unlock()
UnsafeAuthReadWriter
}

type UnsafeAuthReadWriter interface {
UnsafeAuthReader
UnsafeAuthWriter
}

type UnsafeAuthWriter interface {
UnsafeSaveAuthEnabled(enabled bool)
UnsafeSaveAuthRevision(rev uint64)
UnsafePutUser(*authpb.User)
UnsafeDeleteUser(string)
UnsafePutRole(*authpb.Role)
UnsafeDeleteRole(string)
}

type authStore struct {
// atomic operations; need 64-bit align, or 32-bit tests will crash
revision uint64
Expand Down Expand Up @@ -990,7 +998,7 @@ func hasRootRole(u *authpb.User) bool {
return idx != len(u.Roles) && u.Roles[idx] == rootRole
}

func (as *authStore) commitRevision(tx AuthBatchTx) {
func (as *authStore) commitRevision(tx UnsafeAuthWriter) {
atomic.AddUint64(&as.revision, 1)
tx.UnsafeSaveAuthRevision(as.Revision())
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ConsistentIndexer interface {

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
UnsafeSave(tx backend.UnsafeReadWriter)

// SetBackend set the available backend.BatchTx for ConsistentIndexer.
SetBackend(be Backend)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.term, term)
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
func (ci *consistentIndex) UnsafeSave(tx backend.UnsafeReadWriter) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
schema.UnsafeUpdateConsistentIndex(tx, index, term)
Expand Down Expand Up @@ -166,8 +166,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
atomic.StoreUint64(&f.term, term)
}

func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.UnsafeReadWriter) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
tx.LockOutsideApply()
Expand Down
20 changes: 14 additions & 6 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,26 @@ type Bucket interface {
type BatchTx interface {
Lock()
Unlock()
UnsafeReader
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
LockInsideApply()
LockOutsideApply()
UnsafeReadWriter
}

type UnsafeReadWriter interface {
UnsafeReader
UnsafeWriter
}

type UnsafeWriter interface {
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
}

type batchTx struct {
Expand Down
6 changes: 3 additions & 3 deletions server/storage/backend/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@

package backend

type HookFunc func(tx BatchTx)
type HookFunc func(tx UnsafeReadWriter)

// Hooks allow to add additional logic executed during transaction lifetime.
type Hooks interface {
// OnPreCommitUnsafe is executed before Commit of transactions.
// The given transaction is already locked.
OnPreCommitUnsafe(tx BatchTx)
OnPreCommitUnsafe(tx UnsafeReadWriter)
}

type hooks struct {
onPreCommitUnsafe HookFunc
}

func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
func (h hooks) OnPreCommitUnsafe(tx UnsafeReadWriter) {
h.onPreCommitUnsafe(tx)
}

Expand Down
2 changes: 1 addition & 1 deletion server/storage/backend/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func prepareBuckenAndKey(tx backend.BatchTx) {

func newTestHooksBackend(t testing.TB, baseConfig backend.BackendConfig) backend.Backend {
cfg := baseConfig
cfg.Hooks = backend.NewHooks(func(tx backend.BatchTx) {
cfg.Hooks = backend.NewHooks(func(tx backend.UnsafeReadWriter) {
k, v := tx.UnsafeRange(bucket, key, nil, 1)
t.Logf("OnPreCommit executed: %v %v", string(k[0]), string(v[0]))
assert.Len(t, k, 1)
Expand Down
2 changes: 1 addition & 1 deletion server/storage/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendH
return &BackendHooks{lg: lg, indexer: indexer}
}

func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.UnsafeReadWriter) {
bh.indexer.UnsafeSave(tx)
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) {
UnsafeSetScheduledCompact(tx, value)
}

func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
func UnsafeSetScheduledCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
Expand All @@ -53,7 +53,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) {
UnsafeSetFinishedCompact(tx, value)
}

func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) {
func UnsafeSetFinishedCompact(tx backend.UnsafeWriter, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
Expand Down
12 changes: 6 additions & 6 deletions server/storage/schema/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
type action interface {
// unsafeDo executes the action and returns revert action, when executed
// should restore the state from before.
unsafeDo(tx backend.BatchTx) (revert action, err error)
unsafeDo(tx backend.UnsafeReadWriter) (revert action, err error)
}

type setKeyAction struct {
Expand All @@ -32,7 +32,7 @@ type setKeyAction struct {
FieldValue []byte
}

func (a setKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
func (a setKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
tx.UnsafePut(a.Bucket, a.FieldName, a.FieldValue)
return revert, nil
Expand All @@ -43,13 +43,13 @@ type deleteKeyAction struct {
FieldName []byte
}

func (a deleteKeyAction) unsafeDo(tx backend.BatchTx) (action, error) {
func (a deleteKeyAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
revert := restoreFieldValueAction(tx, a.Bucket, a.FieldName)
tx.UnsafeDelete(a.Bucket, a.FieldName)
return revert, nil
}

func restoreFieldValueAction(tx backend.BatchTx, bucket backend.Bucket, fieldName []byte) action {
func restoreFieldValueAction(tx backend.UnsafeReader, bucket backend.Bucket, fieldName []byte) action {
_, vs := tx.UnsafeRange(bucket, fieldName, nil, 1)
if len(vs) == 1 {
return &setKeyAction{
Expand All @@ -68,7 +68,7 @@ type ActionList []action

// unsafeExecute executes actions one by one. If one of actions returns error,
// it will revert them.
func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {
func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.UnsafeReadWriter) error {
var revertActions = make(ActionList, 0, len(as))
for _, a := range as {
revert, err := a.unsafeDo(tx)
Expand All @@ -84,7 +84,7 @@ func (as ActionList) unsafeExecute(lg *zap.Logger, tx backend.BatchTx) error {

// unsafeExecuteInReversedOrder executes actions in revered order. Will panic on
// action error. Should be used when reverting.
func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.BatchTx) {
func (as ActionList) unsafeExecuteInReversedOrder(lg *zap.Logger, tx backend.UnsafeReadWriter) {
for j := len(as) - 1; j >= 0; j-- {
_, err := as[j].unsafeDo(tx)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions server/storage/schema/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ type brokenAction struct{}

var errBrokenAction = fmt.Errorf("broken action error")

func (c brokenAction) unsafeDo(tx backend.BatchTx) (action, error) {
func (c brokenAction) unsafeDo(tx backend.UnsafeReadWriter) (action, error) {
return nil, errBrokenAction
}

func putKeyValues(tx backend.BatchTx, bucket backend.Bucket, kvs map[string]string) {
func putKeyValues(tx backend.UnsafeWriter, bucket backend.Bucket, kvs map[string]string) {
for k, v := range kvs {
tx.UnsafePut(bucket, []byte(k), []byte(v))
}
}

func assertBucketState(t *testing.T, tx backend.BatchTx, bucket backend.Bucket, expect map[string]string) {
func assertBucketState(t *testing.T, tx backend.UnsafeReadWriter, bucket backend.Bucket, expect map[string]string) {
t.Helper()
got := map[string]string{}
ks, vs := tx.UnsafeRange(bucket, []byte("\x00"), []byte("\xff"), 0)
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/alarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) {
s.mustUnsafePutAlarm(tx, alarm)
}

func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) mustUnsafePutAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal()
if err != nil {
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
Expand All @@ -63,7 +63,7 @@ func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) {
s.mustUnsafeDeleteAlarm(tx, alarm)
}

func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.BatchTx, alarm *etcdserverpb.AlarmMember) {
func (s *alarmBackend) mustUnsafeDeleteAlarm(tx backend.UnsafeWriter, alarm *etcdserverpb.AlarmMember) {
v, err := alarm.Marshal()
if err != nil {
s.lg.Panic("failed to marshal alarm member", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/auth_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
)

func UnsafeCreateAuthRolesBucket(tx backend.BatchTx) {
func UnsafeCreateAuthRolesBucket(tx backend.UnsafeWriter) {
tx.UnsafeCreateBucket(AuthRoles)
}

Expand Down
20 changes: 10 additions & 10 deletions server/storage/schema/auth_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ import (
func TestGetAllRoles(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
setup func(tx auth.UnsafeAuthWriter)
want []*authpb.Role
}{
{
name: "Empty by default",
setup: func(tx auth.AuthBatchTx) {},
setup: func(tx auth.UnsafeAuthWriter) {},
want: nil,
},
{
name: "Returns data put before",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("readKey"),
KeyPermission: []*authpb.Permission{
Expand All @@ -67,7 +67,7 @@ func TestGetAllRoles(t *testing.T) {
},
{
name: "Skips deleted",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
})
Expand All @@ -80,7 +80,7 @@ func TestGetAllRoles(t *testing.T) {
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
Expand Down Expand Up @@ -135,17 +135,17 @@ func TestGetAllRoles(t *testing.T) {
func TestGetRole(t *testing.T) {
tcs := []struct {
name string
setup func(tx auth.AuthBatchTx)
setup func(tx auth.UnsafeAuthWriter)
want *authpb.Role
}{
{
name: "Returns nil for missing",
setup: func(tx auth.AuthBatchTx) {},
setup: func(tx auth.UnsafeAuthWriter) {},
want: nil,
},
{
name: "Returns data put before",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
Expand All @@ -170,7 +170,7 @@ func TestGetRole(t *testing.T) {
},
{
name: "Return nil for deleted",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
})
Expand All @@ -180,7 +180,7 @@ func TestGetRole(t *testing.T) {
},
{
name: "Returns data overriden by put",
setup: func(tx auth.AuthBatchTx) {
setup: func(tx auth.UnsafeAuthWriter) {
tx.UnsafePutRole(&authpb.Role{
Name: []byte("role1"),
KeyPermission: []*authpb.Permission{
Expand Down
Loading

0 comments on commit 9637b07

Please sign in to comment.