Skip to content

Commit

Permalink
core: k8s-related setup and test
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 23, 2024
1 parent dfbb604 commit 68ce8af
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 91 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -
WORKDIR /tmp/
COPY src/ ./src/
COPY build* ./
RUN ls -laF && curl -O https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz && \
RUN curl -O https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz && \
xz --decompress zig-linux-x86_64-0.13.0.tar.xz && tar -xf zig-linux-x86_64-0.13.0.tar && \
./zig-linux-x86_64-0.13.0/zig build -Doptimize=ReleaseSafe
./zig-linux-x86_64-0.13.0/zig build -Doptimize=ReleaseSafe --summary all

FROM debian:stable-slim
RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
RUN set -x && apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y curl ca-certificates && rm -rf /var/lib/apt/lists/*
WORKDIR /app/
COPY --from=0 /tmp/zig-out/bin/zgroup .
ENTRYPOINT ["/app/zgroup"]
Expand Down
55 changes: 55 additions & 0 deletions k8s.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
---

apiVersion: apps/v1
kind: Deployment
metadata:
name: zgroup
spec:
selector:
matchLabels:
app: zgroup
replicas: 1
revisionHistoryLimit: 5
template:
metadata:
labels:
app: zgroup
spec:
containers:
- name: zgroup
image: quay.io/zgroup:v0.1.0
command: ["/bin/sh"]
args: ["-c", '/app/zgroup group1 ${K8S_MY_POD_IP}:8080']
resources:
requests:
cpu: 100m
memory: 500Mi
limits:
cpu: 100m
memory: 500Mi
imagePullPolicy: Always
env:
# Downward value to know if we are running in k8s.
- name: K8S_MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: GET_HOSTS_FROM
value: dns
ports:
- containerPort: 8080

---

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
name: zgroup-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: zgroup
minReplicas: 1
maxReplicas: 1
targetCPUUtilizationPercentage: 40
156 changes: 68 additions & 88 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,12 @@ const UserData = struct {
skip_callback: bool = false,
};

// We are using curl here as std.http.Client seems to not play well with this endpoint.
// The "seegmed7" in the url is our API key. The allocator here is the allocator passed
// to Fleet's init function. `addr`'s format is "ip:port", e.g. "127.0.0.1:8080", and
// needs to be freed after use.
// The allocator here is the allocator passed to Fleet's init function. `addr`'s
// format is "ip:port", e.g. "127.0.0.1:8080", and needs to be freed after use.
fn callback(allocator: std.mem.Allocator, data: ?*UserData, addr: []const u8) !void {
defer allocator.free(addr);
if (data.?.skip_callback) return;

const enc = std.base64.Base64Encoder.init(std.base64.url_safe_alphabet_chars, '=');
const buf = try allocator.alloc(u8, enc.calcSize(addr.len));
defer allocator.free(buf);
const out = enc.encode(buf, addr);

log.info("callback: leader={s}, set join info to {s}", .{ addr, out });

const url = try std.fmt.allocPrint(
allocator,
"https://keyvalue.immanuel.co/api/KeyVal/UpdateValue/seegmed7/{s}/{s}",
.{ data.?.group, out },
);

const result = try std.process.Child.run(.{
.allocator = allocator,
.argv = &[_][]const u8{
"curl",
"-X",
"POST",
"-H",
"Content-Length: 1", // somehow, this works with this endpoint (required though)
url,
},
});

defer {
allocator.free(result.stdout);
allocator.free(result.stderr);
}
try setJoinAddress(allocator, data.?.group, addr);
}

const Fleet = zgroup.Fleet(UserData);
Expand All @@ -66,14 +35,15 @@ const Fleet = zgroup.Fleet(UserData);
//
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var arena = std.heap.ArenaAllocator.init(gpa.allocator());
defer arena.deinit(); // destroy arena in one go
var aa = std.heap.ArenaAllocator.init(gpa.allocator());
defer aa.deinit(); // destroy arena in one go
const arena = aa.allocator();

var args = try std.process.argsWithAllocator(arena.allocator());
var hm = std.AutoHashMap(usize, []const u8).init(arena.allocator());
var args = try std.process.argsWithAllocator(arena);
var hm = std.AutoHashMap(usize, []const u8).init(arena);
var i: usize = 0;
while (args.next()) |val| : (i += 1) {
const arg = try std.fmt.allocPrint(arena.allocator(), "{s}", .{val});
const arg = try std.fmt.allocPrint(arena, "{s}", .{val});
try hm.put(i, arg);
}

Expand Down Expand Up @@ -113,49 +83,33 @@ pub fn main() !void {
defer fleet.deinit();

i = 0;
var joined = false;
var bo = backoff.Backoff{};
while (true) : (i += 1) {
std.time.sleep(std.time.ns_per_s * 1);
if (joined)
std.time.sleep(std.time.ns_per_s * 1)
else
std.time.sleep(bo.pause());

// Delay for a bit before joining group.
if (i == 2) {
if (i > 1 and i < 100 and !joined) {
switch (hm.count()) {
3 => {
// No join address in args. Try using a free discovery service.
var join_addr: []const u8 = "";
for (0..10) |_| {
const ja = try getJoinAddress(arena.allocator(), name);
if (ja.len > 0) {
join_addr = ja;
break;
} else std.time.sleep(bo.pause());
}
const ja = try getJoinAddress(arena, name);
if (ja.len > 0) join_addr = ja else continue;

log.info("join address found, addr={s}", .{join_addr});
log.info("[{d}] join address found, addr={s}", .{ i, join_addr });

sep = std.mem.indexOf(u8, join_addr, ":").?;
const join_ip = join_addr[0..sep];
if (join_ip.len == 0) {
log.err("invalid join address", .{});
return;
}

var join_port: u16 = 0;
if (join_addr[sep + 1 ..].len > 0) {
join_port = try std.fmt.parseUnsigned(u16, join_addr[sep + 1 ..], 10);
}

for (0..3) |_| {
var joined = false;
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});

if (joined) break else std.time.sleep(bo.pause());
}
const join_port = try std.fmt.parseUnsigned(u16, join_addr[sep + 1 ..], 10);

fleet.join(
name,
join_addr[0..sep],
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});
},
4 => {
// Join address is provided. Skip callback.
Expand All @@ -169,22 +123,14 @@ pub fn main() !void {
return;
}

var join_port: u16 = 0;
if (join[sep + 1 ..].len > 0) {
join_port = try std.fmt.parseUnsigned(u16, join[sep + 1 ..], 10);
}

for (0..3) |_| {
var joined = false;
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});
const join_port = try std.fmt.parseUnsigned(u16, join[sep + 1 ..], 10);

if (joined) break else std.time.sleep(bo.pause());
}
fleet.join(
name,
join_ip,
join_port,
&joined,
) catch |err| log.err("join failed: {any}", .{err});
},
else => {},
}
Expand All @@ -207,6 +153,40 @@ pub fn main() !void {
}
}

// We are using curl here as std.http.Client seems to not play well with this endpoint.
fn setJoinAddress(allocator: std.mem.Allocator, group: []const u8, addr: []const u8) !void {
const enc = std.base64.Base64Encoder.init(std.base64.url_safe_alphabet_chars, '=');
const buf = try allocator.alloc(u8, enc.calcSize(addr.len));
defer allocator.free(buf);
const out = enc.encode(buf, addr);

log.info("callback: leader={s}, set join info to {s}", .{ addr, out });

const url = try std.fmt.allocPrint(
allocator,
"https://keyvalue.immanuel.co/api/KeyVal/UpdateValue/seegmed7/{s}/{s}",
.{ group, out },
);

defer allocator.free(url);
const result = try std.process.Child.run(.{
.allocator = allocator,
.argv = &[_][]const u8{
"curl",
"-X",
"POST",
"-H",
"Content-Length: 1", // somehow, this works with this endpoint (required though)
url,
},
});

defer {
allocator.free(result.stdout);
allocator.free(result.stderr);
}
}

// We are using curl here as std.http.Client seems to not play well with this endpoint.
// The "seegmed7" in the url is our API key. We are passing an arena allocator here.
fn getJoinAddress(allocator: std.mem.Allocator, group: []const u8) ![]u8 {
Expand All @@ -223,7 +203,7 @@ fn getJoinAddress(allocator: std.mem.Allocator, group: []const u8) ![]u8 {

const out = std.mem.trim(u8, result.stdout, "\"");
const dec = std.base64.Base64Decoder.init(std.base64.url_safe_alphabet_chars, '=');
const buf = try allocator.alloc(u8, try dec.calcSizeUpperBound(out.len));
const buf = try allocator.alloc(u8, try dec.calcSizeForSlice(out));
try dec.decode(buf, out);
return buf;
}

0 comments on commit 68ce8af

Please sign in to comment.