Skip to content

Commit

Permalink
Address review feedbacks.
Browse files Browse the repository at this point in the history
1. Decouple SpecAction and relevant definitions from the core etcd raft spec.
Put them into a middle layer spec etcdraft_control.tla
2. Fix racing issue in trace_validation_test.go.

Signed-off-by: Joshua Zhang <[email protected]>
  • Loading branch information
joshuazh-x committed Jul 8, 2024
1 parent 658195a commit 0701128
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 99 deletions.
2 changes: 1 addition & 1 deletion rafttest/trace_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func runWithRandomFaults(clusterSize int, tl raft.TraceLogger, pReconf float64,
return
case <-timer.C:
fi++
faults[fi%len(faults)](cluster)
cluster.faultc <- faults[fi%len(faults)]
default:
}

Expand Down
8 changes: 4 additions & 4 deletions tla/MCetcdraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
\* limitations under the License.
\*

EXTENDS etcdraft
EXTENDS etcdraft_control

CONSTANTS s1,s2,s3,s4,s5

Expand All @@ -34,7 +34,7 @@ ASSUME RequestLimit \in Nat
CONSTANT MaxStepDownToFollower
ASSUME MaxStepDownToFollower \in Nat

etcd == INSTANCE etcdraft
etcd_control == INSTANCE etcdraft_control

VARIABLE restartCount
VARIABLE stepDownCount
Expand Down Expand Up @@ -105,7 +105,7 @@ MCIsEnabled(action, args) ==
\/ rd.msgs /= EmptyBag
\/ DurableStateFromReady(rd) /= durableState[args[1]]
[] action = "StepDownToFollower" -> stepDownCount < MaxStepDownToFollower
[] OTHER -> etcd!IsEnabled(action, args)
[] OTHER -> etcd_control!IsEnabled(action, args)

MCPostAction(action, args) ==
CASE action = "Restart" ->
Expand All @@ -130,7 +130,7 @@ MCInit ==
/\ Init
/\ InitConstraintVars

MCSpec == MCInit /\ [][Next]_mcVars
MCSpec == MCInit /\ [][Controlled_Next]_mcVars

----
===================================
4 changes: 2 additions & 2 deletions tla/Traceetcdraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
\* limitations under the License.
\*

EXTENDS etcdraft, Json, IOUtils, Sequences, TLC
EXTENDS etcdraft_control, Json, IOUtils, Sequences, TLC

\* raft.pb.go enum MessageType
RaftMsgType ==
Expand Down Expand Up @@ -365,7 +365,7 @@ SkipUnusedAction == SpecAction("Unused", <<>>, LAMBDA act:
)

TraceNext ==
\/ Next
\/ Controlled_Next
\/ SkipUnusedAction

TraceSpec == TraceInit /\ [][TraceNext]_mcVars
Expand Down
123 changes: 31 additions & 92 deletions tla/etcdraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -255,47 +255,9 @@ Init == /\ InitMessageVars
----
\* Define state transitions

SpecActions == {
"RequestVote",
"BecomeLeader",
"ClientRequest",
"AdvanceCommitIndex",
"AppendEntries",
"AppendEntriesToSelf",
"Heartbeat",
"SendSnapshot",
"Receive",
"Timeout",
"StepDownToFollower",
"Ready",
"PersistState",
"ApplyConfChange",
"SendMessages",
"Advance",
"Restart",
"DuplicateMessage",
"DropMessage",
"AddNewServer",
"AddLearner",
"DeleteServer"
}

\* Default implementation of IsEnabled which controls if the action is enabled or not.
IsEnabled(action, args) == action \in SpecActions
\* Default implementation of PostAction which is called after the action.
PostAction(action, args) == TRUE

\* This is a wrapper of top level actions in the spec which allows injecting of custom
\* IsEnabled to constrain the action and custom PostAction to process necessary operations
\* after the action (validation for example).
SpecAction(action, args, op(_)) ==
/\ IsEnabled(action, args)
/\ op(action)
/\ PostAction(action, args)

