NETOBSERV-1248: shared k8s cache #681

Open · wants to merge 2 commits into base: main
2 changes: 1 addition & 1 deletion .github/workflows/push_image_pr.yml
@@ -37,7 +37,7 @@ jobs:
      - name: get short sha
        run: echo "short_sha=$(git rev-parse --short HEAD)" >> $GITHUB_ENV
      - name: build and push manifest with images
        run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} make images
        run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} IMAGE_CACHE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}-cache:${{ env.short_sha }} make images
      - uses: actions/github-script@v6
        with:
          github-token: ${{secrets.GITHUB_TOKEN}}
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
/flowlogs-pipeline
/confgenerator
/k8s-cache
/bin/
cover.out
19 changes: 16 additions & 3 deletions Makefile
@@ -39,15 +39,17 @@ IMAGE_TAG_BASE ?= quay.io/$(IMAGE_ORG)/flowlogs-pipeline

# Image URL to use all building/pushing image targets
IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)
IMAGE_CACHE ?= $(IMAGE_TAG_BASE)-cache:$(VERSION)
OCI_BUILD_OPTS ?=

# Image building tool (docker / podman) - docker is preferred in CI
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null)

MIN_GO_VERSION := 1.20.0
FLP_BIN_FILE=flowlogs-pipeline
CG_BIN_FILE=confgenerator
K8S_CACHE_BIN_FILE=k8s-cache
NETFLOW_GENERATOR=nflow-generator
CMD_DIR=./cmd/
FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -61,18 +63,21 @@ FORCE: ;
define build_target
	echo 'building image for arch $(1)'; \
	DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE}-$(1) -f contrib/docker/Dockerfile .;
	DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE_CACHE}-$(1) -f contrib/docker/cache.Dockerfile .;
endef

# push a single arch target image
define push_target
	echo 'pushing image ${IMAGE}-$(1)'; \
	DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE}-$(1);
	DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE_CACHE}-$(1);
endef

# manifest create a single arch target provided as argument
define manifest_add_target
	echo 'manifest add target $(1)'; \
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE} ${IMAGE}-$(1);
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE_CACHE} ${IMAGE_CACHE}-$(1);
endef

##@ General
@@ -114,8 +119,12 @@ build_code:
	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${FLP_BIN_FILE}"
	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${CG_BIN_FILE}"

.PHONY: build_k8s_cache
build_k8s_cache:
	GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${K8S_CACHE_BIN_FILE}"

.PHONY: build
build: validate_go lint build_code docs ## Build flowlogs-pipeline executable and update the docs
build: validate_go lint build_code build_k8s_cache docs ## Build flowlogs-pipeline executables and update the docs

.PHONY: docs
docs: FORCE ## Update flowlogs-pipeline documentation
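For a quick local check, a minimal sketch of exercising the new target (the binary name comes from K8S_CACHE_BIN_FILE, and the -v flag is defined in cmd/k8s-cache/main.go):

```sh
make build_k8s_cache   # builds ./k8s-cache, which .gitignore now excludes
./k8s-cache -v         # prints "flp-cache [build version: ..., build date: ...]"
```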
@@ -187,16 +196,20 @@ image-push: ## Push MULTIARCH_TARGETS images
.PHONY: manifest-build
manifest-build: ## Build MULTIARCH_TARGETS manifest
	@echo 'building manifest $(IMAGE)'
	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f
	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f || true
	DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE_CACHE} -f || true
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE}-$(target));
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE_CACHE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE_CACHE}-$(target));

.PHONY: manifest-push
manifest-push: ## Push MULTIARCH_TARGETS manifest
	@echo 'publish manifest $(IMAGE)'
ifeq (${OCI_BIN}, docker)
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE};
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE};
else
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE} docker://${IMAGE};
	DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE} docker://${IMAGE_CACHE};
endif

include .mk/development.mk
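For context, a minimal sketch of how the amended targets compose for a two-arch build; the org and tag values are hypothetical:

```sh
# Each arch build now produces both the main and the -cache image; the
# manifest targets then assemble two multi-arch manifests and push both.
MULTIARCH_TARGETS="amd64 arm64" IMAGE_ORG=myorg VERSION=dev make images
# => quay.io/myorg/flowlogs-pipeline:dev-{amd64,arm64} and
#    quay.io/myorg/flowlogs-pipeline-cache:dev-{amd64,arm64}
```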
2 changes: 1 addition & 1 deletion README.md
@@ -924,7 +924,7 @@ General

Develop
lint Lint the code
build Build flowlogs-pipeline executable and update the docs
build Build flowlogs-pipeline executables and update the docs
docs Update flowlogs-pipeline documentation
clean Clean
tests-unit Unit tests
78 changes: 78 additions & 0 deletions cmd/k8s-cache/main.go
@@ -0,0 +1,78 @@
package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
	"github.com/sirupsen/logrus"
	"gopkg.in/yaml.v2"
)

var (
	buildVersion = "unknown"
	buildDate    = "unknown"
	app          = "flp-cache"
	configPath   = flag.String("config", "", "path to a config file")
	versionFlag  = flag.Bool("v", false, "print version")
	log          = logrus.WithField("module", "main")
)

type Config struct {
	KubeConfigPath string          `yaml:"kubeConfigPath"`
	KafkaConfig    api.EncodeKafka `yaml:"kafkaConfig"`
	PProfPort      int32           `yaml:"pprofPort"` // TODO: manage pprof
	LogLevel       string          `yaml:"logLevel"`
}

func main() {
	flag.Parse()

	appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate)
	if *versionFlag {
		fmt.Println(appVersion)
		os.Exit(0)
	}

	cfg, err := readConfig(*configPath)
	if err != nil {
		log.WithError(err).Fatal("error reading config file")
	}

	lvl, err := logrus.ParseLevel(cfg.LogLevel)
	if err != nil {
		log.Errorf("Log level %s not recognized, using info", cfg.LogLevel)
		lvl = logrus.InfoLevel
	}
	logrus.SetLevel(lvl)
	log.Infof("Starting %s at log level %s", appVersion, lvl)
	log.Infof("Configuration: %#v", cfg)

	err = kubernetes.InitInformerDatasource(cfg.KubeConfigPath, &cfg.KafkaConfig)
	if err != nil {
		log.WithError(err).Fatal("error initializing Kubernetes & informers")
	}

	stopCh := utils.SetupElegantExit()
	<-stopCh
}

func readConfig(path string) (*Config, error) {
	var cfg Config
	if len(path) == 0 {
		return &cfg, nil
	}
	yamlFile, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	err = yaml.Unmarshal(yamlFile, &cfg)
	if err != nil {
		return nil, err
	}

	return &cfg, err
}
29 changes: 29 additions & 0 deletions contrib/docker/cache.Dockerfile
@@ -0,0 +1,29 @@
# We do not use the --platform feature to auto-fill this ARG because of incompatibilities between podman and docker
ARG TARGETPLATFORM=linux/amd64
ARG BUILDPLATFORM=linux/amd64
FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder

ARG TARGETPLATFORM
ARG TARGETARCH=amd64
WORKDIR /app

# Copy source code
COPY go.mod .
COPY go.sum .
COPY Makefile .
COPY .mk/ .mk/
COPY .bingo/ .bingo/
COPY vendor/ vendor/
COPY .git/ .git/
COPY cmd/ cmd/
COPY pkg/ pkg/

RUN git status --porcelain
RUN GOARCH=$TARGETARCH make build_k8s_cache

# final stage
FROM --platform=$TARGETPLATFORM registry.access.redhat.com/ubi9/ubi-minimal:9.4

COPY --from=builder /app/k8s-cache /app/

ENTRYPOINT ["/app/k8s-cache"]
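A minimal sketch of building this image directly, mirroring the build_target recipe in the Makefile; the image tag is hypothetical:

```sh
DOCKER_BUILDKIT=1 docker buildx build --load \
  --build-arg TARGETPLATFORM=linux/amd64 --build-arg TARGETARCH=amd64 \
  --build-arg BUILDPLATFORM=linux/amd64 \
  -t quay.io/myorg/flowlogs-pipeline-cache:dev-amd64 \
  -f contrib/docker/cache.Dockerfile .
```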
26 changes: 26 additions & 0 deletions docs/api.md
@@ -247,6 +247,32 @@ Following is the supported API format for network transformations:
    output: entry output field
    protocol: entry protocol field
kubeConfigPath: path to kubeconfig file (optional)
kafkaCacheConfig: Kafka config for informers cache (optional)
    brokers: list of kafka broker addresses
    topic: kafka topic to listen on
    groupid: consumer group ID; use a distinct group ID for each consumer on the specified topic
    groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity)
    startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
    batchReadTimeout: how often (in milliseconds) to process input
    decoder: decoder to use (e.g. json or protobuf)
        type: (enum) one of the following:
            json: JSON decoder
            protobuf: Protobuf decoder
    batchMaxLen: the number of accumulated flows before being forwarded for processing
    pullQueueCapacity: the capacity of the queue used to store pulled flows
    pullMaxBytes: the maximum number of bytes pulled from kafka
    commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously.
    tls: TLS client configuration (optional)
        insecureSkipVerify: skip client verifying the server's certificate chain and host name
        caCertPath: path to the CA certificate
        userCertPath: path to the user certificate
        userKeyPath: path to the user private key
    sasl: SASL configuration (optional)
        type: SASL type
            plain: Plain SASL
            scramSHA512: SCRAM/SHA512 SASL
        clientIDPath: path to the client ID / SASL username
        clientSecretPath: path to the client secret / SASL password
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
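For illustration, a hypothetical transform-network snippet wiring the new field; the broker, topic, and group values are made up, and the keys follow the documentation above:

```yaml
network:
  kubeConfigPath: /var/run/kubeconfig
  kafkaCacheConfig:
    brokers: [kafka.netobserv.svc:9092]
    topic: k8s-cache
    groupid: flp-transform-1
    startOffset: FirstOffset
    decoder:
      type: json
```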
13 changes: 7 additions & 6 deletions pkg/api/transform_network.go
@@ -18,12 +18,13 @@
package api

type TransformNetwork struct {
	Rules          NetworkTransformRules         `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
	KubeConfigPath string                        `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
	ServicesFile   string                        `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
	ProtocolsFile  string                        `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
	SubnetLabels   []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
	DirectionInfo  NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
	Rules            NetworkTransformRules         `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
	KubeConfigPath   string                        `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
	KafkaCacheConfig *IngestKafka                  `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"`
	ServicesFile     string                        `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
	ProtocolsFile    string                        `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
	SubnetLabels     []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"`
	DirectionInfo    NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
117 changes: 117 additions & 0 deletions pkg/kafka/reader.go
@@ -0,0 +1,117 @@
package kafka

import (
	"errors"
	"os"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
	kafkago "github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)

var klog = logrus.WithField("component", "kafka-reader")

const defaultBatchReadTimeout = int64(1000)
const defaultKafkaBatchMaxLength = 500
const defaultKafkaCommitInterval = 500

func NewReader(config *api.IngestKafka) (*kafkago.Reader, int, error) {
	startOffsetString := config.StartOffset
	var startOffset int64
	switch startOffsetString {
	case "FirstOffset", "":
		startOffset = kafkago.FirstOffset
	case "LastOffset":
		startOffset = kafkago.LastOffset
	default:
		startOffset = kafkago.FirstOffset
		klog.Errorf("illegal value for StartOffset: %s; using default", startOffsetString)
	}
	klog.Debugf("startOffset = %v", startOffset)
	groupBalancers := make([]kafkago.GroupBalancer, 0)
	for _, gb := range config.GroupBalancers {
		switch gb {
		case "range":
			groupBalancers = append(groupBalancers, &kafkago.RangeGroupBalancer{})
		case "roundRobin":
			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
		case "rackAffinity":
			groupBalancers = append(groupBalancers, &kafkago.RackAffinityGroupBalancer{})
		default:
			klog.Warningf("unknown group balancer %q; using roundRobin", gb)
			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
		}
	}

	batchReadTimeout := defaultBatchReadTimeout
	if config.BatchReadTimeout != 0 {
		batchReadTimeout = config.BatchReadTimeout
	}
	klog.Debugf("batchReadTimeout = %d", batchReadTimeout)

	commitInterval := int64(defaultKafkaCommitInterval)
	if config.CommitInterval != 0 {
		commitInterval = config.CommitInterval
	}
	klog.Debugf("commitInterval = %d", commitInterval)

	dialer := &kafkago.Dialer{
		Timeout:   kafkago.DefaultDialer.Timeout,
		DualStack: kafkago.DefaultDialer.DualStack,
	}
	if config.TLS != nil {
		klog.Infof("Using TLS configuration: %v", config.TLS)
		tlsConfig, err := config.TLS.Build()
		if err != nil {
			return nil, 0, err
		}
		dialer.TLS = tlsConfig
	}

	if config.SASL != nil {
		m, err := utils.SetupSASLMechanism(config.SASL)
		if err != nil {
			return nil, 0, err
		}
		dialer.SASLMechanism = m
	}

	readerConfig := kafkago.ReaderConfig{
		Brokers:        config.Brokers,
		Topic:          config.Topic,
		GroupID:        config.GroupID,
		GroupBalancers: groupBalancers,
		StartOffset:    startOffset,
		CommitInterval: time.Duration(commitInterval) * time.Millisecond,
		Dialer:         dialer,
	}

	if readerConfig.GroupID == "" {
		// Default to the pod / host name when no group ID is configured
		readerConfig.GroupID = os.Getenv("HOSTNAME")
	}

	if config.PullQueueCapacity > 0 {
		readerConfig.QueueCapacity = config.PullQueueCapacity
	}

	if config.PullMaxBytes > 0 {
		readerConfig.MaxBytes = config.PullMaxBytes
	}

	bml := defaultKafkaBatchMaxLength
	if config.BatchMaxLen != 0 {
		bml = config.BatchMaxLen
	}

	klog.Debugf("reader config: %#v", readerConfig)

	kafkaReader := kafkago.NewReader(readerConfig)
	if kafkaReader == nil {
		return nil, 0, errors.New("kafka.NewReader: failed to create kafka-go reader")
	}

	return kafkaReader, bml, nil
}
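For context, a minimal sketch of consuming through this helper; the broker, topic, and group values are hypothetical, ReadMessage and Close are standard kafka-go Reader methods, and a context import is assumed:

```go
reader, batchMaxLen, err := NewReader(&api.IngestKafka{
	Brokers: []string{"kafka.netobserv.svc:9092"}, // hypothetical broker
	Topic:   "k8s-cache",                          // hypothetical topic
	GroupID: "flp-cache-reader",
})
if err != nil {
	klog.WithError(err).Fatal("cannot create kafka reader")
}
defer reader.Close()
// Drain up to one batch worth of messages.
for i := 0; i < batchMaxLen; i++ {
	msg, err := reader.ReadMessage(context.Background())
	if err != nil {
		break // reader closed or unrecoverable error
	}
	klog.Debugf("received %d bytes from partition %d", len(msg.Value), msg.Partition)
}
```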