Skip to content

Commit

Permalink
server: Separate txnRead from txnWrite
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Aug 4, 2023
1 parent 524fddc commit 81ecac1
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,31 +242,35 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
isWrite := !IsTxnReadonly(rt)
// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var txnWrite mvcc.TxnWrite
var mode mvcc.ReadTxMode
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ {
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))
mode = mvcc.SharedBufReadTxMode
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
mode = mvcc.ConcurrentReadTxMode
}
txnRead := kv.Read(mode, trace)
var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txnWrite, rt)
txnPath = compareToPath(txnRead, rt)
},
"compare",
)
err := checkTxn(ctx, txnWrite, rt, isWrite, lessor, txnPath)
err := checkTxn(ctx, txnRead, rt, isWrite, lessor, txnPath)
if err != nil {
txnWrite.End()
txnRead.End()
return nil, nil, err
}
// When executing mutable txnWrite ops, etcd must hold the txnWrite lock so
// readers do not see any intermediate results. Since writes are
// serialized on the raft loop, the revision in the read view will
// be the revision of the write txnWrite.
var txnWrite mvcc.TxnWrite
if isWrite {
txnWrite.End()
txnRead.End()
txnWrite = kv.Write(trace)
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
}
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
txnWrite.End()
Expand All @@ -278,16 +282,16 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
return txnResp, trace, err
}

func checkTxn(ctx context.Context, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
func checkTxn(ctx context.Context, txnRead mvcc.TxnRead, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
trace := traceutil.Get(ctx)
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
if _, err := checkRequests(txnWrite, rt, txnPath,
if _, err := checkRequests(txnRead, rt, txnPath,
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
return err
}
}
if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
if _, err := checkRequests(txnRead, rt, txnPath, checkRequestRange); err != nil {
return err
}
trace.Step("check requests")
Expand Down

0 comments on commit 81ecac1

Please sign in to comment.