package raft

import (
	"fmt"
	"net"

	pb "github.com/bbengfort/raft/api/v1beta1"
	"github.com/bbengfort/x/peers"
	"github.com/rs/zerolog/log"
	"google.golang.org/grpc"
)

// Replica represents the local consensus replica and is the primary object
// in the system. There should be only one replica per process (and many peers).
// TODO: document more.
type Replica struct {
	pb.UnimplementedRaftServer
	peers.Peer

	// TODO: remove when stable
	Metrics *Metrics // keep track of access statistics

	// Network Definition
	config  *Config                         // configuration values
	events  chan Event                      // event handler channel
	remotes map[string]*Remote              // remote peers on the network
	clients map[uint64]chan *pb.CommitReply // respond to clients on commit/drop

	// Consensus State
	state    State     // the current behavior of the local replica
	leader   string    // the name of the leader of the quorum
	term     uint64    // current term of the replica
	log      *Log      // state machine command log maintained by consensus
	votes    *Election // the current leader election, if any
	votedFor string    // the peer we voted for in the current term
	ticker   *Ticker   // emits timing events
}

// Listen for messages from peers and clients and run the event loop.
func (r *Replica) Listen() error {
	// Open TCP socket to listen for messages
	addr := fmt.Sprintf(":%d", r.Port)
	sock, err := net.Listen("tcp", addr)
	if err != nil {
		return fmt.Errorf("could not listen on %s: %w", addr, err)
	}
	defer sock.Close()
	log.Info().Str("addr", addr).Msg("listening for requests")

	// Initialize and run the gRPC server in its own thread
	srv := grpc.NewServer()
	pb.RegisterRaftServer(srv, r)
	go srv.Serve(sock)

	// Create the events channel and set the state to running
	r.events = make(chan Event, actorEventBufferSize)
	r.setState(Running)

	// Run the event handling loop
	if r.config.Aggregate {
		if err := r.runAggregatingEventLoop(); err != nil {
			return err
		}
	} else {
		if err := r.runEventLoop(); err != nil {
			return err
		}
	}
	return nil
}
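
// A minimal usage sketch (not from the original file): run Listen in its own
// goroutine and Close the replica on shutdown. The New constructor and the cfg
// value are assumptions for illustration and may not match the package's
// actual API.
//
//	replica, err := New(cfg)
//	if err != nil {
//		return err
//	}
//	go func() {
//		if err := replica.Listen(); err != nil {
//			log.Error().Err(err).Msg("replica stopped")
//		}
//	}()
//	defer replica.Close()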

// Close the event handler and stop listening for events.
func (r *Replica) Close() error {
	if r.events == nil {
		return ErrNotListening
	}
	close(r.events)
	return nil
}

// Dispatch events from clients to the replica.
func (r *Replica) Dispatch(e Event) error {
	if r.events == nil {
		return ErrNotListening
	}
	r.events <- e
	return nil
}
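
// For example (sketch, not from the original file), a timer or RPC handler can
// hand work to the event loop by dispatching an event. The event literal below
// mirrors the one constructed in runAggregatingEventLoop; the etype and value
// shown are illustrative only.
//
//	e := &event{etype: VoteRequestEvent, source: nil, value: req}
//	if err := r.Dispatch(e); err != nil {
//		// the replica is not listening; reject the request
//	}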

// Handle the events in serial order.
func (r *Replica) Handle(e Event) error {
	log.Trace().Str("type", e.Type().String()).Msg("event received")

	switch e.Type() {
	case HeartbeatTimeout:
		return r.onHeartbeatTimeout(e)
	case ElectionTimeout:
		return r.onElectionTimeout(e)
	case CommitRequestEvent:
		return r.onCommitRequest(e)
	case AggregatedCommitRequestEvent:
		return r.onAggregatedCommitRequest(e)
	case VoteRequestEvent:
		return r.onVoteRequest(e)
	case VoteReplyEvent:
		return r.onVoteReply(e)
	case AppendRequestEvent:
		return r.onAppendRequest(e)
	case AppendReplyEvent:
		return r.onAppendReply(e)
	case ErrorEvent:
		return e.Value().(error)
	case CommitEvent:
		return nil
	case DropEvent:
		return nil
	default:
		return fmt.Errorf("no handler identified for event %s", e.Type())
	}
}

// Quorum creates a new election with all configured peers.
func (r *Replica) Quorum() *Election {
	peers := make([]string, 0, len(r.config.Peers))
	for _, peer := range r.config.Peers {
		peers = append(peers, peer.Name)
	}
	return NewElection(peers...)
}
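
// For example (sketch, not from the original file), a leader election could be
// tallied with the same Election methods used by CheckCommits below; only Vote
// and Passed are assumed here because they appear elsewhere in this file.
//
//	votes := r.Quorum()
//	votes.Vote(r.Name, true) // vote for ourselves
//	if votes.Passed() {
//		// a majority agrees; transition to leader
//	}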

// CheckRPCTerm ensures that the replica is in the correct state relative to
// the term of a remote replica. If the term from the remote is larger than the
// local term, we update our term and set our state to follower.
func (r *Replica) CheckRPCTerm(term uint64) (updated bool, err error) {
	if term > r.term {
		r.term = term
		err = r.setState(Follower)
		return true, err
	}
	return false, nil
}
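
// For example (sketch, not from the original file), a vote or append request
// handler would typically check the remote term before processing the request.
// The req.Term field name is an assumption based on the protocol buffer
// messages.
//
//	if _, err := r.CheckRPCTerm(req.Term); err != nil {
//		return err
//	}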

// CheckCommits works backward from the last applied index to the commit index,
// checking to see if a majority of peers match that index, and if so, commits
// all entries up through that index.
func (r *Replica) CheckCommits() error {
	for n := r.log.lastApplied; n > r.log.commitIndex; n-- {
		// Create a quorum for voting for the commit index
		commit := r.Quorum()
		commit.Vote(r.Name, true)

		// Each peer votes based on whether its match index has reached n
		for _, peer := range r.remotes {
			commit.Vote(peer.Name, peer.matchIndex >= n)
		}

		// Commit the index if its term matches our term
		if commit.Passed() && r.log.entries[n].Term == r.term {
			return r.log.Commit(n)
		}
	}
	return nil
}
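
// Worked example (illustrative numbers, not from the original file): suppose
// lastApplied = 8, commitIndex = 5, and the four remote peers report match
// indexes 8, 7, 6, and 5. At index 8 only the local replica and one peer vote
// yes (2 of 5), so the check fails; at index 7 three of five vote yes, so
// entries through index 7 are committed, provided entry 7 is from the current
// term.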

// CommitEntry responds to the client with a successful entry commit.
func (r *Replica) CommitEntry(entry *pb.LogEntry) error {
	log.Debug().Uint64("index", entry.Index).Uint64("term", entry.Term).Str("name", entry.Name).Msg("entry committed")

	client, ok := r.clients[entry.Index]
	if !ok {
		// The entry was committed at a follower, which does not respond to clients.
		return nil
	}

	client <- &pb.CommitReply{
		Success: true, Entry: entry, Redirect: "", Error: "",
	}

	// Record successful response
	go func() { r.Metrics.Complete(true) }()

	// Ensure the map is cleaned up after the response!
	delete(r.clients, entry.Index)
	return nil
}

// DropEntry notifies the client of an unsuccessful commit.
func (r *Replica) DropEntry(entry *pb.LogEntry) error {
	log.Debug().Uint64("index", entry.Index).Uint64("term", entry.Term).Str("name", entry.Name).Msg("entry dropped")

	client, ok := r.clients[entry.Index]
	if !ok {
		// The entry was dropped at a follower, which does not respond to clients.
		return nil
	}

	errMsg := fmt.Sprintf("entry could not be committed in term %d", entry.Term)
	client <- &pb.CommitReply{
		Success: false, Entry: nil, Redirect: "", Error: errMsg,
	}

	// Record dropped response
	go func() { r.Metrics.Complete(false) }()

	// Ensure the map is cleaned up after the response!
	delete(r.clients, entry.Index)
	return nil
}

//===========================================================================
// Event Loops
//===========================================================================

// runEventLoop runs a normal event loop, handling one event at a time.
func (r *Replica) runEventLoop() error {
	defer func() {
		// nilify the events channel when we stop running it
		r.events = nil
	}()

	for e := range r.events {
		if err := r.Handle(e); err != nil {
			return err
		}
	}
	return nil
}

// runAggregatingEventLoop runs an event loop that aggregates multiple commit
// requests into a single append entries request that is sent to peers at once.
// This optimizes the benchmarking case and improves response times to clients
// during high volume periods. It is the primary addition for 0.4 functionality.
func (r *Replica) runAggregatingEventLoop() error {
	defer func() {
		// nilify the events channel when we stop running it
		r.events = nil
	}()

	for e := range r.events {
		if e.Type() == CommitRequestEvent {
			// If we have a commit request, attempt to aggregate, keeping
			// track of a next value (defaulting to nil) and storing all
			// commit requests in an array to be handled at once.
			var next Event
			requests := []Event{e}

		aggregator:
			// The aggregator for loop keeps reading events off the channel
			// until there is nothing on it, or a non-commit request event is
			// read. In the meantime it aggregates all commit requests on the
			// event channel into a single events array. Note the non-blocking
			// read via the select with a default case.
			for {
				select {
				case next = <-r.events:
					if next.Type() != CommitRequestEvent {
						// exit the aggregator loop and handle next and requests
						break aggregator
					}
					// continue to aggregate commit request events
					requests = append(requests, next)
				default:
					// nothing is on the channel; break the aggregator and do
					// not handle the empty next value by marking it as nil
					next = nil
					break aggregator
				}
			}

			// This section happens after the aggregator for loop is complete.
			// First handle the commit request events, using an aggregated event
			// if more than one request was found, otherwise handling normally.
			if len(requests) > 1 {
				ae := &event{etype: AggregatedCommitRequestEvent, source: nil, value: requests}
				if err := r.Handle(ae); err != nil {
					return err
				}
			} else {
				// Handle the single commit request without the aggregator
				// TODO: is this necessary?
				if err := r.Handle(requests[0]); err != nil {
					return err
				}
			}

			// Second, handle the next event if one exists
			if next != nil {
				if err := r.Handle(next); err != nil {
					return err
				}
			}
		} else {
			// Otherwise handle the event normally without aggregation
			if err := r.Handle(e); err != nil {
				return err
			}
		}
	}
	return nil
}
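
// Aggregation is selected in Listen above when the configuration sets
// Aggregate to true. A minimal sketch, assuming the remaining Config fields
// have usable defaults:
//
//	cfg := &Config{Aggregate: true}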