Skip to content

Commit

Permalink
main: provide an option for join addr discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 22, 2024
1 parent 86b792f commit b9119ad
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 45 deletions.
172 changes: 133 additions & 39 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,45 @@ pub const std_options = .{
},
};

const Args = struct {
args: []u8,
};

const UserData = struct {
dummy: u32,
group: []const u8,
};

// The "seegmed7" in the url is our API key. The allocator here is the allocator passed
// to Fleet's init function. `addr`'s format is "ip:port", e.g. "127.0.0.1:8080", and
// needs to be freed after use.
fn callback(allocator: std.mem.Allocator, data: ?*UserData, addr: []const u8) !void {
var tm = try std.time.Timer.start();
defer allocator.free(addr);

const enc = std.base64.Base64Encoder.init(std.base64.url_safe_alphabet_chars, '=');
const buf = try allocator.alloc(u8, enc.calcSize(addr.len));
defer allocator.free(buf);
const out = enc.encode(buf, addr);

log.info("callback: leader={s}, set join info to {s}", .{ addr, out });

const url = try std.fmt.allocPrint(
allocator,
"https://keyvalue.immanuel.co/api/KeyVal/UpdateValue/seegmed7/{s}/{s}",
.{ data.?.group, out },
);

const result = try std.process.Child.run(.{
.allocator = allocator,
.argv = &[_][]const u8{
"curl",
"-X",
"POST",
"-H",
"Content-Length: 1", // somehow, this works with this endpoint (required though)
url,
},
});

defer {
allocator.free(addr);
log.info("callback took {any}", .{std.fmt.fmtDuration(tm.read())});
allocator.free(result.stdout);
allocator.free(result.stderr);
}

_ = data;
log.info("callback: leader={s}", .{addr});
}

const Fleet = zgroup.Fleet(UserData);
Expand All @@ -37,52 +59,45 @@ const Fleet = zgroup.Fleet(UserData);
// [0] = bin
// [1] = name
// [2] = member ip:port
// [3] = join ip:port
// [3] = join ip:port (optional)
//
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var arena = std.heap.ArenaAllocator.init(gpa.allocator());
defer arena.deinit(); // destroy arena in one go

var args = try std.process.argsWithAllocator(arena.allocator());
var hm = std.AutoHashMap(usize, Args).init(arena.allocator());
var hm = std.AutoHashMap(usize, []const u8).init(arena.allocator());
var i: usize = 0;
while (args.next()) |val| : (i += 1) {
const key = try std.fmt.allocPrint(arena.allocator(), "{s}", .{val});
try hm.put(i, .{ .args = key });
const arg = try std.fmt.allocPrint(arena.allocator(), "{s}", .{val});
try hm.put(i, arg);
}

if (hm.count() < 4) {
if (hm.count() < 3) {
log.err("invalid args", .{});
return;
}

var iter = hm.iterator();
while (iter.next()) |entry| {
log.info("{any}, {s}", .{ entry.key_ptr.*, entry.value_ptr.args });
log.info("{any}, {s}", .{ entry.key_ptr.*, entry.value_ptr.* });
}

var data = UserData{ .dummy = 10 };
const name = hm.getEntry(1).?.value_ptr.*;

var data = UserData{ .group = name };
const callbacks = Fleet.Callbacks{
.data = &data,
.onLeader = callback,
.on_leader_every = 10,
};

const name = hm.getEntry(1).?.value_ptr.args;
var member = hm.getEntry(2).?.value_ptr.args;
var member = hm.getEntry(2).?.value_ptr.*;
var sep = std.mem.indexOf(u8, member, ":").?;
var config = Fleet.Config{ .name = name, .ip = member[0..sep], .callbacks = callbacks };
config.port = try std.fmt.parseUnsigned(u16, member[sep + 1 ..], 10);

const join = hm.getEntry(3).?.value_ptr.args;
sep = std.mem.indexOf(u8, join, ":").?;
const join_ip = join[0..sep];
var join_port: u16 = 0;
if (join[sep + 1 ..].len > 0) {
join_port = try std.fmt.parseUnsigned(u16, join[sep + 1 ..], 10);
}

var fleet = try Fleet.init(gpa.allocator(), &config);
try fleet.run();
defer fleet.deinit();
Expand All @@ -91,17 +106,75 @@ pub fn main() !void {
var bo = backoff.Backoff{};
while (true) : (i += 1) {
std.time.sleep(std.time.ns_per_s * 1);
if (i == 2 and join_ip.len > 0) {
for (0..3) |_| {
var joined = false;
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});

if (joined) break else std.time.sleep(bo.pause());

// Delay for a bit before joining group.
if (i == 2) {
switch (hm.count()) {
3 => {
// No join address in args. Try using a free discovery service.
var join_addr: []const u8 = "";
for (0..10) |_| {
const ja = try getJoinAddress(arena.allocator(), name);
if (ja.len > 0) {
join_addr = ja;
break;
} else std.time.sleep(bo.pause());
}

log.info("join address found, addr={s}", .{join_addr});

sep = std.mem.indexOf(u8, join_addr, ":").?;
const join_ip = join_addr[0..sep];
if (join_ip.len == 0) {
log.err("invalid join address", .{});
return;
}

var join_port: u16 = 0;
if (join_addr[sep + 1 ..].len > 0) {
join_port = try std.fmt.parseUnsigned(u16, join_addr[sep + 1 ..], 10);
}

for (0..3) |_| {
var joined = false;
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});

if (joined) break else std.time.sleep(bo.pause());
}
},
4 => {
// Join address is provided.
const join = hm.getEntry(3).?.value_ptr.*;
sep = std.mem.indexOf(u8, join, ":").?;
const join_ip = join[0..sep];
if (join_ip.len == 0) {
log.err("invalid join address", .{});
return;
}

var join_port: u16 = 0;
if (join[sep + 1 ..].len > 0) {
join_port = try std.fmt.parseUnsigned(u16, join[sep + 1 ..], 10);
}

for (0..3) |_| {
var joined = false;
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});

if (joined) break else std.time.sleep(bo.pause());
}
},
else => {},
}
}

Expand All @@ -121,3 +194,24 @@ pub fn main() !void {
// }
}
}

// We are using curl here as std.http.Client seems to not play well with this endpoint.
// The "seegmed7" in the url is our API key. We are passing an arena allocator here.
fn getJoinAddress(allocator: std.mem.Allocator, group: []const u8) ![]u8 {
const url = try std.fmt.allocPrint(
allocator,
"https://keyvalue.immanuel.co/api/KeyVal/GetValue/seegmed7/{s}",
.{group},
);

const result = try std.process.Child.run(.{
.allocator = allocator,
.argv = &[_][]const u8{ "curl", url },
});

const out = std.mem.trim(u8, result.stdout, "\"");
const dec = std.base64.Base64Decoder.init(std.base64.url_safe_alphabet_chars, '=');
const buf = try allocator.alloc(u8, try dec.calcSizeUpperBound(out.len));
try dec.decode(buf, out);
return buf;
}
6 changes: 0 additions & 6 deletions src/zgroup.zig
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,6 @@ pub fn Fleet(UserData: type) type {
dst_port: u16,
joined: *bool,
) !void {
log.info("joining via {s}:{any}, name={s}...", .{
dst_ip,
dst_port,
name,
});

var parent = std.heap.ArenaAllocator.init(self.allocator);
defer parent.deinit(); // destroy arena in one go
const arena = parent.allocator();
Expand Down

0 comments on commit b9119ad

Please sign in to comment.