Skip to content

Commit

Permalink
even more refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: catsby <[email protected]>
  • Loading branch information
catsby committed Dec 12, 2024
1 parent f695335 commit 1fde0ce
Showing 1 changed file with 53 additions and 79 deletions.
132 changes: 53 additions & 79 deletions src/pepr/operator/controllers/network/generators/allNodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,150 +6,114 @@
import { KubernetesListObject } from "@kubernetes/client-node";
import { V1NetworkPolicyPeer, V1NodeCondition, V1NodeAddress } from "@kubernetes/client-node";
import { K8s, kind } from "pepr";

import { Component, setupLogger } from "../../../../logger";
import { RemoteGenerated } from "../../../crd";
import { anywhere } from "./anywhere";
import { UDSConfig } from "../../../../config";

// configure subproject logger
const log = setupLogger(Component.OPERATOR_GENERATORS);

// nodePolicies is a list of all the nodes in the cluster in egress:to format
let nodePolicies: V1NetworkPolicyPeer[];
// nodeSet is a set of all the nodes in the cluster, to ensure uniqueness and
// easily add/remove nodes
// Maintain a set of all node internal IPs
const nodeSet = new Set<string>();

/**
* Initialize the node CIDRs by getting the initial list of nodes
* Initialize the node targets by fetching the current nodes in the cluster
* and populating the nodeSet with their Internal IPs.
*/
export async function initAllNodesTarget() {
try {
const nodes = await fetchKubernetesNodes();
// reset the node policies, which shouldn't be necessary but doesn't hurt
nodePolicies = [];
// Iterate through the nodes and extract their IP addresses
nodes.items.forEach((node: kind.Node) => {
const addresses = node.status?.addresses;
const internalIP = addresses?.find(
(addr: V1NodeAddress) => addr.type === "InternalIP",
)?.address;

// if the internal IP is not null, add it to the set
if (internalIP) {
nodeSet.add(internalIP);
}
});
nodeSet.clear();

for (const node of nodes.items) {
const ip = getNodeInternalIP(node);
if (ip) nodeSet.add(ip);
}
} catch (err) {
log.error("error fetching node IPs:", err);
}
}

/**
* Get the CIDRs of all nodes
* @returns The an egreess/to block of CIDRs of all nodes
* Returns the egress CIDRs of all known nodes as network policy peers.
* If none are known, defaults to 0.0.0.0/0 and logs a warning.
*/
export function nodeCIDRs(): V1NetworkPolicyPeer[] {
// If the node cidrs are already cached, return them
if (nodePolicies) {
return nodePolicies;
}
const policies = buildNodePolicies([...nodeSet]);
if (policies.length > 0) return policies;

buildNodePolicies(Array.from(nodeSet));
if (nodePolicies) {
return nodePolicies;
}

// Otherwise, log a warning and default to 0.0.0.0/0
log.warn("Unable to get API server CIDR, defaulting to 0.0.0.0/0");
return [anywhere];
}

/**
* Update the node CIDRs when a node is created or updated
*
* @param node The node that was created or updated
* When a node is created or updated, if it's Ready, add its IP to the set,
* rebuild the policies, and update the NetworkPolicies.
*/
export async function updateKubeNodesFromCreateUpdate(node: kind.Node) {
const conditions = node.status?.conditions;

conditions?.forEach((condition: V1NodeCondition) => {
if (condition.type === "Ready") {
const addresses = node.status?.addresses;
const internalIP = addresses?.find(
(addr: V1NodeAddress) => addr.type === "InternalIP",
)?.address;
if (internalIP) {
nodeSet.add(internalIP);
}
}
});
buildNodePolicies(Array.from(nodeSet));
await updateKubeNodesNetworkPolicies(nodePolicies);
const isReady = node.status?.conditions?.some(
(condition: V1NodeCondition) => condition.type === "Ready" && condition.status === "True",
);

if (isReady) {
const ip = getNodeInternalIP(node);
if (ip) nodeSet.add(ip);
}

const policies = buildNodePolicies([...nodeSet]);
await updateKubeNodesNetworkPolicies(policies);
}

