Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting session id of KVTxn #1270

Merged
merged 4 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (action actionPipelinedFlush) handleSingleBatch(
"[pipelined dml] primary key should be set before pipelined flush",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return errors.New("[pipelined dml] primary key should be set before pipelined flush")
}
Expand All @@ -136,11 +135,10 @@ func (action actionPipelinedFlush) handleSingleBatch(
attempts++
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn(
logutil.Logger(bo.GetCtx()).Warn(
"[pipelined dml] slow pipelined flush request",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
zap.Stringer("region", &batch.region),
zap.Int("attempts", attempts),
)
Expand Down Expand Up @@ -228,9 +226,8 @@ func (action actionPipelinedFlush) handleSingleBatch(
if err1 != nil {
return err1
}
logutil.BgLogger().Info(
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] encounters lock",
zap.Uint64("session", c.sessionID),
zap.Uint64("txnID", c.startTS),
zap.Uint64("generation", action.generation),
zap.Stringer("lock", lock),
Expand Down Expand Up @@ -280,7 +277,6 @@ func (action actionPipelinedFlush) handleSingleBatch(
zap.Error(err),
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return err
}
Expand All @@ -299,18 +295,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio
}

func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
logutil.BgLogger().Info("[pipelined dml] start to commit transaction",
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] start to commit transaction",
zap.Int("keys", c.txn.GetMemBuffer().Len()),
zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))),
zap.Uint64("startTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
return err
}
Expand All @@ -328,11 +323,10 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
c.mu.RLock()
c.mu.committed = true
c.mu.RUnlock()
logutil.BgLogger().Info(
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] transaction is committed",
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", commitTS),
zap.Uint64("session", c.sessionID),
)

if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil {
Expand Down Expand Up @@ -431,7 +425,6 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
return
}
Expand All @@ -446,15 +439,14 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
zap.Error(err),
)
} else {
logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done",
logutil.BgLogger().Info(
"[pipelined dml] commit transaction secondaries done",
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
}
}
16 changes: 16 additions & 0 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,22 @@ func (txn *KVTxn) SetTxnSource(txnSource uint64) {
txn.txnSource = txnSource
}

// SetSessionID sets the session ID of the transaction.
// If the committer is not initialized yet, the function won't take effect.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the concern of completeness of the API, do we consider put it to another field temporarily if the committer is not initialized yet, and use it lazily when initializing the committer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth consideration. But I don't want to do it now as it's not needed

// It is supposed to be set before performing any writes in the transaction to avoid data race.
// It is designed to be called in ActivateTxn(), though subject to change.
// It is especially useful for pipelined transactions, as its committer is initialized immediately
// when the transaction is created.
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
//
// Note that commiter may also obtain a sessionID from context directly via sessionIDCtxKey.
// TODO: consider unifying them.

func (txn *KVTxn) SetSessionID(sessionID uint64) {
if txn.committer != nil {
txn.committer.sessionID = sessionID
}
}

// GetDiskFullOpt gets the options of current operation in each TiKV disk usage level.
func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt {
return txn.diskFullOpt
Expand Down
Loading