Skip to content

Commit

Permalink
add in kubernetes python module
Browse files Browse the repository at this point in the history
  • Loading branch information
gilesknap committed Sep 6, 2024
1 parent 0305519 commit eb9c9be
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ jobs:
env:
TAG: ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }}
steps:

- name: Log in to GitHub Docker Registry
uses: docker/login-action@v3
with:
Expand Down Expand Up @@ -72,7 +71,7 @@ jobs:
uses: docker/build-push-action@v6
with:
platforms: ${{ matrix.platform }}
target: dockerizer
target: builder
tags: ${{ env.TAG }}-debug:${{ github.ref_name }}
push: true

Expand Down
7 changes: 6 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ RUN cd /epics/src/ca-gateway \
# Install debugging tools to use this target as a debug container
RUN apt update && apt install -y net-tools tcpdump iproute2 iputils-ping vim

# install python libraries for set_addr_list.py
RUN pip3 install setuptools scapy kubernetes ipython

ENTRYPOINT ["bash"]
CMD ["-c", "/epics/ca-gateway/bin/linux_x86-64/gateway"]

## ======================================
# 3rd stage: "dockerize" the application - copy executable, lib dependencies
# to a new root folder. For more information, read
Expand All @@ -51,7 +57,6 @@ RUN dockerize -L preserve -n -u scs -o /ca-gateway_root --verbose /epics/gateway
# /epics is owned by scs in this image and should also be in later one:
&& chown -R scs:users /ca-gateway_root/epics

ENTRYPOINT ["/epics/gateway"]

## =========================================
# 4th stage: Finally put together our image
Expand Down
130 changes: 130 additions & 0 deletions set_addr_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/env python
import argparse

import logging
import queue
import threading
import time

from collections import namedtuple
from typing import Generator, List, Set, Tuple
from scapy.all import *

try:
from kubernetes import client, config, watch
has_kubelib = True
except ImportError:
has_kubelib = False


log = logging.getLogger(__name__)
level2num = {"debug": logging.DEBUG,
"info": logging.INFO,
"warn": logging.WARN,
"error": logging.ERROR}


ServiceEvent = namedtuple("ServiceEvent", ["type", "ip", "port"])


class ServiceEventType(object):
ADDED = 'ADDED'
DELETED = 'DELETED'


def services_events_task(namespace: str, port: int,
eventq: 'queue.Queue[ServiceEvent]'):
"""Contains loop to get services' events and push them to a queue
Args:
namespace: used to select namespace that services belongs to
port: specify the UDP port that the service should be listening
eventq: queue used to push events
"""
while True:
try:
if not has_kubelib:
log.error("kubernetes library not found")
services_events = kubelib_services_events
for events in services_events(namespace, port):
log.info("Got services events %s", repr(events))
for event in events:
eventq.put(event)
except Exception as e:
log.error("Problems: %s\nRetrying ...", e)
time.sleep(5)


def kubelib_services_events(namespace: str,
port: int) -> Generator[List[ServiceEvent],
None,
None]:
"""Generator for services' events using kubernetes library
Args:
namespace: used to select namespace that services belongs to
port: specify the UDP port that the service should be listening
"""
config.load_kube_config()
api_watch = watch.Watch()
v1 = client.CoreV1Api()
while True:
for event in api_watch.stream(v1.list_namespaced_service,
namespace=namespace):
log.debug("Got kubernetes event: %s", event)
result = []
event_type = event.get('type')
for ingress in event['object'].status.load_balancer.ingress:
ip = ingress.ip
for ports in event['object'].spec.ports:
srv_port = ports.port
proto = ports.protocol
if srv_port == port and proto == 'UDP':
result.append(ServiceEvent(event_type, ip, port))
if result:
yield result



def handle_events(eventq: 'queue.Queue[ServiceEvent]',
search_endpoints: Set[Tuple[str, int]]):
"""Updates search_endpoints acording to events received via an event queue
Args:
eventq: queue to receive services events
search_endpoints: set that will be updated acording to events
"""
while True:
try:
event = eventq.get(False)
except queue.Empty:
return
if event.type == ServiceEventType.ADDED:
search_endpoints.add((event.ip, event.port))
elif event.type == ServiceEventType.DELETED:
search_endpoints.discard((event.ip, event.port))
else:
log.error("Incorrect service event type: %s", event.type)


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--port', type=int, default=5064)
parser.add_argument('--namespace', type=str, required=True)
parser.add_argument('--loglevel', type=str, default="info")
return parser.parse_args()


def main():
args = parse_args()
search_endpoints = set()
logging.basicConfig(level=level2num.get(args.loglevel.lower(), "info"))
eventq = queue.Queue()
threading.Thread(None, services_events_task, "services_events",
args=(args.namespace, args.port, eventq)).start()
while True:
handle_events(eventq, search_endpoints)


if __name__ == "__main__":
main()

0 comments on commit eb9c9be

Please sign in to comment.