\* Server i restarts from stable storage.
\* It loses everything but its currentTerm, commitIndex, votedFor, log, and config in durable state.
Restart(i) == SpecAction("Restart", <<i>>, LAMBDA act:
Restart(i) ==
/\ state' = [state EXCEPT ![i] = Follower]
/\ votesResponded' = [votesResponded EXCEPT ![i] = {}]
/\ votesGranted' = [votesGranted EXCEPT ![i] = {}]
Expand All @@ -311,10 +273,9 @@ Restart(i) == SpecAction("Restart", <<i>>, LAMBDA act:
/\ ready' = [ready EXCEPT ![i] = <<>>]
/\ appliedConfChange' = [appliedConfChange EXCEPT ![i] = durableState[i].applied]
/\ UNCHANGED <<messages, durableState>>
)

\* Server i times out and starts a new election.
Timeout(i) == SpecAction("Timeout", <<i>>, LAMBDA act:
Timeout(i) ==
/\ ~InReadyPhase(i)
/\ state[i] \in {Follower, Candidate}
/\ i \in GetConfig(i)
Expand All @@ -324,10 +285,9 @@ Timeout(i) == SpecAction("Timeout", <<i>>, LAMBDA act:
/\ votesResponded' = [votesResponded EXCEPT ![i] = {}]
/\ votesGranted' = [votesGranted EXCEPT ![i] = {}]
/\ UNCHANGED <<messageVars, leaderVars, logVars, configVars, readyVars>>
)

\* Candidate i sends j a RequestVote request.
RequestVote(i, j) == SpecAction("RequestVote", <<i, j>>, LAMBDA act:
RequestVote(i, j) ==
/\ state[i] = Candidate
/\ j \in ((GetConfig(i) \union GetLearners(i)) \ votesResponded[i])
/\ IF i # j
Expand All @@ -343,7 +303,6 @@ RequestVote(i, j) == SpecAction("RequestVote", <<i, j>>, LAMBDA act:
msource |-> i,
mdest |-> i])
/\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, logVars, configVars, readyVars>>
)

\* Leader i sends j an AppendEntries request containing entries in [b,e) range.
\* N.B. range is right open
Expand Down Expand Up @@ -376,7 +335,7 @@ AppendEntriesInRangeToPeer(subtype, i, j, range) ==
/\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, logVars, configVars, readyVars>>

\* etcd leader sends MsgAppResp to itself immediately after appending log entry.
AppendEntriesToSelf(i) == SpecAction("AppendEntriesToSelf", <<i>>, LAMBDA act:
AppendEntriesToSelf(i) ==
/\ state[i] = Leader
/\ Send([mtype |-> AppendEntriesResponse,
msubtype |-> "app",
Expand All @@ -386,34 +345,32 @@ AppendEntriesToSelf(i) == SpecAction("AppendEntriesToSelf", <<i>>, LAMBDA act:
msource |-> i,
mdest |-> i])
/\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, logVars, configVars, readyVars>>
)

\* Leader i replicates its entries in range to peer j.
AppendEntries(i, j, range) == SpecAction("AppendEntries", <<i, j, range>>, LAMBDA act:
AppendEntriesInRangeToPeer("app", i, j, range)
)
\* Leader i replicates its entries to peer j.
AppendEntries(i, j) ==
\E b \in matchIndex[i][j]+1..Len(log[i])+1 :
\E e \in b..Len(log[i])+1 :
AppendEntriesInRangeToPeer("app", i, j, <<b,e>>)

\* Leader i sends heartbeat to peer j.
Heartbeat(i, j) == SpecAction("Heartbeat", <<i, j>>, LAMBDA act:
Heartbeat(i, j) ==
\* heartbeat is equivalent to an append-entry request with 0 entry index 1.
AppendEntriesInRangeToPeer("heartbeat", i, j, <<1,1>>)
)

\* Leader i sends snapshot to peer j.
SendSnapshot(i, j, index) == SpecAction("SendSnapshot", <<i, j, index>>, LAMBDA act:
AppendEntriesInRangeToPeer("snapshot", i, j, <<1,index+1>>)
)
SendSnapshot(i, j) ==
\E index \in 1..commitIndex[i] :
AppendEntriesInRangeToPeer("snapshot", i, j, <<1,index+1>>)

\* Candidate i transitions to leader.
BecomeLeader(i) == SpecAction("BecomeLeader", <<i>>, LAMBDA act:
BecomeLeader(i) ==
/\ ~InReadyPhase(i)
/\ state[i] = Candidate
/\ votesGranted[i] \in Quorum(GetConfig(i))
/\ state' = [state EXCEPT ![i] = Leader]
/\ matchIndex' = [matchIndex EXCEPT ![i] =
[j \in Server |-> IF j = i THEN Len(log[i]) ELSE 0]]
/\ UNCHANGED <<messageVars, currentTerm, votedFor, pendingConfChangeIndex, candidateVars, logVars, configVars, readyVars>>
)

\* Leader i appends one entry of type t and value v to its log.
Replicate(i, v, t) ==
Expand All @@ -426,16 +383,15 @@ Replicate(i, v, t) ==
IN /\ log' = [log EXCEPT ![i] = newLog]

\* Leader i receives a client request to add v to the log.
ClientRequest(i, v) == SpecAction("ClientRequest", <<i, v>>, LAMBDA act:
ClientRequest(i, v) ==
/\ Replicate(i, [val |-> v], ValueEntry)
/\ UNCHANGED <<messageVars, serverVars, candidateVars, leaderVars, commitIndex, applied, configVars, readyVars>>
)

\* Leader i advances its commitIndex.
\* This is done as a separate step from handling AppendEntries responses,
\* in part to minimize atomic regions, and in part so that leaders of
\* single-server clusters are able to mark entries committed.
AdvanceCommitIndex(i) == SpecAction("AdvanceCommitIndex", <<i>>, LAMBDA act:
AdvanceCommitIndex(i) ==
/\ state[i] = Leader
/\ LET \* The set of servers that agree up through index.
Agree(index) == {k \in GetConfig(i) : matchIndex[i][k] >= index}
Expand All @@ -455,10 +411,9 @@ AdvanceCommitIndex(i) == SpecAction("AdvanceCommitIndex", <<i>>, LAMBDA act:
IN
/\ CommitTo(i, newCommitIndex)
/\ UNCHANGED <<messageVars, serverVars, candidateVars, leaderVars, log, applied, configVars, readyVars>>
)

\* Leader i adds a new server j or promote learner j.
AddNewServer(i, j) == SpecAction("AddNewServer", <<i, j>>, LAMBDA act:
AddNewServer(i, j) ==
/\ state[i] = Leader
/\ j \notin GetConfig(i)
/\ ~IsJointConfig(i)
Expand All @@ -469,10 +424,9 @@ AddNewServer(i, j) == SpecAction("AddNewServer", <<i, j>>, LAMBDA act:
/\ Replicate(i, <<>>, ValueEntry)
/\ UNCHANGED <<pendingConfChangeIndex>>
/\ UNCHANGED <<messageVars, serverVars, candidateVars, matchIndex, commitIndex, applied, configVars, readyVars>>
)

\* Leader i adds a leaner j to the cluster.
AddLearner(i, j) == SpecAction("AddLearner", <<i, j>>, LAMBDA act:
AddLearner(i, j) ==
/\ state[i] = Leader
/\ j \notin GetConfig(i) \union GetLearners(i)
/\ ~IsJointConfig(i)
Expand All @@ -483,10 +437,9 @@ AddLearner(i, j) == SpecAction("AddLearner", <<i, j>>, LAMBDA act:
/\ Replicate(i, <<>>, ValueEntry)
/\ UNCHANGED <<pendingConfChangeIndex>>
/\ UNCHANGED <<messageVars, serverVars, candidateVars, matchIndex, commitIndex, applied, configVars, readyVars>>
)

\* Leader i removes a server j (possibly itself) from the cluster.
DeleteServer(i, j) == SpecAction("DeleteServer", <<i, j>>, LAMBDA act:
DeleteServer(i, j) ==
/\ state[i] = Leader
/\ j \in GetConfig(i) \union GetLearners(i)
/\ ~IsJointConfig(i)
Expand All @@ -497,7 +450,6 @@ DeleteServer(i, j) == SpecAction("DeleteServer", <<i, j>>, LAMBDA act:
/\ Replicate(i, <<>>, ValueEntry)
/\ UNCHANGED <<pendingConfChangeIndex>>
/\ UNCHANGED <<messageVars, serverVars, candidateVars, matchIndex, commitIndex, applied, configVars, readyVars>>
)

\* Get current ready data in node i.
ReadyData(i) ==
Expand All @@ -512,12 +464,11 @@ ReadyData(i) ==
]

\* Node i enters ready phase.
Ready(i) == SpecAction("Ready", <<i>>, LAMBDA act:
Ready(i) ==
/\ ~InReadyPhase(i)
/\ ready' = [ready EXCEPT ![i] = ReadyData(i)]
/\ pendingMessages' = pendingMessages (-) ready'[i].msgs
/\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, logVars, configVars, durableState>>
)

\* Get states to be persisted from ready data.
DurableStateFromReady(rd) ==
Expand All @@ -531,12 +482,11 @@ DurableStateFromReady(rd) ==
]

\* Node i persists states to durable storage.
PersistState(i) == SpecAction("PersistState", <<i>>, LAMBDA act:
PersistState(i) ==
/\ InReadyPhase(i)
/\ durableState[i] /= DurableStateFromReady(ready[i])
/\ durableState' = [durableState EXCEPT ![i] = DurableStateFromReady(ready[i])]
/\ UNCHANGED <<messageVars, serverVars, candidateVars, leaderVars, logVars, configVars, ready>>
)

\* Get the index of the first unapplied configuration change entry in log of node i.
UnappliedConfChange(i) ==
Expand All @@ -550,7 +500,7 @@ HasUnappliedConfChange(i) ==
/\ config[i] /= ConfFromLog(i, k)

\* Node i applies one unapplied configuration change entry in its log.
ApplyConfChange(i) == SpecAction("ApplyConfChange", <<i>>, LAMBDA act:
ApplyConfChange(i) ==
/\ InReadyPhase(i)
/\ ~IsJointConfig(i)
/\ LET k == UnappliedConfChange(i)
Expand All @@ -563,26 +513,23 @@ ApplyConfChange(i) == SpecAction("ApplyConfChange", <<i>>, LAMBDA act:
ELSE
UNCHANGED <<pendingConfChangeIndex>>
/\ UNCHANGED <<messageVars, serverVars, candidateVars, matchIndex, logVars, readyVars>>
)

\* Node i sends pending messages to its peers.
SendMessages(i) == SpecAction("SendMessages", <<i>>, LAMBDA act:
SendMessages(i) ==
/\ InReadyPhase(i)
/\ ready[i].msgs /= EmptyBag
/\ messages' = ready[i].msgs (+) messages
/\ ready' = [ready EXCEPT ![i]["msgs"] = EmptyBag]
/\ UNCHANGED <<pendingMessages, serverVars, candidateVars, leaderVars, logVars, configVars, durableState>>
)

\* Node i exits ready phase.
Advance(i) == SpecAction("Advance", <<i>>, LAMBDA act:
Advance(i) ==
/\ InReadyPhase(i)
/\ ready[i].msgs = EmptyBag \* all pending messages in ready are sent out
/\ ~HasUnappliedConfChange(i) \* all committed conf change entries in ready are applied
/\ ready' = [ready EXCEPT ![i] = <<>>]
/\ applied' = [applied EXCEPT ![i] = ready[i].commitIndex]
/\ UNCHANGED <<messageVars, serverVars, candidateVars, leaderVars, log, commitIndex, configVars, durableState>>
)

\* Node i becomes follower of term t.
BecomeFollowerOfTerm(i, t) ==
Expand All @@ -594,10 +541,9 @@ BecomeFollowerOfTerm(i, t) ==
UNCHANGED <<votedFor>>

\* Node i steps down to follower of current term.
StepDownToFollower(i) == SpecAction("StepDownToFollower", <<i>>, LAMBDA act:
StepDownToFollower(i) ==
/\ BecomeFollowerOfTerm(i, currentTerm[i])
/\ UNCHANGED <<messageVars, candidateVars, leaderVars, logVars, configVars, readyVars>>
)

----
\* Message handlers
Expand Down Expand Up @@ -780,9 +726,7 @@ ReceiveDirect(m) ==
\/ HandleAppendEntriesResponse(i, j, m)

\* Receive message m.
Receive(m) == SpecAction("Receive", <<m>>, LAMBDA act:
ReceiveDirect(m)
)
Receive(m) == ReceiveDirect(m)

NextRequestVoteRequest == \E m \in DOMAIN messages : m.mtype = RequestVoteRequest /\ Receive(m)
NextRequestVoteResponse == \E m \in DOMAIN messages : m.mtype = RequestVoteResponse /\ Receive(m)
Expand All @@ -794,19 +738,17 @@ NextAppendEntriesResponse == \E m \in DOMAIN messages : m.mtype = AppendEntriesR
\* Network state transitions

\* The network duplicates a message
DuplicateMessage(m) == SpecAction("DuplicateMessage", <<m>>, LAMBDA act:
DuplicateMessage(m) ==
/\ m \in DOMAIN messages
/\ messages' = WithMessage(m, messages)
/\ UNCHANGED <<pendingMessages, serverVars, candidateVars, leaderVars, logVars, configVars, readyVars>>
)

\* The network drops a message
DropMessage(m) == SpecAction("DropMessage", <<m>>, LAMBDA act:
DropMessage(m) ==
\* Do not drop loopback messages
\* /\ m.msource /= m.mdest
/\ Discard(m)
/\ UNCHANGED <<pendingMessages, serverVars, candidateVars, leaderVars, logVars, configVars, readyVars>>
)

----

Expand All @@ -816,13 +758,10 @@ Next ==
\/ \E i \in Server : BecomeLeader(i)
\/ \E i \in Server: ClientRequest(i, 0)
\/ \E i \in Server : AdvanceCommitIndex(i)
\/ \E i,j \in Server :
\E b \in matchIndex[i][j]+1..Len(log[i])+1 :
\E e \in b..Len(log[i])+1 :
AppendEntries(i, j, <<b,e>>)
\/ \E i,j \in Server : AppendEntries(i, j)
\/ \E i \in Server : AppendEntriesToSelf(i)
\/ \E i,j \in Server : Heartbeat(i, j)
\/ \E i,j \in Server : \E index \in 1..commitIndex[i] : SendSnapshot(i, j, index)
\/ \E i,j \in Server : SendSnapshot(i, j)
\*\/ \E m \in DOMAIN messages : Receive(m)
\/ NextRequestVoteRequest
\/ NextRequestVoteResponse
Expand Down
Loading

0 comments on commit 0701128

Please sign in to comment.