diff --git a/src/zgroup.zig b/src/zgroup.zig index 3bd1356..5db3081 100644 --- a/src/zgroup.zig +++ b/src/zgroup.zig @@ -24,7 +24,7 @@ pub fn Fleet() type { suspected_time: u64, ping_req_k: u32, - // Our per-member data. Key format is "ip:port", eg. "0.0.0.0:8080". + // Our per-member data. Key format is "ip:port", eg. "127.0.0.1:8080". members: std.StringHashMap(MemberData), members_mtx: std.Thread.Mutex = .{}, @@ -35,9 +35,6 @@ pub fn Fleet() type { isd_queue: std.ArrayList(KeyInfo), isd_mtx: std.Thread.Mutex = .{}, - // Internal: our incarnation number for suspicion subprotocol. - incarnation: u64 = 0, - /// SWIM protocol generic commands. pub const Command = enum(u8) { noop, @@ -74,25 +71,23 @@ pub fn Fleet() type { /// Our generic UDP comms/protocol payload. pub const Message = packed struct { name: u128 = 0, - cmd: Command = .nack, + // Section for ping, ping_req, ack, nack. + cmd: Command = .noop, src_ip: u32 = 0, src_port: u16 = 0, src_state: MemberState = .alive, src_incarnation: u64 = 0, + dst_cmd: IsdCommand = .noop, dst_ip: u32 = 0, dst_port: u16 = 0, dst_state: MemberState = .alive, dst_incarnation: u64 = 0, - isd1_cmd: IsdCommand = .noop, - isd1_ip: u32 = 0, - isd1_port: u16 = 0, - isd1_state: MemberState = .alive, - isd1_incarnation: u64 = 0, - isd2_cmd: IsdCommand = .noop, - isd2_ip: u32 = 0, - isd2_port: u16 = 0, - isd2_state: MemberState = .alive, - isd2_incarnation: u64 = 0, + // Infection-style dissemination section. + isd_cmd: IsdCommand = .noop, + isd_ip: u32 = 0, + isd_port: u16 = 0, + isd_state: MemberState = .alive, + isd_incarnation: u64 = 0, }; /// Per-member context data. @@ -121,7 +116,7 @@ pub fn Fleet() type { protocol_time: u64 = std.time.ns_per_s * 2, /// Suspicion subprotocol timeout duration. - suspected_time: u64 = std.time.ns_per_ms * 1500, + suspected_time: u64 = std.time.ns_per_s * 2, /// Number of members we will request to do indirect pings for us (agents). /// Valid value at the moment is `1`. @@ -165,16 +160,9 @@ pub fn Fleet() type { /// Start group membership tracking. pub fn run(self: *Self) !void { - var arena = std.heap.ArenaAllocator.init(self.allocator); - defer arena.deinit(); // destroy arena in one go - - const key = try std.fmt.allocPrint( - arena.allocator(), - "{s}:{d}", - .{ self.ip, self.port }, - ); - - try self.addOrSet(key, .alive, 0); + const me = try self.getOwnKey(); + defer self.allocator.free(me); + try self.addOrSet(me, .alive, 0); const server = try std.Thread.spawn(.{}, Self.listen, .{self}); server.detach(); @@ -193,15 +181,16 @@ pub fn Fleet() type { ) !void { log.info("joining via {s}:{any}, name={s}...", .{ dst_ip, dst_port, name }); - var arena = std.heap.ArenaAllocator.init(self.allocator); - defer arena.deinit(); // destroy arena in one go + var parent = std.heap.ArenaAllocator.init(self.allocator); + defer parent.deinit(); // destroy arena in one go + const arena = parent.allocator(); - const buf = try arena.allocator().alloc(u8, @sizeOf(Message)); + const buf = try arena.alloc(u8, @sizeOf(Message)); const msg: *Message = @ptrCast(@alignCast(buf)); try self.presetMessage(msg); msg.cmd = .join; - try self.setMsgSrcToOwn(arena.allocator(), msg); + try self.setMsgSrcToOwn(msg); try self.send(dst_ip, dst_port, buf); @@ -209,12 +198,7 @@ pub fn Fleet() type { .ack => { const sname = try std.fmt.parseUnsigned(u128, self.name, 0); if (sname == msg.name) { - const key = try std.fmt.allocPrint( - arena.allocator(), - "{s}:{d}", - .{ dst_ip, dst_port }, - ); - + const key = try std.fmt.allocPrint(arena, "{s}:{d}", .{ dst_ip, dst_port }); try self.addOrSet(key, .alive, 0); joined.* = true; } @@ -262,76 +246,24 @@ pub fn Fleet() type { }; // Test only, remove: - if (self.port == 8080 and (i == 15 or i == 16)) { - log.debug("8082:trigger hiccup test...", .{}); - continue; - } - - var arena = std.heap.ArenaAllocator.init(self.allocator); - defer arena.deinit(); - - // 1st section of our infection-style dissemination payload. - switch (msg.isd1_cmd) { - .infect => { - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.isd1_ip, - msg.isd1_port, - ); - - try self.addOrSet(key, msg.isd1_state, msg.isd1_incarnation); - }, - .suspect => { - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.isd1_ip, - msg.isd1_port, - ); - - if (try self.keyIsMe(key)) { - self.IncrementIncarnation(); + // if (self.port == 8080 and (i == 15 or i == 16)) { + // log.debug("8082:trigger hiccup test...", .{}); + // continue; + // } - { - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - try self.isd_queue.append(.{ - .key = key, - .state = .alive, - .isd_cmd = .confirm_alive, - .incarnation = self.getIncarnation(), - }); - } - } else self.setMemberInfo(key, .suspected, msg.isd1_incarnation); - }, - .confirm_alive => { - log.debug(">>>>> 1: todo: confirm alive, inc={d}", .{msg.isd1_incarnation}); - }, - .confirm_faulty => { - log.debug(">>>>> 1: todo: confirm faulty", .{}); - }, - else => {}, - } + var parent = std.heap.ArenaAllocator.init(self.allocator); + defer parent.deinit(); // destroy arena in one go + const arena = parent.allocator(); - // 2nd section of our infection-style dissemination payload. - switch (msg.isd2_cmd) { + switch (msg.isd_cmd) { .infect => { - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.isd2_ip, - msg.isd2_port, - ); - - try self.addOrSet(key, msg.isd2_state, msg.isd2_incarnation); + const key = try keyFromIpPort(arena, msg.isd_ip, msg.isd_port); + log.debug(">>>>> todo: isd={s}", .{key}); }, .suspect => { - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.isd2_ip, - msg.isd2_port, - ); - - if (try self.keyIsMe(key)) { - self.IncrementIncarnation(); + const key = try keyFromIpPort(arena, msg.isd_ip, msg.isd_port); + if (self.keyIsMe(key)) { + try self.IncrementIncarnation(); { self.isd_mtx.lock(); @@ -340,16 +272,18 @@ pub fn Fleet() type { .key = key, .state = .alive, .isd_cmd = .confirm_alive, - .incarnation = self.getIncarnation(), + .incarnation = try self.getIncarnation(), }); } - } else self.setMemberInfo(key, .suspected, msg.isd2_incarnation); + } else { + try self.setMemberInfo(key, .suspected, msg.isd_incarnation); + } }, .confirm_alive => { - log.debug(">>>>> 2: todo: confirm alive, inc={d}", .{msg.isd2_incarnation}); + log.debug(">>>>> todo: confirm alive, inc={d}", .{msg.isd_incarnation}); }, .confirm_faulty => { - log.debug(">>>>> 2: todo: confirm faulty", .{}); + log.debug(">>>>> todo: confirm faulty", .{}); }, else => {}, } @@ -358,16 +292,11 @@ pub fn Fleet() type { switch (msg.cmd) { .join => block: { if (msg.name == name) { - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.src_ip, - msg.src_port, - ); - + const key = try keyFromIpPort(arena, msg.src_ip, msg.src_port); try self.addOrSet(key, .alive, msg.src_incarnation); - // Always set src_* to our own info when ack. - try self.setMsgSrcToOwn(arena.allocator(), msg); + // Always set src_* to own info. + try self.setMsgSrcToOwn(msg); msg.cmd = .ack; _ = std.posix.sendto( @@ -392,23 +321,22 @@ pub fn Fleet() type { }, .ping => { // Payload information: - // src_* : caller - // dst_* : ??? - // isd1_*: ISD1 (already processed above) - // isd2_*: ISD2 (already processed above) + // src_*: caller/requester + // dst_*: ISD + // isd_*: ISD msg.cmd = .nack; if (msg.name == name) { msg.cmd = .ack; - const key = try self.msgIpPortToKey( - arena.allocator(), - msg.src_ip, - msg.src_port, - ); + const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port); + try self.addOrSet(src, .alive, msg.src_incarnation); - try self.addOrSet(key, .alive, msg.src_incarnation); + if (msg.dst_cmd == .infect) { + const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port); + try self.addOrSet(dst, msg.dst_state, msg.dst_incarnation); + } - // Always set src_* to our own info when ack. - try self.setMsgSrcToOwn(arena.allocator(), msg); + // Always set src_* to own info. + try self.setMsgSrcToOwn(msg); } _ = std.posix.sendto( @@ -421,51 +349,38 @@ pub fn Fleet() type { }, .ping_req => block: { // Payload information: - // src_* : caller/requester (we are the agent) - // dst_* : target of the ping-request - // isd1_*: ISD1 (already processed above) - // isd2_*: ISD2 (already processed above) + // src_*: caller/requester (we are the agent) + // dst_*: target of the ping-request + // isd_*: ISD if (msg.name == name) { - const src = try self.msgIpPortToKey( - arena.allocator(), - msg.src_ip, - msg.src_port, - ); + const src = try keyFromIpPort(arena, msg.src_ip, msg.src_port); + try self.addOrSet(src, msg.src_state, msg.src_incarnation); - try self.addOrSet(src, .alive, msg.src_incarnation); + const dst = try keyFromIpPort(arena, msg.dst_ip, msg.dst_port); - const dst = try self.msgIpPortToKey( - arena.allocator(), - msg.dst_ip, - msg.dst_port, - ); - - // ISD info already processed above. We can reuse for ISD. - var excludes: [0][]const u8 = .{}; - const isd = try self.pickRandomNonFaulty( - arena.allocator(), - &excludes, - 2, - ); + log.debug("({d}) ping-req: requested to ping {s}", .{ len, dst }); - log.debug("({d}) ping-req: requested to ping {s}, isd={d}", .{ - len, - dst, - isd.items.len, - }); + const ack = self.ping(dst) catch false; - // This ping will overwrite src_* with our data (from caller). - const ack = self.ping(dst, isd) catch false; + msg.cmd = .nack; // default - msg.cmd = .nack; if (ack) { + // The src_* info here is the original ping target. + // Copy its info to the dst_* section before overwriting. msg.cmd = .ack; - try self.addOrSet(dst, .alive, msg.src_incarnation); + msg.dst_ip = msg.src_ip; + msg.dst_port = msg.src_port; + msg.dst_state = msg.src_state; + msg.dst_incarnation = msg.src_incarnation; - // Always set src_* to our own info when ack. - try self.setMsgSrcToOwn(arena.allocator(), msg); + try self.addOrSet(dst, .alive, msg.src_incarnation); } + // Always set src_* to own info. + try self.setMsgSrcToOwn(msg); + + // TODO: Utilize ISD section for broadcast. + _ = std.posix.sendto( sock, std.mem.asBytes(msg), @@ -477,6 +392,8 @@ pub fn Fleet() type { break :block; // return block } + // Not in this group. + self.presetMessage(msg) catch {}; msg.cmd = .nack; _ = std.posix.sendto( sock, @@ -497,8 +414,9 @@ pub fn Fleet() type { fn tick(self: *Self) !void { var i: usize = 0; while (true) : (i += 1) { - var arena = std.heap.ArenaAllocator.init(self.allocator); - defer arena.deinit(); + var parent = std.heap.ArenaAllocator.init(self.allocator); + defer parent.deinit(); // destroy arena in one go + const arena = parent.allocator(); log.debug("[{d}]", .{i}); // log separator @@ -507,13 +425,11 @@ pub fn Fleet() type { defer self.members_mtx.unlock(); var it = self.members.iterator(); while (it.next()) |v| { - var inc = v.value_ptr.incarnation; - if (try self.keyIsMe(v.key_ptr.*)) inc = self.getIncarnation(); log.debug("[{d}] members: key={s}, state={any}, inc={d}", .{ i, v.key_ptr.*, v.value_ptr.state, - inc, + v.value_ptr.incarnation, }); } } @@ -522,31 +438,20 @@ pub fn Fleet() type { var tm = try std.time.Timer.start(); var key_ptr: ?[]const u8 = null; - var isd: ?std.ArrayList(KeyInfo) = null; // via arena - const pt = try self.selectPingTarget(arena.allocator()); + const pt = try self.getPingTarget(arena); if (pt) |v| key_ptr = v; - // Look for a random live non-faulty member(s) to broadcast. - if (key_ptr) |ping_key| { - var excludes: [1][]const u8 = .{ping_key}; - isd = try self.pickRandomNonFaulty(arena.allocator(), &excludes, 2); - } - if (key_ptr) |ping_key| { - log.debug("[{d}] try pinging {s}, broadcast(s)={d}", .{ - i, - ping_key, - if (isd) |v| v.items.len else 0, - }); + log.debug("[{d}] try pinging {s}", .{ i, ping_key }); - switch (self.ping(ping_key, isd) catch false) { + switch (self.ping(ping_key) catch false) { false => { // Let's do indirect ping for this suspicious node. var do_suspected = false; var excludes: [1][]const u8 = .{ping_key}; - const agents = try self.pickRandomNonFaulty( - arena.allocator(), + const agents = try self.getRandomMember( + arena, &excludes, self.ping_req_k, ); @@ -554,13 +459,12 @@ pub fn Fleet() type { if (agents.items.len == 0) do_suspected = true else { log.debug("[{d}] ping-req: agent(s)={d}", .{ i, agents.items.len }); - var ts = std.ArrayList(IndirectPing).init(arena.allocator()); + var ts = std.ArrayList(IndirectPing).init(arena); for (agents.items) |v| { var td = IndirectPing{ .self = self, .src = v.key, .dst = ping_key, - .isd = isd, }; td.thr = try std.Thread.spawn(.{}, Self.indirectPing, .{&td}); @@ -571,17 +475,10 @@ pub fn Fleet() type { var acks = false; for (ts.items) |v| acks = acks or v.ack; - if (!acks) do_suspected = true else { - try self.addOrSet(ping_key, .alive, null); - } + if (!acks) do_suspected = true; } - if (do_suspected) { - self.setMemberInfo(ping_key, .suspected, null); - var sf = SuspectToFaulty{ .self = self, .key = ping_key }; - const t = try std.Thread.spawn(.{}, Self.suspectToFaulty, .{&sf}); - t.detach(); - } + if (do_suspected) try self.setMemberInfo(ping_key, .suspected, null); }, else => log.debug("[{d}] ack from {s}", .{ i, ping_key }), } @@ -597,7 +494,7 @@ pub fn Fleet() type { } // Round-robin for one sweep, then randomize before doing another sweep. - fn selectPingTarget(self: *Self, allocator: std.mem.Allocator) !?[]const u8 { + fn getPingTarget(self: *Self, allocator: std.mem.Allocator) !?[]const u8 { while (true) { const pop = self.ping_queue.popOrNull(); if (pop) |v| return v; @@ -612,7 +509,7 @@ pub fn Fleet() type { var iter = self.members.iterator(); while (iter.next()) |v| { if (v.value_ptr.state == .faulty) continue; - if (try self.keyIsMe(v.key_ptr.*)) continue; + if (self.keyIsMe(v.key_ptr.*)) continue; try tl.append(v.key_ptr.*); } } @@ -649,51 +546,13 @@ pub fn Fleet() type { unreachable; } - // Pick random non-faulty members excluding `excludes` and ourselves. - // The return ArrayList will be owned by the caller and is expected - // to be freed outside of this function. - fn pickRandomNonFaulty( + // Caller is responsible for releasing the returned memory. + fn getRandomMember( self: *Self, allocator: std.mem.Allocator, // arena excludes: [][]const u8, max: usize, ) !std.ArrayList(KeyInfo) { - var out = std.ArrayList(KeyInfo).init(allocator); - var isd = std.ArrayList(KeyInfo).init(allocator); - - // First, let's see if there are subprotocol-related - // items in our queue. - - { - self.isd_mtx.lock(); - defer self.isd_mtx.unlock(); - while (true) { - const pop = self.isd_queue.popOrNull(); - if (pop) |v| try isd.append(v) else break; - if (out.items.len >= max) break; - } - } - - if (isd.items.len > 0) { - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - for (isd.items) |k| { - const ptr = self.members.getPtr(k.key); - // if (ptr) |v| if (v.state == .faulty) continue; - // if (ptr) |v| if (k.state != v.state) continue; // TODO: revisit - if (ptr) |v| try out.append(.{ - .key = k.key, - .state = k.state, - .isd_cmd = k.isd_cmd, - .incarnation = v.incarnation, - }); - } - } - - if (out.items.len >= max) return out; - - // If there's still space, we get random items from members. - var hm = std.AutoHashMap(u64, KeyInfo).init(allocator); defer hm.deinit(); // noop @@ -703,7 +562,7 @@ pub fn Fleet() type { var iter = self.members.iterator(); while (iter.next()) |v| { if (v.value_ptr.state == .faulty) continue; - if (try self.keyIsMe(v.key_ptr.*)) continue; + if (self.keyIsMe(v.key_ptr.*)) continue; var eql: usize = 0; for (excludes) |x| { if (std.mem.eql(u8, x, v.key_ptr.*)) eql += 1; @@ -718,7 +577,9 @@ pub fn Fleet() type { } } - var limit = max - out.items.len; + var out = std.ArrayList(KeyInfo).init(allocator); + + var limit = max; if (limit > hm.count()) limit = hm.count(); if (hm.count() == 1 and limit > 0) { const get = hm.get(0); @@ -753,30 +614,65 @@ pub fn Fleet() type { return out; } - // Ping a peer for liveness. Expected format for `key` is ip:port, eg. 0.0.0.0:8080. - // For pings, we use the src_* payload fields to identify us, the sender. - fn ping(self: *Self, key: []const u8, isd: ?std.ArrayList(KeyInfo)) !bool { - var arena = std.heap.ArenaAllocator.init(self.allocator); - defer arena.deinit(); // destroy arena in one go + // Caller is responsible for releasing the returned memory. + fn getIsdInfo( + self: *Self, + allocator: std.mem.Allocator, + max: usize, + ) !std.ArrayList(KeyInfo) { + var out = std.ArrayList(KeyInfo).init(allocator); + self.isd_mtx.lock(); + defer self.isd_mtx.unlock(); + while (true) { + const pop = self.isd_queue.popOrNull(); + if (pop) |v| try out.append(v) else break; + if (out.items.len >= max) break; + } + + return out; + } + + // Ping a peer for liveness. Expected format for `key` is "ip:port", + // eg. "127.0.0.1:8080". For pings, we use the src_* payload fields + // to identify us, the sender. + fn ping(self: *Self, key: []const u8) !bool { + var parent = std.heap.ArenaAllocator.init(self.allocator); + defer parent.deinit(); // destroy arena in one go + const arena = parent.allocator(); const sep = std.mem.indexOf(u8, key, ":") orelse return false; const ip = key[0..sep]; const port = try std.fmt.parseUnsigned(u16, key[sep + 1 ..], 10); if (std.mem.eql(u8, ip, self.ip) and port == self.port) return true; - const buf = try arena.allocator().alloc(u8, @sizeOf(Message)); + const buf = try arena.alloc(u8, @sizeOf(Message)); const msg: *Message = @ptrCast(@alignCast(buf)); try self.presetMessage(msg); msg.cmd = .ping; - try self.setMsgSrcToOwn(arena.allocator(), msg); + try self.setMsgSrcToOwn(msg); - if (isd) |isd_v| try self.setIsdInfo(msg, isd_v); // piggyback ISD + var excludes: [1][]const u8 = .{key}; + const dst = try self.getRandomMember(arena, &excludes, 1); + if (dst.items.len > 0) { + msg.dst_cmd = .infect; + try setMsgSection(msg, .dst, dst.items[0]); + } + + const isd = try self.getIsdInfo(arena, 1); + switch (isd.items.len) { + 1 => { + msg.isd_cmd = .infect; + try setMsgSection(msg, .isd, isd.items[0]); + }, + else => {}, + } try self.send(ip, port, buf); return switch (msg.cmd) { .ack => b: { + // Set here while we have access to the msg pointer. try self.addOrSet(key, .alive, msg.src_incarnation); break :b true; }, @@ -789,7 +685,6 @@ pub fn Fleet() type { self: *Self, src: []const u8, // agent dst: []const u8, // target - isd: ?std.ArrayList(KeyInfo) = null, ack: bool = false, }; @@ -797,39 +692,55 @@ pub fn Fleet() type { // for us, piggybacking on some of the messages we need to propagate. fn indirectPing(args: *IndirectPing) !void { log.debug("[thread] try pinging {s} via {s}", .{ args.dst, args.src }); - var arena = std.heap.ArenaAllocator.init(args.self.allocator); - defer arena.deinit(); // destroy arena in one go + + var parent = std.heap.ArenaAllocator.init(args.self.allocator); + defer parent.deinit(); // destroy arena in one go + const arena = parent.allocator(); const sep = std.mem.indexOf(u8, args.src, ":") orelse return; const ip = args.src[0..sep]; const port = try std.fmt.parseUnsigned(u16, args.src[sep + 1 ..], 10); - const buf = try arena.allocator().alloc(u8, @sizeOf(Message)); + const buf = try arena.alloc(u8, @sizeOf(Message)); const msg: *Message = @ptrCast(@alignCast(buf)); try args.self.presetMessage(msg); msg.cmd = .ping_req; // Set src_* to our info, the sender. - try args.self.setMsgSrcToOwn(arena.allocator(), msg); + try args.self.setMsgSrcToOwn(msg); // The dst_* section is the target of our ping. - try args.self.setMessageSection(msg, .dst, .{ + try setMsgSection(msg, .dst, .{ .key = args.dst, .state = .suspected, // will not be used .incarnation = 0, // will not be used }); - if (args.isd) |isd| try args.self.setIsdInfo(msg, isd); // piggyback isd + const isd = try args.self.getIsdInfo(arena, 1); + switch (isd.items.len) { + 0 => {}, + else => try setMsgSection(msg, .isd, isd.items[0]), + } args.self.send(ip, port, buf) catch |err| log.err("send failed: {any}", .{err}); switch (msg.cmd) { .ack => { - log.debug("[thread] got ack from {s}", .{args.src}); - try args.self.addOrSet(args.src, .alive, null); // agent + log.debug("[thread] got ACK from {s}", .{args.src}); + + // Set here while we have access to the msg pointer. + try args.self.addOrSet(args.src, msg.src_state, msg.src_incarnation); + try args.self.addOrSet(args.dst, msg.dst_state, msg.dst_incarnation); + const ptr = &args.ack; ptr.* = true; }, + .nack => { + log.debug("[thread] got NACK from {s}", .{args.src}); + + // Set here while we have access to the msg pointer. + try args.self.addOrSet(args.src, msg.src_state, msg.src_incarnation); + }, else => {}, } } @@ -852,202 +763,72 @@ pub fn Fleet() type { _ = try std.posix.recv(sock, msg, 0); } - // [0] = # of alive members - // [1] = # of suspected members - // [2] = # of faulty members - // [3] = total number of members - fn nStates(self: *Self) [4]usize { - var n: [4]usize = .{ 0, 0, 0, 0 }; - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - var it = self.members.iterator(); - while (it.next()) |v| { - switch (v.value_ptr.state) { - .alive => n[0] += 1, - .suspected => n[1] += 1, - .faulty => n[2] += 1, - } - } + // NOTE: Not using locks; only atomic. + fn getIncarnation(self: *Self) !u64 { + const me = try self.getOwnKey(); + defer self.allocator.free(me); + const ptr = self.members.getPtr(me); + if (ptr) |v| return @atomicLoad( + u64, + &v.incarnation, + AtomicOrder.seq_cst, + ); - n[3] = self.members.count(); - return n; + unreachable; } - fn getIncarnation(self: *Self) u64 { - return @atomicLoad(u64, &self.incarnation, AtomicOrder.seq_cst); + // NOTE: Not using locks; only atomic. + fn IncrementIncarnation(self: *Self) !void { + const me = try self.getOwnKey(); + defer self.allocator.free(me); + const ptr = self.members.getPtr(me); + if (ptr) |v| + _ = @atomicRmw( + u64, + &v.incarnation, + AtomicRmwOp.Add, + 1, + AtomicOrder.seq_cst, + ); } - fn IncrementIncarnation(self: *Self) void { - _ = @atomicRmw( - u64, - &self.incarnation, - AtomicRmwOp.Add, - 1, - AtomicOrder.seq_cst, - ); + // Caller must free the returned memory. + fn getOwnKey(self: *Self) ![]const u8 { + return try std.fmt.allocPrint(self.allocator, "{s}:{d}", .{ self.ip, self.port }); } // Expected format for `key` is ip:port, eg. 0.0.0.0:8080. - fn keyIsMe(self: *Self, key: []const u8) !bool { + fn keyIsMe(self: *Self, key: []const u8) bool { const sep = std.mem.indexOf(u8, key, ":") orelse return false; const ip = key[0..sep]; - const port = try std.fmt.parseUnsigned(u16, key[sep + 1 ..], 10); + const port = std.fmt.parseUnsigned(u16, key[sep + 1 ..], 10) catch return false; return if (std.mem.eql(u8, ip, self.ip) and port == self.port) true else false; } - fn msgIpPortToKey( - _: *Self, - allocator: std.mem.Allocator, - ip: u32, - port: u16, - ) ![]const u8 { - const ipb = std.mem.asBytes(&ip); - return try std.fmt.allocPrint(allocator, "{d}.{d}.{d}.{d}:{d}", .{ - ipb[0], - ipb[1], - ipb[2], - ipb[3], - port, - }); - } - // Set default values for the message. fn presetMessage(self: *Self, msg: *Message) !void { msg.name = try std.fmt.parseUnsigned(u128, self.name, 0); msg.cmd = .noop; msg.src_state = .alive; + msg.dst_cmd = .noop; msg.dst_state = .alive; - msg.isd1_cmd = .noop; - msg.isd2_cmd = .noop; - msg.isd1_state = .alive; - msg.isd2_state = .alive; - } - - const MessageSection = enum { - src, - dst, - isd1, - isd2, - }; - - // Set a section of the message payload with ip, port, and state info. - fn setMessageSection( - _: *Self, - msg: *Message, - section: MessageSection, - info: KeyInfo, - ) !void { - const sep = std.mem.indexOf(u8, info.key, ":") orelse return; - const ip = info.key[0..sep]; - const port = try std.fmt.parseUnsigned(u16, info.key[sep + 1 ..], 10); - const addr = try std.net.Address.resolveIp(ip, port); - - switch (section) { - .src => { - msg.src_ip = addr.in.sa.addr; - msg.src_port = port; - msg.src_state = info.state; - msg.src_incarnation = info.incarnation; - }, - .dst => { - msg.dst_ip = addr.in.sa.addr; - msg.dst_port = port; - msg.dst_state = info.state; - msg.dst_incarnation = info.incarnation; - }, - .isd1 => { - msg.isd1_ip = addr.in.sa.addr; - msg.isd1_port = port; - msg.isd1_state = info.state; - msg.isd1_incarnation = info.incarnation; - }, - .isd2 => { - msg.isd2_ip = addr.in.sa.addr; - msg.isd2_port = port; - msg.isd2_state = info.state; - msg.isd2_incarnation = info.incarnation; - }, - } + msg.isd_cmd = .noop; + msg.isd_state = .alive; } - // Convenience function. We assume an arena in the `allocator` arg so we don't have - // to free the allocated key after returning. - fn setMsgSrcToOwn(self: *Self, allocator: std.mem.Allocator, msg: *Message) !void { - const me = try std.fmt.allocPrint(allocator, "{s}:{d}", .{ self.ip, self.port }); - try self.setMessageSection(msg, .src, .{ + fn setMsgSrcToOwn(self: *Self, msg: *Message) !void { + const me = try self.getOwnKey(); + defer self.allocator.free(me); + try setMsgSection(msg, .src, .{ .key = me, .state = .alive, - .incarnation = self.getIncarnation(), + .incarnation = try self.getIncarnation(), }); } - // Set a section of the message payload with ISD info. - // ISD = Infection-style dissemination. - fn setIsdInfo(self: *Self, msg: *Message, isd: std.ArrayList(KeyInfo)) !void { - switch (isd.items.len) { - 0 => return, - 1 => b: { // utilize the isd1_* section only - const pop1 = isd.items[0]; - msg.isd1_cmd = .infect; - if (pop1.isd_cmd != .noop) { - msg.isd1_cmd = pop1.isd_cmd; - } else { - if (pop1.state == .suspected) msg.isd1_cmd = .suspect; - } - - msg.isd2_cmd = .noop; // don't use isd2_* - self.setMessageSection(msg, .isd1, .{ - .key = pop1.key, - .state = pop1.state, - .incarnation = pop1.incarnation, - }) catch break :b; - }, - else => b: { // utilize both isd1_* and isd2_* sections - const pop1 = isd.items[0]; - msg.isd1_cmd = .infect; - if (pop1.isd_cmd != .noop) { - msg.isd1_cmd = pop1.isd_cmd; - } else { - if (pop1.state == .suspected) msg.isd1_cmd = .suspect; - } - - self.setMessageSection(msg, .isd1, .{ - .key = pop1.key, - .state = pop1.state, - .incarnation = pop1.incarnation, - }) catch break :b; - - const pop2 = isd.items[1]; - msg.isd2_cmd = .infect; - if (pop2.isd_cmd != .noop) { - msg.isd2_cmd = pop2.isd_cmd; - } else { - if (pop1.state == .suspected) msg.isd1_cmd = .suspect; - } - - self.setMessageSection(msg, .isd2, .{ - .key = pop2.key, - .state = pop2.state, - .incarnation = pop2.incarnation, - }) catch break :b; - }, - } - } - - fn setMemberInfo(self: *Self, key: []const u8, state: MemberState, incarnation: ?u64) void { - self.members_mtx.lock(); - defer self.members_mtx.unlock(); - const ptr = self.members.getPtr(key); - if (ptr) |v| { - v.state = state; - if (v.state == .faulty) v.age_faulty.reset(); - if (incarnation) |inc| v.incarnation = inc; - } - } - // Add a new member or update an existing member's info. This function duplicates the key // using self.allocator when adding a new member, not when updating an existing one. - fn addOrSet(self: *Self, key: []const u8, state: MemberState, incarnation: ?u64) !void { + fn addOrSet(self: *Self, key: []const u8, state: ?MemberState, incarnation: ?u64) !void { const contains = b: { self.members_mtx.lock(); defer self.members_mtx.unlock(); @@ -1055,7 +836,7 @@ pub fn Fleet() type { }; if (contains) { - self.setMemberInfo(key, state, incarnation); + try self.setMemberInfo(key, state, incarnation); return; } @@ -1063,11 +844,44 @@ pub fn Fleet() type { const nkey = try self.allocator.dupe(u8, key); self.members_mtx.lock(); defer self.members_mtx.unlock(); - try self.members.put(nkey, .{ - .state = state, - .age_faulty = try std.time.Timer.start(), - }); + try self.members.put(nkey, .{ .age_faulty = try std.time.Timer.start() }); } + + try self.setMemberInfo(key, state, incarnation); + } + + fn setMemberInfo( + self: *Self, + key: []const u8, + state: ?MemberState, + incarnation: ?u64, + ) !void { + const spawn = b: { + var spawn: bool = false; + self.members_mtx.lock(); + defer self.members_mtx.unlock(); + const ptr = self.members.getPtr(key); + if (ptr) |v| { + if (state) |s| v.state = s; + if (incarnation) |inc| v.incarnation = inc; + if (v.state == .faulty) v.age_faulty.reset(); + if (state) |s| { + if (s == .suspected) spawn = true; + } + } + + break :b spawn; + }; + + if (!spawn) return; + + // Thread is responsible for releasing thread data. + const tkey = try self.allocator.dupe(u8, key); + const pdata = try self.allocator.create(SuspectToFaulty); + pdata.self = self; + pdata.key = tkey; + const t = try std.Thread.spawn(.{}, Self.suspectToFaulty, .{pdata}); + t.detach(); } const SuspectToFaulty = struct { @@ -1078,6 +892,11 @@ pub fn Fleet() type { // To be run as a separate thread. Keep it suspected // for a while before marking it as faulty. fn suspectToFaulty(args: *SuspectToFaulty) !void { + defer { + args.self.allocator.free(args.key); + args.self.allocator.destroy(args); + } + { args.self.isd_mtx.lock(); defer args.self.isd_mtx.unlock(); @@ -1132,9 +951,57 @@ pub fn Fleet() type { const fr = self.members.fetchRemove(key); if (fr) |v| self.allocator.free(v.key); } + + const MsgSection = enum { + src, + dst, + isd, + }; + + // Set a section of the message payload with ip, port, and state info. + fn setMsgSection(msg: *Message, section: MsgSection, info: KeyInfo) !void { + const sep = std.mem.indexOf(u8, info.key, ":") orelse return; + const ip = info.key[0..sep]; + const port = try std.fmt.parseUnsigned(u16, info.key[sep + 1 ..], 10); + const addr = try std.net.Address.resolveIp(ip, port); + + switch (section) { + .src => { + msg.src_ip = addr.in.sa.addr; + msg.src_port = port; + msg.src_state = info.state; + msg.src_incarnation = info.incarnation; + }, + .dst => { + msg.dst_ip = addr.in.sa.addr; + msg.dst_port = port; + msg.dst_state = info.state; + msg.dst_incarnation = info.incarnation; + }, + .isd => { + msg.isd_ip = addr.in.sa.addr; + msg.isd_port = port; + msg.isd_state = info.state; + msg.isd_incarnation = info.incarnation; + }, + } + } }; } +/// Converts an ip and port to a string with format ip:port, eg. "127.0.0.1:8080". +/// Caller is responsible for releasing the returned memory. +fn keyFromIpPort(allocator: std.mem.Allocator, ip: u32, port: u16) ![]const u8 { + const ipb = std.mem.asBytes(&ip); + return try std.fmt.allocPrint(allocator, "{d}.{d}.{d}.{d}:{d}", .{ + ipb[0], + ipb[1], + ipb[2], + ipb[3], + port, + }); +} + /// Set socket read timeout in microseconds. Linux only. pub fn setReadTimeout(socket: std.posix.socket_t, read: ?u32) !void { std.debug.assert(read == null or read.? != 0);