/**
* Update the node CIDRs when a node is deleted
*
* @param slice The node that was deleted
* When a node is deleted, remove its IP from the set, rebuild the policies,
* and update the NetworkPolicies.
*/
export async function updateKubeNodesFromDelete(node: kind.Node) {
const addresses = node.status?.addresses;
const ip = getNodeInternalIP(node);
if (ip) nodeSet.delete(ip);

const internalIP = addresses?.find((addr: V1NodeAddress) => addr.type === "InternalIP")?.address;
if (internalIP) {
nodeSet.delete(internalIP);
}
buildNodePolicies(Array.from(nodeSet));
await updateKubeNodesNetworkPolicies(nodePolicies);
const policies = buildNodePolicies([...nodeSet]);
await updateKubeNodesNetworkPolicies(policies);
}

/**
* Fetches the Kubernetes Nodes. This is used to get the internal IPs of all nodes.
*
* @returns {Promise<KubernetesListObject<kind.Node>>} - The Node list object.
* Fetch all Kubernetes nodes.
*/
async function fetchKubernetesNodes(): Promise<KubernetesListObject<kind.Node>> {
return K8s(kind.Node).Get();
}

/**
* Update NetworkPolicies with new node CIDRs.
*
* @param {V1NetworkPolicyPeer[]} newNodes - The updated list of peers to apply to the NetworkPolicies.
* Update all NetworkPolicies labeled with uds/generated=KubeNodes to
* reflect the given node CIDRs.
*/
export async function updateKubeNodesNetworkPolicies(newNodes: V1NetworkPolicyPeer[]) {
const netPols = await K8s(kind.NetworkPolicy)
.WithLabel("uds/generated", RemoteGenerated.KubeNodes)
.Get();

for (const netPol of netPols.items) {
// Safety check for network policy spec existence
if (!netPol.spec) {
log.warn(
`KubeNodes NetworkPolicy ${netPol.metadata!.namespace}/${netPol.metadata!.name} is missing spec.`,
`KubeNodes NetworkPolicy ${netPol.metadata?.namespace}/${netPol.metadata?.name} is missing spec.`,
);
continue;
}

// Handle egress policies
if (netPol.spec.egress) {
if (!netPol.spec.egress[0]) {
netPol.spec.egress[0] = { to: [] };
}
netPol.spec.egress[0] = netPol.spec.egress[0] || { to: [] };
netPol.spec.egress[0].to = newNodes;
}

if (netPol.metadata) {
// Remove managed fields to prevent errors on server side apply
// TODO(clint): figure out if this is necessary
// Remove managed fields to prevent server-side apply errors
netPol.metadata.managedFields = undefined;
}

log.debug(
`Updating KubeNodes NetworkPolicy ${netPol.metadata!.namespace}/${netPol.metadata!.name} with new CIDRs.`,
`Updating KubeNodes NetworkPolicy ${netPol.metadata?.namespace}/${netPol.metadata?.name} with new CIDRs.`,
);

try {
await K8s(kind.NetworkPolicy).Apply(netPol, { force: true });
} catch (err) {
Expand All @@ -163,10 +127,20 @@ export async function updateKubeNodesNetworkPolicies(newNodes: V1NetworkPolicyPe
}
}

function buildNodePolicies(nodeIPs: string[]) {
nodePolicies = nodeIPs.flatMap(cidr => ({
/**
* Build V1NetworkPolicyPeer array from a list of node IPs.
*/
function buildNodePolicies(nodeIPs: string[]): V1NetworkPolicyPeer[] {
return nodeIPs.map(ip => ({
ipBlock: {
cidr: `${cidr}/32`,
cidr: `${ip}/32`,
},
}));
}

/**
* Utility function to get the InternalIP of a node.
*/
function getNodeInternalIP(node: kind.Node): string | undefined {
return node.status?.addresses?.find((addr: V1NodeAddress) => addr.type === "InternalIP")?.address;
}

0 comments on commit 1fde0ce

Please sign in to comment.