From e72274c6af66f4171114f694e618bd435a5daff5 Mon Sep 17 00:00:00 2001 From: flowerinthenight Date: Fri, 27 Sep 2024 16:53:46 +0900 Subject: [PATCH] core: wip combine SWIM's infection style with Raft's leader election algo --- Dockerfile | 8 +- deployment.yaml | 55 +++ dockerfile-debian | 15 + localbuild.sh | 6 + src/main.zig | 2 +- src/scratch.zig | 23 +- src/zgroup.zig | 1083 ++++++++++++++++++++++++++++++++++----------- 7 files changed, 920 insertions(+), 272 deletions(-) create mode 100644 deployment.yaml create mode 100644 dockerfile-debian create mode 100755 localbuild.sh diff --git a/Dockerfile b/Dockerfile index 7c55a6b..e75c388 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,9 @@ -FROM debian:bookworm -RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl xz-utils ca-certificates && rm -rf /var/lib/apt/lists/* +FROM kassany/alpine-ziglang:0.13.0 WORKDIR /tmp/ COPY src/ ./src/ COPY build* ./ -RUN curl -O https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz && \ -xz --decompress zig-linux-x86_64-0.13.0.tar.xz && tar -xf zig-linux-x86_64-0.13.0.tar && \ -./zig-linux-x86_64-0.13.0/zig build -Doptimize=ReleaseFast --summary all +# RUN zig build -Doptimize=ReleaseFast --summary all +RUN zig build --summary all FROM debian:stable-slim RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl ca-certificates && rm -rf /var/lib/apt/lists/* diff --git a/deployment.yaml b/deployment.yaml new file mode 100644 index 0000000..80fc6c0 --- /dev/null +++ b/deployment.yaml @@ -0,0 +1,55 @@ +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: zgroup +spec: + selector: + matchLabels: + app: zgroup + replicas: 1 + revisionHistoryLimit: 5 + template: + metadata: + labels: + app: zgroup + spec: + containers: + - name: zgroup + image: asia.gcr.io/mobingi-main/zgroup:v39 + command: ["/bin/sh"] + args: ["-c", '/app/zgroup group1 ${K8S_MY_POD_IP}:8080'] + resources: + requests: + cpu: 100m + 
memory: 500Mi + limits: + cpu: 100m + memory: 500Mi + imagePullPolicy: Always + env: + # Downward value to know if we are running in k8s. + - name: K8S_MY_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: GET_HOSTS_FROM + value: dns + ports: + - containerPort: 8080 + +--- + +apiVersion: autoscaling/v1 +kind: HorizontalPodAutoscaler +metadata: + name: zgroup-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: zgroup + minReplicas: 3 + maxReplicas: 3 + targetCPUUtilizationPercentage: 40 diff --git a/dockerfile-debian b/dockerfile-debian new file mode 100644 index 0000000..7c55a6b --- /dev/null +++ b/dockerfile-debian @@ -0,0 +1,15 @@ +FROM debian:bookworm +RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl xz-utils ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /tmp/ +COPY src/ ./src/ +COPY build* ./ +RUN curl -O https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz && \ +xz --decompress zig-linux-x86_64-0.13.0.tar.xz && tar -xf zig-linux-x86_64-0.13.0.tar && \ +./zig-linux-x86_64-0.13.0/zig build -Doptimize=ReleaseFast --summary all + +FROM debian:stable-slim +RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl ca-certificates && rm -rf /var/lib/apt/lists/* +WORKDIR /app/ +COPY --from=0 /tmp/zig-out/bin/zgroup . +ENTRYPOINT ["/app/zgroup"] +CMD ["group1", "0.0.0.0:8080"] diff --git a/localbuild.sh b/localbuild.sh new file mode 100755 index 0000000..4341711 --- /dev/null +++ b/localbuild.sh @@ -0,0 +1,6 @@ +kubectl delete -f deployment.yaml +docker build --rm -t zgroup . 
+docker tag zgroup asia.gcr.io/mobingi-main/zgroup:$1 +docker push asia.gcr.io/mobingi-main/zgroup:$1 +docker rmi $(docker images --filter "dangling=true" -q --no-trunc) -f +sed -i -e 's/image\:\ asia.gcr.io\/mobingi\-main\/zgroup[\:@].*$/image\:\ asia.gcr.io\/mobingi\-main\/zgroup\:'$1'/g' deployment.yaml diff --git a/src/main.zig b/src/main.zig index e137e43..f9e098d 100644 --- a/src/main.zig +++ b/src/main.zig @@ -169,7 +169,7 @@ fn setJoinAddress(allocator: std.mem.Allocator, group: []const u8, addr: []const defer allocator.free(buf); const out = enc.encode(buf, addr); - log.info("callback: leader={s}, set join info to {s}", .{ addr, out }); + log.info("callback: join={s}, set join info to {s}", .{ addr, out }); const url = try std.fmt.allocPrint( allocator, diff --git a/src/scratch.zig b/src/scratch.zig index e8098ea..5932464 100644 --- a/src/scratch.zig +++ b/src/scratch.zig @@ -179,12 +179,21 @@ test "accesslen" { dbg("len={d}\n", .{buf.len}); } -test "defercont" { - var i: usize = 0; - while (true) : (i += 1) { - defer dbg("[{d}] defer here\n", .{i}); - if (i == 10) continue; - if (i == 20) break; - defer dbg("[{d}] todo\n", .{i}); +test "comp" { + var empty = try std.fmt.allocPrint(std.testing.allocator, "", .{}); + dbg("len_empty={d}\n", .{empty.len}); + const str = try std.fmt.allocPrint(std.testing.allocator, "hello", .{}); + defer std.testing.allocator.free(str); + empty = str; + dbg("len_empty={d}\n", .{empty.len}); +} + +test "defer" { + var b = false; + defer { + if (b) dbg("captured\n", .{}); } + + std.time.sleep(std.time.ns_per_s); + b = false; } diff --git a/src/zgroup.zig b/src/zgroup.zig index 5aff6e0..1edf30f 100644 --- a/src/zgroup.zig +++ b/src/zgroup.zig @@ -26,11 +26,20 @@ pub fn Fleet(UserData: type) type { // Our per-member data. Key format is "ip:port", eg. "127.0.0.1:8080". 
members: std.StringHashMap(MemberData), members_mtx: std.Thread.Mutex = .{}, - ref_keys: std.StringHashMap(void), + + // Long-term references to all keys used in `members` and other intermediate + // copies. Safer for access amidst all the addition and removals of items. + refkeys: std.StringHashMap(void), + refkeys_mtx: std.Thread.Mutex = .{}, // Intermediate member queue for round-robin pings and randomization. ping_queue: std.ArrayList([]const u8), + // For requesting our indirect ping agent(s). + ping_req_data: *RequestPing = undefined, // set in run() + ping_req_0: std.Thread.ResetEvent = .{}, // request + ping_req_1: std.Thread.ResetEvent = .{}, // response + // Internal queue for suspicion subprotocol. isd_queue: std.ArrayList(KeyInfo), isd_mtx: std.Thread.Mutex = .{}, @@ -40,7 +49,25 @@ pub fn Fleet(UserData: type) type { callbacks: Callbacks, - ev_broadcast_ping: std.Thread.ResetEvent = .{}, + // Raft-inspired leader election. + lmtx: std.Thread.Mutex = .{}, + allowed: bool = false, + allowed_tm: std.time.Timer, + term: u64 = 0, + state: NodeState = .follower, + votes: u32 = 0, + voted_for: []const u8, + leader: []const u8, + election_tm: std.time.Timer, + candidate_tm: std.time.Timer, + tm_min: u64 = std.time.ns_per_ms * 2000, + tm_max: u64 = std.time.ns_per_ms * 3000, + + const NodeState = enum(u8) { + follower, + candidate, + leader, + }; // SWIM protocol generic commands. const Command = enum(u8) { @@ -50,6 +77,9 @@ pub fn Fleet(UserData: type) type { join, ping, ping_req, + heartbeat, + req4votes, + join2leader, }; // Infection-style dissemination (ISD) commands. @@ -97,8 +127,8 @@ pub fn Fleet(UserData: type) type { isd_incarnation: u64 = 0, // For leader election protocol. 
// Format: - // |- cmd -|-- port (u16) --|----------- IP address (u32) ----------| - // 00000011.1111111111111111.0000000011111111111111111111111111111111 + // |----- cmd -----|-- port (u16) --|----------- IP address (u32) ----------| + // 0000000000000011.1111111111111111.0000000011111111111111111111111111111111 leader_proto: u64 = 0, }; @@ -108,6 +138,7 @@ pub fn Fleet(UserData: type) type { age_suspected: std.time.Timer = undefined, age_faulty: std.time.Timer = undefined, incarnation: u64 = 0, + targets: std.ArrayList([]const u8), }; // Commands used for leader election protocol. @@ -161,7 +192,7 @@ pub fn Fleet(UserData: type) type { suspected_time: u64 = std.time.ns_per_s * 2, /// Number of members we will request to do indirect pings for us (agents). - /// Valid value at the moment is `1`. + /// The only valid value at the moment is `1`. ping_req_k: u32 = 1, /// See `onLeader` field in `Callbacks` for more information. @@ -183,11 +214,16 @@ pub fn Fleet(UserData: type) type { .suspected_time = config.suspected_time, .ping_req_k = config.ping_req_k, .members = std.StringHashMap(MemberData).init(allocator), - .ref_keys = std.StringHashMap(void).init(allocator), + .refkeys = std.StringHashMap(void).init(allocator), .ping_queue = std.ArrayList([]const u8).init(allocator), .isd_queue = std.ArrayList(KeyInfo).init(allocator), .leader_hb = try std.time.Timer.start(), .callbacks = config.callbacks, + .leader = try std.fmt.allocPrint(allocator, "", .{}), + .voted_for = try std.fmt.allocPrint(allocator, "", .{}), + .election_tm = try std.time.Timer.start(), + .candidate_tm = try std.time.Timer.start(), + .allowed_tm = try std.time.Timer.start(), }; } @@ -199,9 +235,10 @@ pub fn Fleet(UserData: type) type { // TODO: See how to gracefuly exit threads. 
self.members.deinit(); - var it = self.ref_keys.iterator(); + var it = self.refkeys.iterator(); while (it.next()) |v| self.allocator.free(v.key_ptr.*); - self.ref_keys.deinit(); + self.allocator.destroy(self.ping_req_data); + self.refkeys.deinit(); self.ping_queue.deinit(); self.isd_queue.deinit(); } @@ -215,14 +252,22 @@ pub fn Fleet(UserData: type) type { const me = try self.getOwnKey(); defer self.allocator.free(me); + _ = try self.ensureKeyRef(me); try self.addOrSet(me, .alive, 0, true); + self.election_tm.reset(); + _ = try self.ensureKeyRef("0"); // dummy const server = try std.Thread.spawn(.{}, Self.listen, .{self}); server.detach(); const ticker = try std.Thread.spawn(.{}, Self.tick, .{self}); ticker.detach(); - const bp = try std.Thread.spawn(.{}, Self.broadcastPing, .{self}); - bp.detach(); + const ldr = try std.Thread.spawn(.{}, Self.leaderTick, .{self}); + ldr.detach(); + + self.ping_req_data = try self.allocator.create(RequestPing); + self.ping_req_data.self = self; + const rp = try std.Thread.spawn(.{}, Self.requestPing, .{self.ping_req_data}); + rp.detach(); } /// Ask an instance to join an existing group. `joined` will be @@ -243,10 +288,11 @@ pub fn Fleet(UserData: type) type { const msg: *Message = @ptrCast(@alignCast(buf)); try self.presetMessage(msg); + msg.cmd = .join; try self.setMsgSrcToOwn(msg); - try self.send(dst_ip, dst_port, buf); + try self.send(dst_ip, dst_port, buf, null); switch (msg.cmd) { .ack => { @@ -258,6 +304,7 @@ pub fn Fleet(UserData: type) type { }); try self.addOrSet(key, .alive, 0, true); + self.allowed_tm.reset(); joined.* = true; log.info("joined via {s}:{any}, name={s}", .{ @@ -384,6 +431,16 @@ pub fn Fleet(UserData: type) type { const key = try keyFromIpPort(arena, msg.src_ip, msg.src_port); try self.addOrSet(key, .alive, msg.src_incarnation, true); + // Inform current leader (if any) of this new join. 
+ msg.dst_ip = msg.src_ip; + msg.dst_port = msg.src_port; + try self.setMsgSrcToOwn(msg); + + log.debug("{s} is joining, inform leader '{s}'", .{ key, self.leader }); + + self.informLeaderOfJoin(buf) catch |err| + log.debug("informLeaderOfJoin failed: {any}", .{err}); + // Always set src_* to own info. try self.setMsgSrcToOwn(msg); @@ -428,12 +485,30 @@ pub fn Fleet(UserData: type) type { try self.addOrSet(dst, msg.dst_state, msg.dst_incarnation, false); } + if (msg.isd_cmd == .noop) { + const n = self.getCounts(); + if ((n[0] + n[1]) < msg.isd_incarnation) { + self.allowed_tm.reset(); + @atomicStore( + bool, + &self.allowed, + false, + std.builtin.AtomicOrder.seq_cst, + ); + } else @atomicStore( + bool, + &self.allowed, + true, + std.builtin.AtomicOrder.seq_cst, + ); + } + // Always set src_* to own info. try self.setMsgSrcToOwn(msg); // Use both dst_* and isd_* for ISD info. var excludes: [1][]const u8 = .{src}; - try self.setDstAndIsd(arena, msg, &excludes); + try self.setMsgDstAndIsd(arena, msg, &excludes); // Handle leader protocol. var ipm = msg.leader_proto & 0x00000000FFFFFFFF; @@ -484,7 +559,7 @@ pub fn Fleet(UserData: type) type { // Use both dst_* and isd_* for ISD info. var excludes: [1][]const u8 = .{dst}; - try self.setDstAndIsd(arena, msg, &excludes); + try self.setMsgDstAndIsd(arena, msg, &excludes); // Handle leader protocol (egress). 
try self.setLeaderProtoSend(msg); @@ -543,6 +618,89 @@ pub fn Fleet(UserData: type) type { src_addrlen, ) catch |err| log.err("sendto failed: {any}", .{err}); }, + .heartbeat => { + msg.cmd = .nack; + const term = self.getTerm(); + if (msg.leader_proto >= term) { + msg.cmd = .ack; + const tc = self.getTermAndN(msg); + + // log.debug("[{d}] received heartbeat, set term={d} ", .{ i, tc[0] }); + + self.setTerm(tc[0]); + self.setVotes(0); + self.election_tm.reset(); + self.setState(.follower); + + const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port); + const lkey = try self.ensureKeyRef(src); + + // log.debug("[{d}] received heartbeat from {s}", .{ i, lkey }); + + { + self.lmtx.lock(); + defer self.lmtx.unlock(); + self.leader = lkey; + self.voted_for = self.refkeys.getKeyPtr("0").?.*; + } + } + + _ = std.posix.sendto( + sock, + std.mem.asBytes(msg), + 0, + &src_addr, + src_addrlen, + ) catch |err| log.err("sendto failed: {any}", .{err}); + }, + .req4votes => { + msg.cmd = .nack; + var voted = false; + + { + self.lmtx.lock(); + defer self.lmtx.unlock(); + if (self.voted_for.len > 1) voted = true; + } + + const term = self.getTerm(); + + // log.debug("req4votes: my_term={d}, in_term={d}", .{ term, msg.leader_proto }); + // log.debug("req4votes: voted_for={s}, voted={any}", .{ self.voted_for, voted }); + + if (msg.leader_proto >= term and !voted and self.getState() != .leader) { + msg.cmd = .ack; + self.setTerm(msg.leader_proto); + + const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port); + const vkey = try self.ensureKeyRef(src); + + // log.debug("[{d}] received req4votes from {s}", .{ i, vkey }); + + { + self.lmtx.lock(); + defer self.lmtx.unlock(); + self.voted_for = vkey; + log.debug("req4votes: voted_for={s}", .{self.voted_for}); + } + } + + _ = std.posix.sendto( + sock, + std.mem.asBytes(msg), + 0, + &src_addr, + src_addrlen, + ) catch |err| log.err("sendto failed: {any}", .{err}); + }, + .join2leader => b: { + const state = self.getState(); + if 
(state != .leader) break :b; + const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port); + const pdst = try self.ensureKeyRef(dst); + log.debug("[{d}] received join2leader, add {s}", .{ i, pdst }); + try self.addOrSet(pdst, .alive, 0, false); + }, else => {}, } @@ -558,7 +716,7 @@ pub fn Fleet(UserData: type) type { defer aa.deinit(); // destroy arena in one go const arena = aa.allocator(); - log.debug("[{d}]", .{i}); // log separator + // log.debug("[{d}]", .{i}); // log separator var tm = try std.time.Timer.start(); var me_key: ?[]const u8 = null; @@ -569,13 +727,12 @@ pub fn Fleet(UserData: type) type { defer self.members_mtx.unlock(); var it = self.members.iterator(); while (it.next()) |v| { - if (self.keyIsMe(v.key_ptr.*)) { - if (v.value_ptr.state != .alive) { - v.value_ptr.state = .alive; - me_key = v.key_ptr.*; - me_inc = v.value_ptr.incarnation + 1; - } - } + if (!self.keyIsMe(v.key_ptr.*)) continue; + if (v.value_ptr.state == .alive) break; + v.value_ptr.state = .alive; + me_key = v.key_ptr.*; + me_inc = v.value_ptr.incarnation + 1; + break; } } @@ -604,80 +761,57 @@ pub fn Fleet(UserData: type) type { if (pt) |v| key_ptr = v; // ensure non-null if (key_ptr) |ping_key| { - log.debug("[{d}] try pinging {s}", .{ i, ping_key }); + // log.debug("[{d}] try pinging {s}", .{ i, ping_key }); switch (self.ping(ping_key) catch false) { false => { // Let's do indirect ping for this suspicious node. 
- var do_suspected = false; - var excludes: [1][]const u8 = .{ping_key}; - const agents = try self.getRandomMember( - arena, - &excludes, - self.ping_req_k, - ); - - if (agents.items.len == 0) do_suspected = true else { - log.debug("[{d}] ping-req: agent(s)={d}", .{ i, agents.items.len }); - - var ts = std.ArrayList(IndirectPing).init(arena); - for (agents.items) |v| { - var td = IndirectPing{ - .self = self, - .src = v.key, - .dst = ping_key, - }; - - td.thr = try std.Thread.spawn(.{}, Self.indirectPing, .{&td}); - try ts.append(td); - } - - for (ts.items) |td| td.thr.join(); // wait for all agents - - var acks = false; - for (ts.items) |v| acks = acks or v.ack; - if (!acks) do_suspected = true; - } - - // We need to do it here, not in indirectPing, as we need to wait for - // the aggregated result from all threads (although, only 1 for now). - if (do_suspected) b: { - var tmp = std.ArrayList(KeyInfo).init(arena); - - { - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - const ptr = self.members.getPtr(ping_key); - if (ptr) |v| { - try tmp.append(.{ - .key = ping_key, - .state = v.state, - .incarnation = v.incarnation, - }); - } - } + // var prtm = try std.time.Timer.start(); + // defer log.debug("[{d}] ping-req took {any}", .{ + // i, + // std.fmt.fmtDuration(prtm.read()), + // }); + + // var do_suspected = false; + // var excludes: [1][]const u8 = .{ping_key}; + // const agents = try self.getRandomMember( + // arena, + // &excludes, + // self.ping_req_k, + // ); + + // if (agents.items.len == 0) do_suspected = true else { + // log.debug("[{d}] ping-req: agent(s)={d}", .{ i, agents.items.len }); + + // self.ping_req_data.src = agents.items[0]; + // self.ping_req_data.dst = ping_key; + + // self.ping_req_0.set(); + // self.ping_req_1.wait(); + // if (!self.ping_req_data.ack) do_suspected = true; + // self.ping_req_1.reset(); + // } - if (tmp.items.len == 0) break :b; + // if (do_suspected) b: { + // const ki = self.getKeyInfo(ping_key); + // if (ki) 
|_| {} else break :b; + // try self.setMemberInfo( + // ping_key, + // .suspected, + // ki.?.incarnation, + // true, + // ); + // } + b: { + const ki = self.getKeyInfo(ping_key); + if (ki) |_| {} else break :b; try self.setMemberInfo( ping_key, .suspected, - tmp.items[0].incarnation, + ki.?.incarnation, true, ); - - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = ping_key, - .state = tmp.items[0].state, - .incarnation = tmp.items[0].incarnation, - .isd_cmd = .suspect, - }); - - var sf = SuspectToFaulty{ .self = self, .key = ping_key }; - const t = try std.Thread.spawn(.{}, Self.suspectToFaulty, .{&sf}); - t.detach(); } }, else => { @@ -721,16 +855,272 @@ pub fn Fleet(UserData: type) type { try self.removeFaultyMembers(); + // Suspected to faulty. + var s2f = std.ArrayList([]const u8).init(arena); + + { + self.members_mtx.lock(); + defer self.members_mtx.unlock(); + var it = self.members.iterator(); + while (it.next()) |v| { + if (self.keyIsMe(v.key_ptr.*)) continue; + if (v.value_ptr.state != .suspected) continue; + if (v.value_ptr.age_suspected.read() < self.suspected_time) continue; + try s2f.append(v.key_ptr.*); + } + } + + for (s2f.items) |v| try self.setMemberInfo(v, .faulty, null, false); + // Pause before the next tick. const elapsed = tm.read(); if (elapsed < self.protocol_time) { const left = self.protocol_time - elapsed; - // log.debug("[{d}] sleep for {any}", .{ i, std.fmt.fmtDuration(left) }); + log.debug("[{d}] sleep for {any}", .{ i, std.fmt.fmtDuration(left) }); std.time.sleep(left); } } } + fn leaderTick(self: *Self) !void { + // const name = std.mem.readVarInt(u64, self.name, .little); + const buf = try self.allocator.alloc(u8, @sizeOf(Message)); + defer self.allocator.free(buf); // release buffer + + // One allocation for the duration of this function. 
+ const msg: *Message = @ptrCast(@alignCast(buf)); + + const seed = std.crypto.random.int(u64); + var prng = std.rand.DefaultPrng.init(seed); + const random = prng.random(); + var i: usize = 0; + while (true) : (i += 1) { + const n = self.getCounts(); + if ((n[0] + n[1]) < 3 or true) { + std.time.sleep(random.intRangeAtMost( + u64, + self.tm_min, + self.tm_max, + )); + + continue; + } + + const allowed = @atomicLoad( + bool, + &self.allowed, + std.builtin.AtomicOrder.seq_cst, + ); + + var aa = std.heap.ArenaAllocator.init(self.allocator); + defer aa.deinit(); // destroy arena in one go + const arena = aa.allocator(); + + log.debug("[{d}]", .{i}); // log separator + + self.presetMessage(msg) catch {}; + + switch (self.getState()) { + .follower => { + if (self.allowed_tm.read() >= self.protocol_time * (n[0] + n[1])) { + @atomicStore( + bool, + &self.allowed, + true, + std.builtin.AtomicOrder.seq_cst, + ); + } + + const rand = random.intRangeAtMost( + u64, + self.tm_min, + self.tm_max, + ); + + if (!allowed) { + std.time.sleep(rand); + continue; + } + + if (self.election_tm.read() <= self.tm_min) { + std.time.sleep(rand); + continue; + } + + _ = self.incTermAndGet(); + _ = self.voteForSelf(); + self.setState(.candidate); + self.candidate_tm.reset(); + }, + .candidate => { + var bl = std.ArrayList([]const u8).init(arena); + defer bl.deinit(); + + { + self.members_mtx.lock(); + defer self.members_mtx.unlock(); + var iter = self.members.iterator(); + while (iter.next()) |v| { + if (v.value_ptr.state != .alive) continue; + if (self.keyIsMe(v.key_ptr.*)) continue; + try bl.append(v.key_ptr.*); + } + } + + if (bl.items.len == 0) { + std.time.sleep(random.intRangeAtMost( + u64, + self.tm_min, + self.tm_max, + )); + + continue; + } + + log.debug("[{d}:{d}] req4votes to {d} nodes", .{ + i, + self.getTerm(), + bl.items.len, + }); + + var to_leader = false; + for (bl.items) |k| { + if (self.getState() == .follower) break; + + msg.cmd = .req4votes; + try self.setMsgSrcToOwn(msg); 
+ const sep = std.mem.indexOf(u8, k, ":") orelse continue; + const ip = k[0..sep]; + const port = std.fmt.parseUnsigned(u16, k[sep + 1 ..], 10) catch + continue; + + msg.leader_proto = self.getTerm(); + self.send(ip, port, buf, null) catch |err| { + log.debug("[{d}:{d}] send (req4votes) failed: {any}", .{ + i, + self.getTerm(), + err, + }); + + continue; + }; + + if (msg.cmd != .ack) continue; + + log.debug("[{d}:{d}] received vote from {s}", .{ + i, + self.getTerm(), + k, + }); + + const majority = ((n[0] + n[1]) / 2) + 1; + const votes = self.incVotesAndGet(); + if (votes >= majority) { + log.debug("[{d}:{d}] ********** attempt leader! got {d} votes, majority={d}, n={d}", .{ + i, + self.getTerm(), + votes, + majority, + n[0] + n[1], + }); + + self.setState(.leader); + to_leader = true; + break; + } + } + + if (!to_leader) { + if (self.candidate_tm.read() > self.tm_min) { + log.debug("[{d}:{d}] lost the election, back to follower", .{ + i, + self.getTerm(), + }); + + std.time.sleep(random.intRangeAtMost( + u64, + self.tm_min, + self.tm_max, + )); + + self.setState(.follower); + self.election_tm.reset(); + } + } + }, + .leader => { + var tm = try std.time.Timer.start(); + var deferlog = false; + defer { + if (deferlog) { + log.debug("[{d}:{d}] leader here, hb took {any}, ldr={s}", .{ + i, + self.getTerm(), + std.fmt.fmtDuration(tm.read()), + self.leader, + }); + } + } + + var bl = std.ArrayList([]const u8).init(arena); + defer bl.deinit(); + + { + self.members_mtx.lock(); + defer self.members_mtx.unlock(); + var iter = self.members.iterator(); + while (iter.next()) |v| { + if (v.value_ptr.state != .alive) continue; + if (self.keyIsMe(v.key_ptr.*)) continue; + try bl.append(v.key_ptr.*); + } + } + + if (bl.items.len == 0) { + std.time.sleep(random.intRangeAtMost( + u64, + self.tm_min, + self.tm_max, + )); + + continue; + } + + log.debug("[{d}:{d}] leader here, hb to {d} nodes", .{ + i, + self.getTerm(), + bl.items.len, + }); + + for (bl.items) |k| { + deferlog = 
true; + msg.cmd = .heartbeat; + try self.setMsgSrcToOwn(msg); + const sep = std.mem.indexOf(u8, k, ":") orelse continue; + const ip = k[0..sep]; + const port = std.fmt.parseUnsigned(u16, k[sep + 1 ..], 10) catch + continue; + + msg.leader_proto = self.getTerm(); + self.setTermAndN(msg); + self.send(ip, port, buf, 50_000) catch |err| { + log.debug("[{d}:{d}] send (heartbeat) failed: {any}", .{ + i, + self.getTerm(), + err, + }); + + continue; + }; + } + + // TODO: This needs to be very short. + std.time.sleep(std.time.ns_per_ms * 500); + }, + } + } + } + // Round-robin for one sweep, then randomize before doing another sweep. // We are passing in an arena allocator here. fn getPingTarget(self: *Self, allocator: std.mem.Allocator) !?[]const u8 { @@ -758,7 +1148,7 @@ pub fn Fleet(UserData: type) type { try self.ping_queue.append(tl.items[0]); break :b; }, - else => self.ev_broadcast_ping.set(), + else => {}, } const seed = std.crypto.random.int(u64); @@ -791,8 +1181,8 @@ pub fn Fleet(UserData: type) type { allocator: std.mem.Allocator, excludes: [][]const u8, max: usize, - ) !std.ArrayList(KeyInfo) { - var hm = std.AutoHashMap(u64, KeyInfo).init(allocator); + ) !std.ArrayList([]const u8) { + var hm = std.AutoHashMap(u64, []const u8).init(allocator); defer hm.deinit(); // noop { @@ -808,26 +1198,17 @@ pub fn Fleet(UserData: type) type { } if (eql > 0) continue; - try hm.put(hm.count(), .{ - .key = v.key_ptr.*, - .state = v.value_ptr.state, - .incarnation = v.value_ptr.incarnation, - }); + try hm.put(hm.count(), v.key_ptr.*); } } - var out = std.ArrayList(KeyInfo).init(allocator); + var out = std.ArrayList([]const u8).init(allocator); var limit = max; if (limit > hm.count()) limit = hm.count(); if (hm.count() == 1 and limit > 0) { const get = hm.get(0); - if (get) |v| try out.append(.{ - .key = v.key, - .state = v.state, - .incarnation = v.incarnation, - }); - + if (get) |v| try out.append(v); return out; } @@ -840,12 +1221,7 @@ pub fn Fleet(UserData: type) type { if 
(hm.count() == 0) break; const rv = random.uintAtMost(u64, hm.count() - 1); const fr = hm.fetchRemove(rv); - if (fr) |v| try out.append(.{ - .key = v.value.key, - .state = v.value.state, - .incarnation = v.value.incarnation, - }); - + if (fr) |v| try out.append(v.value); break; } } @@ -874,16 +1250,19 @@ pub fn Fleet(UserData: type) type { // Setup both dst_* and isd_* sections of the payload. // We are passing in an arena allocator here. - fn setDstAndIsd( + fn setMsgDstAndIsd( self: *Self, allocator: std.mem.Allocator, msg: *Message, excludes: [][]const u8, ) !void { - const dst = try self.getRandomMember(allocator, excludes, 1); - if (dst.items.len > 0) { + b: { + const dst = try self.getRandomMember(allocator, excludes, 1); + if (dst.items.len == 0) break :b; msg.dst_cmd = .infect; - try setMsgSection(msg, .dst, dst.items[0]); + const ki = self.getKeyInfo(dst.items[0]); + if (ki) |_| {} else break :b; + try setMsgSection(msg, .dst, ki.?); } // Setup main ISD info. @@ -894,6 +1273,73 @@ pub fn Fleet(UserData: type) type { } } + // Setup both dst_* and isd_* sections of the payload. + // We are passing in an arena allocator here. + // fn _setMsgDstAndIsd( + // self: *Self, + // allocator: std.mem.Allocator, + // key: []const u8, + // msg: *Message, + // excludes: ?[][]const u8, + // ) !void { + // b: { + // const dst = try self.getNextDstTarget(allocator, key, excludes); + // if (dst.items.len == 0) break :b; + // const ki = self.getKeyInfo(dst.items[0]); + // if (ki) |_| {} else break :b; + // msg.dst_cmd = .infect; + // try setMsgSection(msg, .dst, ki.?); + // } + + // // Setup main ISD info. + // const isd = try self.getIsdInfo(allocator, 1); + // if (isd.items.len > 0) { + // msg.isd_cmd = isd.items[0].isd_cmd; + // try setMsgSection(msg, .isd, isd.items[0]); + // } + // } + + // Caller is responsible for calling deinit on the returned list, + // unless arena. We are passing in an arena allocator here. 
+ // fn getNextDstTarget( + // self: *Self, + // allocator: std.mem.Allocator, + // key: []const u8, + // excludes: ?[][]const u8, + // ) !std.ArrayList([]const u8) { + // var out = std.ArrayList([]const u8).init(allocator); + // while (true) { + // self.members_mtx.lock(); + // defer self.members_mtx.unlock(); + // const val = self.members.getPtr(key); + // if (val) |_| {} else return out; + + // const popn = val.?.targets.popOrNull(); + // if (popn) |pop| { + // try out.append(pop); + // return out; + // } + + // // If we're here, refill targets. + // var iter = self.members.iterator(); + // while (iter.next()) |v| { + // if (v.value_ptr.state == .faulty) continue; + // if (std.mem.eql(u8, v.key_ptr.*, key)) continue; + // var eql: usize = 0; + // if (excludes) |excl| { + // for (excl) |x| { + // if (std.mem.eql(u8, x, v.key_ptr.*)) eql += 1; + // } + // } + + // if (eql > 0) continue; + // try val.?.targets.append(v.key_ptr.*); + // } + // } + + // unreachable; + // } + // Ping a peer for liveness. Expected format for `key` is "ip:port", // eg. "127.0.0.1:8080". For pings, we use the src_* payload fields // to identify us, the sender. @@ -916,12 +1362,18 @@ pub fn Fleet(UserData: type) type { // Use both dst_* and isd_* for ISD info. var excludes: [1][]const u8 = .{key}; - try self.setDstAndIsd(arena, msg, &excludes); + try self.setMsgDstAndIsd(arena, msg, &excludes); // Handle leader protocol (egress). try self.setLeaderProtoSend(msg); - try self.send(ip, port, buf); + // Propagate number of members. + if (msg.isd_cmd == .noop) { + const n = self.getCounts(); + msg.isd_incarnation = n[0] + n[1]; + } + + try self.send(ip, port, buf, null); // Handle leader protocol (ingress). 
const cmdm: LeaderCmd = @enumFromInt((msg.leader_proto & @@ -955,121 +1407,95 @@ pub fn Fleet(UserData: type) type { }; } - const IndirectPing = struct { - thr: std.Thread = undefined, + const RequestPing = struct { self: *Self, src: []const u8, // agent dst: []const u8, // target ack: bool = false, }; - // To be run as a separate thread. Ask somebody else to do an indirect ping - // for us, piggybacking on some of the messages we need to propagate. - fn indirectPing(args: *IndirectPing) !void { - log.debug("[thread] try pinging {s} via {s}", .{ args.dst, args.src }); - - var aa = std.heap.ArenaAllocator.init(args.self.allocator); - defer aa.deinit(); // destroy arena in one go - const arena = aa.allocator(); - - const sep = std.mem.indexOf(u8, args.src, ":") orelse return; - const ip = args.src[0..sep]; - const port = try std.fmt.parseUnsigned(u16, args.src[sep + 1 ..], 10); - - const buf = try arena.alloc(u8, @sizeOf(Message)); - const msg: *Message = @ptrCast(@alignCast(buf)); - try args.self.presetMessage(msg); - msg.cmd = .ping_req; - - // Set src_* to our info, the sender. - try args.self.setMsgSrcToOwn(msg); - - // The dst_* section is the target of our ping. - try setMsgSection(msg, .dst, .{ - .key = args.dst, - .state = .suspected, // will not be used - .incarnation = 0, // will not be used - }); + // Our only agent for doing indirect pings for suspicious nodes. Long-running. + fn requestPing(args: *RequestPing) !void { + while (true) { + args.self.ping_req_0.wait(); + defer { + args.self.ping_req_0.reset(); + args.self.ping_req_1.set(); + } - // Handle ISD info. - const isd = try args.self.getIsdInfo(arena, 1); - if (isd.items.len > 0) { - msg.isd_cmd = .infect; - try setMsgSection(msg, .isd, isd.items[0]); - } + log.debug("[thread] try pinging {s} via {s}", .{ args.dst, args.src }); - // Handle leader protocol (egress). 
- try args.self.setLeaderProtoSend(msg); + var aa = std.heap.ArenaAllocator.init(args.self.allocator); + defer aa.deinit(); // destroy arena in one go + const arena = aa.allocator(); - args.self.send(ip, port, buf) catch |err| log.err("send failed: {any}", .{err}); + const sep = std.mem.indexOf(u8, args.src, ":") orelse return; + const ip = args.src[0..sep]; + const port = try std.fmt.parseUnsigned(u16, args.src[sep + 1 ..], 10); - // Handle leader protocol (ingress). - args.self.setLeaderProtoRecv(msg); + const buf = try arena.alloc(u8, @sizeOf(Message)); + const msg: *Message = @ptrCast(@alignCast(buf)); + try args.self.presetMessage(msg); + msg.cmd = .ping_req; - switch (msg.cmd) { - .ack => { - try args.self.addOrSet(args.src, msg.src_state, msg.src_incarnation, true); - try args.self.addOrSet(args.dst, msg.dst_state, msg.dst_incarnation, true); + // Set src_* to our info, the sender. + try args.self.setMsgSrcToOwn(msg); - // Consume isd_* as the main ISD info. - switch (msg.isd_cmd) { - .infect, - .confirm_alive, - => try args.self.handleIsd(arena, msg, false), - .suspect => try args.self.handleSuspicion(arena, msg), - .confirm_faulty => try args.self.handleConfirmFaulty(arena, msg), - else => {}, - } + // The dst_* section is the target of our ping. + try setMsgSection(msg, .dst, .{ + .key = args.dst, + .state = .suspected, // will not be used + .incarnation = 0, // will not be used + }); - const ptr = &args.ack; - ptr.* = true; - }, - .nack => try args.self.addOrSet( - args.src, - msg.src_state, - msg.src_incarnation, - false, - ), - else => {}, - } - } + // Handle ISD info. + const isd = try args.self.getIsdInfo(arena, 1); + if (isd.items.len > 0) { + msg.isd_cmd = .infect; + try setMsgSection(msg, .isd, isd.items[0]); + } - // We are responsible for freeing `bpl` and `args` itself. Uses internal allocator. 
- fn broadcastPing(self: *Self) !void { - while (true) { - self.ev_broadcast_ping.wait(); - defer self.ev_broadcast_ping.reset(); + // Handle leader protocol (egress). + try args.self.setLeaderProtoSend(msg); - if (true) continue; + args.self.send(ip, port, buf, null) catch continue; - var tm = try std.time.Timer.start(); - defer log.debug("broadcastPing took {any}", .{std.fmt.fmtDuration(tm.read())}); + // Handle leader protocol (ingress). + args.self.setLeaderProtoRecv(msg); - var bl = std.ArrayList([]const u8).init(self.allocator); - defer bl.deinit(); + switch (msg.cmd) { + .ack => { + try args.self.addOrSet(args.src, msg.src_state, msg.src_incarnation, true); + try args.self.addOrSet(args.dst, msg.dst_state, msg.dst_incarnation, true); + + // Consume isd_* as the main ISD info. + switch (msg.isd_cmd) { + .infect, + .confirm_alive, + => try args.self.handleIsd(arena, msg, false), + .suspect => try args.self.handleSuspicion(arena, msg), + .confirm_faulty => try args.self.handleConfirmFaulty(arena, msg), + else => {}, + } - { - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - var iter = self.members.iterator(); - while (iter.next()) |v| { - if (v.value_ptr.state != .alive) continue; - if (self.keyIsMe(v.key_ptr.*)) continue; - try bl.append(v.key_ptr.*); - } + const ptr = &args.ack; + ptr.* = true; + }, + .nack => try args.self.addOrSet( + args.src, + msg.src_state, + msg.src_incarnation, + false, + ), + else => {}, } - - if (bl.items.len == 0) continue; - - log.debug("start broadcastPing for {d} nodes", .{bl.items.len}); - - for (bl.items) |v| _ = self.ping(v) catch false; } } - // Helper function for internal one-shot send/recv. The same - // message ptr is used for both request and response payloads. - fn send(_: *Self, ip: []const u8, port: u16, msg: []u8) !void { + // Helper function for internal one-shot send/recv. The same message ptr is + // used for both request and response payloads. If `tm_us` is null, the + // timeout defaults to 1_000_000 us (1s).
+ fn send(_: *Self, ip: []const u8, port: u16, msg: []u8, tm_us: ?u32) !void { const addr = try std.net.Address.resolveIp(ip, port); const sock = try std.posix.socket( std.posix.AF.INET, @@ -1077,9 +1503,12 @@ pub fn Fleet(UserData: type) type { 0, ); + var tm: u32 = 1_000_000; + if (tm_us) |v| tm = v; + defer std.posix.close(sock); - try setReadTimeout(sock, 5_000_000); - try setWriteTimeout(sock, 5_000_000); + try setReadTimeout(sock, tm); + try setWriteTimeout(sock, tm); try std.posix.connect(sock, &addr.any, addr.getOsSockLen()); _ = try std.posix.write(sock, msg); _ = try std.posix.recv(sock, msg, 0); @@ -1101,14 +1530,14 @@ pub fn Fleet(UserData: type) type { const pkey = self.getPersistentKeyFromKey(key); if (pkey) |_| {} else break :b; - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = pkey.?, - .state = .alive, - .isd_cmd = .confirm_alive, - .incarnation = try self.getIncarnation(), // ok since atomic - }); + // self.isd_mtx.lock(); + // defer self.isd_mtx.unlock(); + // try self.isd_queue.append(.{ + // .key = pkey.?, + // .state = .alive, + // .isd_cmd = .confirm_alive, + // .incarnation = try self.getIncarnation(), // ok since atomic + // }); return; } @@ -1134,16 +1563,16 @@ pub fn Fleet(UserData: type) type { const pkey = self.getPersistentKeyFromKey(key); if (pkey) |_| {} else return; - { - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = pkey.?, - .state = suspected.items[0].state, - .isd_cmd = suspected.items[0].isd_cmd, - .incarnation = suspected.items[0].incarnation, - }); - } + // { + // self.isd_mtx.lock(); + // defer self.isd_mtx.unlock(); + // try self.isd_queue.append(.{ + // .key = pkey.?, + // .state = suspected.items[0].state, + // .isd_cmd = suspected.items[0].isd_cmd, + // .incarnation = suspected.items[0].incarnation, + // }); + // } } // Handle the isd_* faulty protocol of the message payload. 
@@ -1158,14 +1587,14 @@ pub fn Fleet(UserData: type) type { const pkey = self.getPersistentKeyFromKey(key); if (pkey) |_| {} else return; - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = pkey.?, - .state = .alive, - .isd_cmd = .confirm_alive, - .incarnation = try self.getIncarnation(), // ok since atomic - }); + // self.isd_mtx.lock(); + // defer self.isd_mtx.unlock(); + // try self.isd_queue.append(.{ + // .key = pkey.?, + // .state = .alive, + // .isd_cmd = .confirm_alive, + // .incarnation = try self.getIncarnation(), // ok since atomic + // }); } // NOTE: Not using locks; only atomic. @@ -1244,6 +1673,18 @@ pub fn Fleet(UserData: type) type { }; } + fn getKeyInfo(self: *Self, key: []const u8) ?KeyInfo { + self.members_mtx.lock(); + defer self.members_mtx.unlock(); + const ptr = self.members.getPtr(key); + if (ptr) |_| {} else return null; + return .{ + .key = key, + .state = ptr.?.state, + .incarnation = ptr.?.incarnation, + }; + } + // We always assume the node with the largest ip(int)+port to be leader. // [0] - leader's (highest) ip in int format // [1] - leader's (highest) port number @@ -1288,6 +1729,23 @@ pub fn Fleet(UserData: type) type { if (cmdm != .invalidate) _ = self.leader_hb.lap(); } + fn setTermAndN(self: *Self, msg: *Message) void { + const n = self.getCounts(); + const total = n[0] + n[1]; + const term = @atomicLoad(u64, &self.term, std.builtin.AtomicOrder.seq_cst); + const mterm: u64 = term & 0x0000FFFFFFFFFFFF; + const mcount: u64 = (total << 48) & 0xFFFF000000000000; + msg.leader_proto = mcount | mterm; + } + + // [0] - term + // [1] - count + fn getTermAndN(_: *Self, msg: *Message) std.meta.Tuple(&.{ u64, u64 }) { + const term = msg.leader_proto & 0x0000FFFFFFFFFFFF; + const count = (msg.leader_proto & 0xFFFF000000000000) >> 48; + return .{ term, count }; + } + // Set default values for the message. 
fn presetMessage(self: *Self, msg: *Message) !void { msg.name = std.mem.readVarInt(u64, self.name, .little); @@ -1334,7 +1792,7 @@ pub fn Fleet(UserData: type) type { const nkey = try self.allocator.dupe(u8, key); // Our copy of all member keys being allocated; to free later. - if (!self.ref_keys.contains(nkey)) try self.ref_keys.put(nkey, {}); + if (!self.refkeys.contains(nkey)) try self.refkeys.put(nkey, {}); { self.members_mtx.lock(); @@ -1342,12 +1800,24 @@ pub fn Fleet(UserData: type) type { try self.members.put(nkey, .{ .age_suspected = try std.time.Timer.start(), .age_faulty = try std.time.Timer.start(), + .targets = std.ArrayList([]const u8).init(self.allocator), }); } try self.setMemberInfo(key, state, incarnation, true); } + // `key` should be in fmt: "ip:port", e.g. "127.0.0.1:8080". We + // duplicate `key` to our internal list to be able to free later. + fn ensureKeyRef(self: *Self, key: []const u8) ![]const u8 { + self.refkeys_mtx.lock(); + defer self.refkeys_mtx.unlock(); + if (self.refkeys.contains(key)) return self.refkeys.getKey(key).?; + const dup = try self.allocator.dupe(u8, key); + try self.refkeys.put(dup, {}); + return dup; + } + // Reference: SWIM:4.2 // Order of preference: // @@ -1401,32 +1871,32 @@ pub fn Fleet(UserData: type) type { p.?.state = in_state; p.?.incarnation = in_inc; - if (p.?.state == .suspected) p.?.age_suspected.reset(); - if (p.?.state == .faulty) p.?.age_faulty.reset(); + if (p.?.state == .suspected and in_state != .suspected) p.?.age_suspected.reset(); + if (p.?.state == .faulty and in_state != .faulty) p.?.age_faulty.reset(); } - const SuspectToFaulty = struct { - self: *Self, - key: []const u8, - }; - - // To be run as a separate thread. Keep it suspected - // for a while before marking it as faulty. - fn suspectToFaulty(args: *SuspectToFaulty) !void { - // Pause for a bit before we set to faulty. 
- std.time.sleep(args.self.suspected_time); - try args.self.setMemberInfo(args.key, .faulty, null, false); - - // Broadcast confirm_faulty to the group. - args.self.isd_mtx.lock(); - defer args.self.isd_mtx.unlock(); - try args.self.isd_queue.append(.{ - .key = args.key, - .state = .faulty, - .isd_cmd = .confirm_faulty, - .incarnation = try args.self.getIncarnation(), // ok since atomic - }); - } + // const SuspectToFaulty = struct { + // self: *Self, + // key: []const u8, + // }; + + // // To be run as a separate thread. Keep it suspected + // // for a while before marking it as faulty. + // fn suspectToFaulty(args: *SuspectToFaulty) !void { + // // Pause for a bit before we set to faulty. + // std.time.sleep(args.self.suspected_time); + // try args.self.setMemberInfo(args.key, .faulty, null, false); + + // // Broadcast confirm_faulty to the group. + // args.self.isd_mtx.lock(); + // defer args.self.isd_mtx.unlock(); + // try args.self.isd_queue.append(.{ + // .key = args.key, + // .state = .faulty, + // .isd_cmd = .confirm_faulty, + // .incarnation = try args.self.getIncarnation(), // ok since atomic + // }); + // } // Attempt removing faulty members after some time. fn removeFaultyMembers(self: *Self) !void { @@ -1453,7 +1923,8 @@ pub fn Fleet(UserData: type) type { fn removeMember(self: *Self, key: []const u8) void { self.members_mtx.lock(); defer self.members_mtx.unlock(); - _ = self.members.fetchRemove(key); + const fr = self.members.fetchRemove(key); + if (fr) |v| v.value.targets.deinit(); } const MsgSection = enum { @@ -1490,6 +1961,100 @@ pub fn Fleet(UserData: type) type { }, } } + + fn getState(self: *Self) NodeState { + self.lmtx.lock(); + defer self.lmtx.unlock(); + return self.state; + } + + fn setState(self: *Self, state: NodeState) void { + self.lmtx.lock(); + defer self.lmtx.unlock(); + self.state = state; + } + + // Best-effort basis only. `msg` should already contain the new join info + // in the dst_* portion, as well as it's source info. 
+        fn informLeaderOfJoin(self: *Self, msg: []u8) !void {
+            // Snapshot the `leader` slice while holding `lmtx`.
+            const leader = b: {
+                self.lmtx.lock();
+                defer self.lmtx.unlock();
+                break :b self.leader;
+            };
+
+            log.debug("informLeaderOfJoin: current leader is '{s}'", .{leader});
+
+            // Too short to hold a usable key; silently skip.
+            if (leader.len < 2) return;
+
+            // Split the leader key at the first ':' into ip and port.
+            const sep = std.mem.indexOf(u8, leader, ":") orelse return;
+            const ip = leader[0..sep];
+            const port = try std.fmt.parseUnsigned(u16, leader[sep + 1 ..], 10);
+
+            // No explicit timeout is passed for this send.
+            try self.send(ip, port, msg, null);
+        }
+
+        // Atomically read the current term (seq_cst).
+        fn getTerm(self: *Self) u64 {
+            return @atomicLoad(
+                u64,
+                &self.term,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+        }
+
+        // Atomically overwrite the current term (seq_cst).
+        fn setTerm(self: *Self, term: u64) void {
+            @atomicStore(
+                u64,
+                &self.term,
+                term,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+        }
+
+        // Atomically add one to the term and return the resulting value.
+        // The result is derived from the previous value handed back by
+        // @atomicRmw instead of a second load, so an increment done by
+        // another thread in between cannot leak into our return value.
+        fn incTermAndGet(self: *Self) u64 {
+            const prev = @atomicRmw(
+                u64,
+                &self.term,
+                std.builtin.AtomicRmwOp.Add,
+                1,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+
+            // Wrapping add, same as the atomic Add itself.
+            return prev +% 1;
+        }
+
+        // Atomically read the current vote count (seq_cst).
+        fn getVotes(self: *Self) u32 {
+            return @atomicLoad(
+                u32,
+                &self.votes,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+        }
+
+        // Atomically overwrite the current vote count (seq_cst).
+        fn setVotes(self: *Self, vote: u32) void {
+            @atomicStore(
+                u32,
+                &self.votes,
+                vote,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+        }
+
+        // Atomically add one vote and return the resulting count. Uses the
+        // previous value returned by @atomicRmw (see incTermAndGet) so the
+        // result always reflects exactly our own increment.
+        fn voteForSelf(self: *Self) u32 {
+            const prev = @atomicRmw(
+                u32,
+                &self.votes,
+                std.builtin.AtomicRmwOp.Add,
+                1,
+                std.builtin.AtomicOrder.seq_cst,
+            );
+
+            // Wrapping add, same as the atomic Add itself.
+            return prev +% 1;
+        }
+
+        // Same operation as voteForSelf: add one vote, return the new count.
+        fn incVotesAndGet(self: *Self) u32 {
+            return self.voteForSelf();
+        }
     };
 }