NETOBSERV-1248: shared k8s cache
The goal is to allow deploying a dedicated FLP instance as just a k8s cache component, alongside the "traditional" FLP deployments. The k8s cache is a Kafka producer, and the traditional instances are consumers.

- New FLP binary, k8s-cache, that just starts the informers and writes update events to Kafka
- New optional config for the kubernetes enrichment, allowing it to read from Kafka instead of running its own informers
jotak committed Jun 24, 2024
1 parent caf7ac2 commit ca14819
Showing 25 changed files with 1,033 additions and 431 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push_image_pr.yml
@@ -37,7 +37,7 @@ jobs:
       - name: get short sha
         run: echo "short_sha=$(git rev-parse --short HEAD)" >> $GITHUB_ENV
       - name: build and push manifest with images
-        run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} make images
+        run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} IMAGE_CACHE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}-cache:${{ env.short_sha }} make images
       - uses: actions/github-script@v6
         with:
           github-token: ${{secrets.GITHUB_TOKEN}}
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
 /flowlogs-pipeline
 /confgenerator
+/k8s-cache
 /bin/
 cover.out
19 changes: 16 additions & 3 deletions Makefile
@@ -39,15 +39,17 @@ IMAGE_TAG_BASE ?= quay.io/$(IMAGE_ORG)/flowlogs-pipeline
 
 # Image URL to use all building/pushing image targets
 IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)
+IMAGE_CACHE ?= $(IMAGE_TAG_BASE)-cache:$(VERSION)
 OCI_BUILD_OPTS ?=
 
 # Image building tool (docker / podman) - docker is preferred in CI
 OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
-OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
+OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null)
 
 MIN_GO_VERSION := 1.20.0
 FLP_BIN_FILE=flowlogs-pipeline
 CG_BIN_FILE=confgenerator
+K8S_CACHE_BIN_FILE=k8s-cache
 NETFLOW_GENERATOR=nflow-generator
 CMD_DIR=./cmd/
 FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -61,18 +63,21 @@ FORCE: ;
 define build_target
 	echo 'building image for arch $(1)'; \
 	DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE}-$(1) -f contrib/docker/Dockerfile .;
+	DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE_CACHE}-$(1) -f contrib/docker/cache.Dockerfile .;
 endef
 
 # push a single arch target image
 define push_target
 	echo 'pushing image ${IMAGE}-$(1)'; \
 	DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE}-$(1);
+	DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE_CACHE}-$(1);
 endef
 
 # manifest create a single arch target provided as argument
 define manifest_add_target
 	echo 'manifest add target $(1)'; \
 	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE} ${IMAGE}-$(1);
+	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE_CACHE} ${IMAGE_CACHE}-$(1);
 endef
 
 ##@ General
@@ -114,8 +119,12 @@ build_code:
 	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${FLP_BIN_FILE}"
 	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${CG_BIN_FILE}"
 
+.PHONY: build_k8s_cache
+build_k8s_cache:
+	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${K8S_CACHE_BIN_FILE}"
+
 .PHONY: build
-build: validate_go lint build_code docs ## Build flowlogs-pipeline executable and update the docs
+build: validate_go lint build_code build_k8s_cache docs ## Build flowlogs-pipeline executables and update the docs
 
 .PHONY: docs
 docs: FORCE ## Update flowlogs-pipeline documentation
@@ -187,16 +196,20 @@ image-push: ## Push MULTIARCH_TARGETS images
 .PHONY: manifest-build
 manifest-build: ## Build MULTIARCH_TARGETS manifest
 	@echo 'building manifest $(IMAGE)'
-	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f
+	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f || true
+	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE_CACHE} -f || true
 	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE}-$(target));
+	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE_CACHE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE_CACHE}-$(target));
 
 .PHONY: manifest-push
 manifest-push: ## Push MULTIARCH_TARGETS manifest
 	@echo 'publish manifest $(IMAGE)'
 ifeq (${OCI_BIN}, docker)
 	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE};
+	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE};
 else
 	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE} docker://${IMAGE};
+	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE} docker://${IMAGE_CACHE};
 endif
 
 include .mk/development.mk
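Taken together, a typical invocation of these targets — a hedged sketch, assuming `images` chains image-build, image-push, manifest-build and manifest-push, with an illustrative org and target list:

MULTIARCH_TARGETS="amd64 arm64" IMAGE_ORG=myorg make images
# expected result: quay.io/myorg/flowlogs-pipeline:<version> and
# quay.io/myorg/flowlogs-pipeline-cache:<version> built and pushed for both arches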
2 changes: 1 addition & 1 deletion README.md
@@ -924,7 +924,7 @@ General
 Develop
 lint Lint the code
-build Build flowlogs-pipeline executable and update the docs
+build Build flowlogs-pipeline executables and update the docs
 docs Update flowlogs-pipeline documentation
 clean Clean
 tests-unit Unit tests
78 changes: 78 additions & 0 deletions cmd/k8s-cache/main.go
@@ -0,0 +1,78 @@
package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
	"github.com/sirupsen/logrus"
	"gopkg.in/yaml.v2"
)

var (
	buildVersion = "unknown"
	buildDate    = "unknown"
	app          = "flp-cache"
	configPath   = flag.String("config", "", "path to a config file")
	versionFlag  = flag.Bool("v", false, "print version")
	log          = logrus.WithField("module", "main")
)

type Config struct {
	KubeConfigPath string          `yaml:"kubeConfigPath"`
	KafkaConfig    api.EncodeKafka `yaml:"kafkaConfig"`
	PProfPort      int32           `yaml:"pprofPort"` // TODO: manage pprof
	LogLevel       string          `yaml:"logLevel"`
}

func main() {
	flag.Parse()

	appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate)
	if *versionFlag {
		fmt.Println(appVersion)
		os.Exit(0)
	}

	cfg, err := readConfig(*configPath)
	if err != nil {
		log.WithError(err).Fatal("error reading config file")
	}

	lvl, err := logrus.ParseLevel(cfg.LogLevel)
	if err != nil {
		log.Errorf("Log level %s not recognized, using info", cfg.LogLevel)
		lvl = logrus.InfoLevel
	}
	logrus.SetLevel(lvl)
	log.Infof("Starting %s at log level %s", appVersion, lvl)
	log.Infof("Configuration: %#v", cfg)

	err = kubernetes.InitInformerDatasource(cfg.KubeConfigPath, &cfg.KafkaConfig)
	if err != nil {
		log.WithError(err).Fatal("error initializing Kubernetes & informers")
	}

	stopCh := utils.SetupElegantExit()
	<-stopCh
}

func readConfig(path string) (*Config, error) {
	var cfg Config
	if len(path) == 0 {
		return &cfg, nil
	}
	yamlFile, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	err = yaml.Unmarshal(yamlFile, &cfg)
	if err != nil {
		return nil, err
	}

	return &cfg, err
}
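For illustration, a minimal config file that readConfig above would accept — the keys mirror the Config struct; the kafkaConfig sub-keys (address, topic) are assumed from api.EncodeKafka's YAML tags and all values are placeholders:

kubeConfigPath: ""            # empty: fall back to in-cluster config (assumption)
logLevel: debug
pprofPort: 6060               # declared but not yet wired up (see the TODO above)
kafkaConfig:
  address: my-kafka:9092      # assumed api.EncodeKafka key
  topic: k8s-cache-updates    # assumed key; consumers must read the same topic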
29 changes: 29 additions & 0 deletions contrib/docker/cache.Dockerfile
@@ -0,0 +1,29 @@
# We do not use --platform feature to auto fill this ARG because of incompatibility between podman and docker
ARG TARGETPLATFORM=linux/amd64
ARG BUILDPLATFORM=linux/amd64
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder

ARG TARGETPLATFORM
ARG TARGETARCH=amd64
WORKDIR /app

# Copy source code
COPY go.mod .
COPY go.sum .
COPY Makefile .
COPY .mk/ .mk/
COPY .bingo/ .bingo/
COPY vendor/ vendor/
COPY .git/ .git/
COPY cmd/ cmd/
COPY pkg/ pkg/

RUN git status --porcelain
RUN GOARCH=$TARGETARCH make build_k8s_cache

# final stage
FROM --platform=$TARGETPLATFORM registry.access.redhat.com/ubi9/ubi-minimal:9.4

COPY --from=builder /app/k8s-cache /app/

ENTRYPOINT ["/app/k8s-cache"]
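For local testing, a hedged build-and-check sketch along the lines of the Makefile's build_target macro; the tool (podman), tag, and platform args are illustrative:

podman build --build-arg TARGETPLATFORM=linux/amd64 --build-arg TARGETARCH=amd64 \
  -t flowlogs-pipeline-cache:dev -f contrib/docker/cache.Dockerfile .
podman run --rm flowlogs-pipeline-cache:dev -v    # prints the flp-cache version banner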
26 changes: 26 additions & 0 deletions docs/api.md
@@ -247,6 +247,32 @@ Following is the supported API format for network transformations:
         output: entry output field
         protocol: entry protocol field
     kubeConfigPath: path to kubeconfig file (optional)
+    kafkaCacheConfig: Kafka config for informers cache (optional)
+        brokers: list of kafka broker addresses
+        topic: kafka topic to listen on
+        groupid: separate groupid for each consumer on specified topic
+        groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
+        startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
+        batchReadTimeout: how often (in milliseconds) to process input
+        decoder: decoder to use (e.g. json or protobuf)
+            type: (enum) one of the following:
+                json: JSON decoder
+                protobuf: Protobuf decoder
+        batchMaxLen: the number of accumulated flows before being forwarded for processing
+        pullQueueCapacity: the capacity of the queue used to store pulled flows
+        pullMaxBytes: the maximum number of bytes being pulled from kafka
+        commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
+        tls: TLS client configuration (optional)
+            insecureSkipVerify: skip client verifying the server's certificate chain and host name
+            caCertPath: path to the CA certificate
+            userCertPath: path to the user certificate
+            userKeyPath: path to the user private key
+        sasl: SASL configuration (optional)
+            type: SASL type
+                plain: Plain SASL
+                scramSHA512: SCRAM/SHA512 SASL
+            clientIDPath: path to the client ID / SASL username
+            clientSecretPath: path to the client secret / SASL password
     servicesFile: path to services file (optional, default: /etc/services)
     protocolsFile: path to protocols file (optional, default: /etc/protocols)
     subnetLabels: configure subnet and IPs custom labels
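To make the new option concrete, a hedged pipeline fragment wiring the kubernetes enrichment to the cache topic — only kafkaCacheConfig and its sub-keys come from the API above; the rule shape and all values are illustrative:

parameters:
  - name: enrich
    transform:
      type: network
      network:
        rules:
          - type: add_kubernetes     # illustrative enrichment rule
        kafkaCacheConfig:            # consume informer updates instead of running informers
          brokers:
            - my-kafka:9092          # hypothetical broker
          topic: k8s-cache-updates   # must match the k8s-cache producer's topic
          groupid: flp
          startOffset: FirstOffset
          decoder:
            type: json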
13 changes: 7 additions & 6 deletions pkg/api/transform_network.go
@@ -18,12 +18,13 @@
 package api
 
 type TransformNetwork struct {
-	Rules          NetworkTransformRules         `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
-	KubeConfigPath string                        `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
-	ServicesFile   string                        `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
-	ProtocolsFile  string                        `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
-	SubnetLabels   []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
-	DirectionInfo  NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
+	Rules            NetworkTransformRules         `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
+	KubeConfigPath   string                        `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
+	KafkaCacheConfig *IngestKafka                  `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"`
+	ServicesFile     string                        `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
+	ProtocolsFile    string                        `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
+	SubnetLabels     []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
+	DirectionInfo    NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
 }
 
 func (tn *TransformNetwork) GetServiceFiles() (string, string) {
117 changes: 117 additions & 0 deletions pkg/kafka/reader.go
@@ -0,0 +1,117 @@
package kafka

import (
	"errors"
	"os"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
	kafkago "github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)

var klog = logrus.WithField("component", "kafka-reader")

const defaultBatchReadTimeout = int64(1000)
const defaultKafkaBatchMaxLength = 500
const defaultKafkaCommitInterval = 500

func NewReader(config *api.IngestKafka) (*kafkago.Reader, int, error) {
	startOffsetString := config.StartOffset
	var startOffset int64
	switch startOffsetString {
	case "FirstOffset", "":
		startOffset = kafkago.FirstOffset
	case "LastOffset":
		startOffset = kafkago.LastOffset
	default:
		startOffset = kafkago.FirstOffset
		klog.Errorf("illegal value for StartOffset: %s; using default\n", startOffsetString)
	}
	klog.Debugf("startOffset = %v", startOffset)
	groupBalancers := make([]kafkago.GroupBalancer, 0)
	for _, gb := range config.GroupBalancers {
		switch gb {
		case "range":
			groupBalancers = append(groupBalancers, &kafkago.RangeGroupBalancer{})
		case "roundRobin":
			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
		case "rackAffinity":
			groupBalancers = append(groupBalancers, &kafkago.RackAffinityGroupBalancer{})
		default:
			klog.Warningf("groupbalancers parameter missing")
			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
		}
	}

	batchReadTimeout := defaultBatchReadTimeout
	if config.BatchReadTimeout != 0 {
		batchReadTimeout = config.BatchReadTimeout
	}
	klog.Debugf("batchReadTimeout = %d", batchReadTimeout)

	commitInterval := int64(defaultKafkaCommitInterval)
	if config.CommitInterval != 0 {
		commitInterval = config.CommitInterval
	}
	klog.Debugf("commitInterval = %d", config.CommitInterval)

	dialer := &kafkago.Dialer{
		Timeout:   kafkago.DefaultDialer.Timeout,
		DualStack: kafkago.DefaultDialer.DualStack,
	}
	if config.TLS != nil {
		klog.Infof("Using TLS configuration: %v", config.TLS)
		tlsConfig, err := config.TLS.Build()
		if err != nil {
			return nil, 0, err
		}
		dialer.TLS = tlsConfig
	}

	if config.SASL != nil {
		m, err := utils.SetupSASLMechanism(config.SASL)
		if err != nil {
			return nil, 0, err
		}
		dialer.SASLMechanism = m
	}

	readerConfig := kafkago.ReaderConfig{
		Brokers:        config.Brokers,
		Topic:          config.Topic,
		GroupID:        config.GroupID,
		GroupBalancers: groupBalancers,
		StartOffset:    startOffset,
		CommitInterval: time.Duration(commitInterval) * time.Millisecond,
		Dialer:         dialer,
	}

	if readerConfig.GroupID == "" {
		// Use hostname
		readerConfig.GroupID = os.Getenv("HOSTNAME")
	}

	if config.PullQueueCapacity > 0 {
		readerConfig.QueueCapacity = config.PullQueueCapacity
	}

	if config.PullMaxBytes > 0 {
		readerConfig.MaxBytes = config.PullMaxBytes
	}

	bml := defaultKafkaBatchMaxLength
	if config.BatchMaxLen != 0 {
		bml = config.BatchMaxLen
	}

	klog.Debugf("reader config: %#v", readerConfig)

	kafkaReader := kafkago.NewReader(readerConfig)
	if kafkaReader == nil {
		return nil, 0, errors.New("NewIngestKafka: failed to create kafka-go reader")
	}

	return kafkaReader, bml, nil
}
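For context, a minimal consumer-loop sketch built on NewReader; the api.IngestKafka values are placeholders and the decode/enrich wiring is elided:

package main

import (
	"context"
	"log"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/kafka"
)

func main() {
	// Placeholder values; a real deployment would take these from config.
	reader, batchMaxLen, err := kafka.NewReader(&api.IngestKafka{
		Brokers:     []string{"my-kafka:9092"},
		Topic:       "k8s-cache-updates",
		GroupID:     "flp",
		StartOffset: "FirstOffset",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()
	log.Printf("consuming in batches of up to %d messages", batchMaxLen)

	for {
		// ReadMessage blocks until the next message; offsets are committed
		// on the reader's CommitInterval.
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("read error: %v", err)
			return
		}
		// A real consumer would decode msg.Value (JSON or protobuf) and
		// apply it to the kubernetes enrichment cache; elided here.
		_ = msg
	}
}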
(Diff truncated; the remaining changed files of the 25 are not shown.)