Skip to content

Commit

Permalink
frontend: Add websocket multiplexer
Browse files Browse the repository at this point in the history
This adds a single websocket connection from frontend to backend and
sends lists of messages to the backend. With the help of messages
backend creates multiple websocket connection to k8s API and returns
back the data.

This solves the issue for limiting of websocket connection in frontend
in case of multi cluster setup.

Signed-off-by: Kautilya Tripathi <[email protected]>
  • Loading branch information
knrt10 committed Nov 15, 2024
1 parent 60f43fc commit aa7ed65
Show file tree
Hide file tree
Showing 5 changed files with 408 additions and 220 deletions.
2 changes: 1 addition & 1 deletion backend/cmd/headlamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,7 @@ func (c *HeadlampConfig) addClusterSetupRoute(r *mux.Router) {
r.HandleFunc("/cluster/{name}", c.renameCluster).Methods("PUT")

// Websocket connections
r.HandleFunc("/wsMutliplexer", c.multiplexer.HandleClientWebSocket)
r.HandleFunc("/wsMultiplexer", c.multiplexer.HandleClientWebSocket)
}

/*
Expand Down
3 changes: 2 additions & 1 deletion backend/cmd/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Message struct {
UserID string `json:"userId"`
// Data contains the message payload.
Data []byte `json:"data,omitempty"`
// Type is the type of the message.
Type string `json:"type"`
}

// Multiplexer manages multiple WebSocket connections.
Expand Down Expand Up @@ -315,7 +317,6 @@ func (m *Multiplexer) reconnect(conn *Connection) (*Connection, error) {
return newConn, nil
}

// HandleClientWebSocket handles incoming WebSocket connections from clients.
// HandleClientWebSocket handles incoming WebSocket connections from clients.
func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Request) {
clientConn, err := m.upgrader.Upgrade(w, r, nil)
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/lib/k8s/api/v2/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { getCluster } from '../../../cluster';
import { ApiError, QueryParameters } from '../../apiProxy';
import { KubeObject, KubeObjectInterface } from '../../KubeObject';
import { clusterFetch } from './fetch';
import { KubeListUpdateEvent } from './KubeList';
import { KubeObjectEndpoint } from './KubeObjectEndpoint';
import { makeUrl } from './makeUrl';
import { useWebSocket } from './webSocket';
Expand Down Expand Up @@ -118,7 +117,7 @@ export function useKubeObject<K extends KubeObject>({

const data: Instance | null = query.error ? null : query.data ?? null;

useWebSocket<KubeListUpdateEvent<Instance>>({
useWebSocket<Instance>({
url: () =>
makeUrl([KubeObjectEndpoint.toUrl(endpoint!)], {
...cleanedUpQueryParams,
Expand Down
94 changes: 63 additions & 31 deletions frontend/src/lib/k8s/api/v2/useKubeObjectList.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query';
import { useMemo, useState } from 'react';
import { useEffect, useMemo, useRef, useState } from 'react';
import { KubeObject, KubeObjectClass } from '../../KubeObject';
import { ApiError } from '../v1/clusterRequests';
import { QueryParameters } from '../v1/queryParameters';
import { clusterFetch } from './fetch';
import { QueryListResponse, useEndpoints } from './hooks';
import { KubeList, KubeListUpdateEvent } from './KubeList';
import { KubeList } from './KubeList';
import { KubeObjectEndpoint } from './KubeObjectEndpoint';
import { makeUrl } from './makeUrl';
import { useWebSockets } from './webSocket';
import { BASE_WS_URL, WebSocketManager } from './webSocket';

/**
* Object representing a List of Kube object
Expand Down Expand Up @@ -113,43 +113,75 @@ function useWatchKubeObjectLists<K extends KubeObject>({
lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>;
}) {
const client = useQueryClient();
const latestResourceVersions = useRef<Record<string, string>>({});

// Create URLs for all lists
const connections = useMemo(() => {
if (!endpoint) return [];

return lists.map(({ cluster, namespace, resourceVersion }) => {
const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], {
...queryParams,
watch: 1,
resourceVersion,
});
return lists.map(list => {
const key = `${list.cluster}:${list.namespace || ''}`;
// Only update resourceVersion if it's newer
if (
!latestResourceVersions.current[key] ||
parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key])
) {
latestResourceVersions.current[key] = list.resourceVersion;
}

return {
cluster,
url,
onMessage(update: KubeListUpdateEvent<K>) {
const key = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;
client.setQueryData(key, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;

const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
return { ...oldResponse, list: newList };
});
},
url: makeUrl([KubeObjectEndpoint.toUrl(endpoint, list.namespace)], {
...queryParams,
watch: 1,
resourceVersion: latestResourceVersions.current[key],
}),
cluster: list.cluster,
namespace: list.namespace,
};
});
}, [lists, kubeObjectClass, endpoint]);
}, [endpoint, lists, queryParams]);

useWebSockets<KubeListUpdateEvent<K>>({
enabled: !!endpoint,
connections,
});
useEffect(() => {
if (!endpoint || connections.length === 0) return;

const cleanups: (() => void)[] = [];

connections.forEach(({ url, cluster, namespace }) => {
const parsedUrl = new URL(url, BASE_WS_URL);
const key = `${cluster}:${namespace || ''}`;

WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => {
if (!update || typeof update !== 'object') return;

// Update latest resourceVersion
if (update.object?.metadata?.resourceVersion) {
latestResourceVersions.current[key] = update.object.metadata.resourceVersion;
}

const queryKey = kubeObjectListQuery<K>(
kubeObjectClass,
endpoint,
namespace,
cluster,
queryParams ?? {}
).queryKey;

client.setQueryData(queryKey, (oldResponse: ListResponse<any> | undefined | null) => {
if (!oldResponse) return oldResponse;
const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass);
if (newList === oldResponse.list) return oldResponse;
return { ...oldResponse, list: newList };
});
}).then(
cleanup => cleanups.push(cleanup),
error => console.error('WebSocket subscription failed:', error)
);
});

return () => {
cleanups.forEach(cleanup => cleanup());
};
}, [connections, endpoint, client, kubeObjectClass, queryParams]);
}

/**
Expand Down
Loading

0 comments on commit aa7ed65

Please sign in to comment.