diff --git a/.github/workflows/push_image_pr.yml b/.github/workflows/push_image_pr.yml index 621348b55..56cc443cd 100644 --- a/.github/workflows/push_image_pr.yml +++ b/.github/workflows/push_image_pr.yml @@ -37,7 +37,7 @@ jobs: - name: get short sha run: echo "short_sha=$(git rev-parse --short HEAD)" >> $GITHUB_ENV - name: build and push manifest with images - run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} make images + run: OCI_BUILD_OPTS="--label quay.expires-after=2w" IMAGE_ORG=${{ env.WF_ORG }} IMAGE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}:${{ env.short_sha }} IMAGE_CACHE=${{ env.WF_REGISTRY }}/${{ env.WF_IMAGE }}-cache:${{ env.short_sha }} make images - uses: actions/github-script@v6 with: github-token: ${{secrets.GITHUB_TOKEN}} diff --git a/.gitignore b/.gitignore index 16ce1e8da..f536a19e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /flowlogs-pipeline /confgenerator +/k8s-cache /bin/ cover.out diff --git a/Makefile b/Makefile index 4d8285033..a9a34a873 100644 --- a/Makefile +++ b/Makefile @@ -39,15 +39,17 @@ IMAGE_TAG_BASE ?= quay.io/$(IMAGE_ORG)/flowlogs-pipeline # Image URL to use all building/pushing image targets IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION) +IMAGE_CACHE ?= $(IMAGE_TAG_BASE)-cache:$(VERSION) OCI_BUILD_OPTS ?= # Image building tool (docker / podman) - docker is preferred in CI OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman) -OCI_BIN ?= $(shell basename ${OCI_BIN_PATH}) +OCI_BIN ?= $(shell basename ${OCI_BIN_PATH} 2>/dev/null) MIN_GO_VERSION := 1.20.0 FLP_BIN_FILE=flowlogs-pipeline CG_BIN_FILE=confgenerator +K8S_CACHE_BIN_FILE=k8s-cache NETFLOW_GENERATOR=nflow-generator CMD_DIR=./cmd/ FLP_CONF_FILE ?= contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -61,18 +63,21 @@ FORCE: ; define build_target echo 'building image for arch $(1)'; \ DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE}-$(1) -f contrib/docker/Dockerfile .; + DOCKER_BUILDKIT=1 $(OCI_BIN) buildx build --load --build-arg TARGETPLATFORM=linux/$(1) --build-arg TARGETARCH=$(1) --build-arg BUILDPLATFORM=linux/amd64 ${OCI_BUILD_OPTS} -t ${IMAGE_CACHE}-$(1) -f contrib/docker/cache.Dockerfile .; endef # push a single arch target image define push_target echo 'pushing image ${IMAGE}-$(1)'; \ DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE}-$(1); + DOCKER_BUILDKIT=1 $(OCI_BIN) push ${IMAGE_CACHE}-$(1); endef # manifest create a single arch target provided as argument define manifest_add_target echo 'manifest add target $(1)'; \ DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE} ${IMAGE}-$(1); + DOCKER_BUILDKIT=1 $(OCI_BIN) manifest add ${IMAGE_CACHE} ${IMAGE_CACHE}-$(1); endef ##@ General @@ -114,8 +119,12 @@ build_code: GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${FLP_BIN_FILE}" GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${CG_BIN_FILE}" +.PHONY: build_k8s_cache +build_k8s_cache: + GOARCH=${GOARCH} go build -ldflags "-X 'main.BuildVersion=$(BUILD_VERSION)' -X 'main.BuildDate=$(BUILD_DATE)'" "${CMD_DIR}${K8S_CACHE_BIN_FILE}" + .PHONY: build -build: validate_go lint build_code docs ## Build flowlogs-pipeline executable and update the docs +build: validate_go lint build_code build_k8s_cache docs ## Build flowlogs-pipeline executables and update the docs .PHONY: docs docs: FORCE ## Update flowlogs-pipeline documentation @@ -187,16 +196,20 @@ image-push: ## Push MULTIARCH_TARGETS images .PHONY: manifest-build manifest-build: ## Build MULTIARCH_TARGETS manifest @echo 'building manifest $(IMAGE)' - DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f + DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE} -f || true + DOCKER_BUILDKIT=1 $(OCI_BIN) rmi ${IMAGE_CACHE} -f || true DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE}-$(target)); + DOCKER_BUILDKIT=1 $(OCI_BIN) manifest create ${IMAGE_CACHE} $(foreach target,$(MULTIARCH_TARGETS), --amend ${IMAGE_CACHE}-$(target)); .PHONY: manifest-push manifest-push: ## Push MULTIARCH_TARGETS manifest @echo 'publish manifest $(IMAGE)' ifeq (${OCI_BIN}, docker) DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE}; + DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE}; else DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE} docker://${IMAGE}; + DOCKER_BUILDKIT=1 $(OCI_BIN) manifest push ${IMAGE_CACHE} docker://${IMAGE_CACHE}; endif include .mk/development.mk diff --git a/README.md b/README.md index 61e3efccb..d4535db33 100644 --- a/README.md +++ b/README.md @@ -924,7 +924,7 @@ General Develop lint Lint the code - build Build flowlogs-pipeline executable and update the docs + build Build flowlogs-pipeline executables and update the docs docs Update flowlogs-pipeline documentation clean Clean tests-unit Unit tests diff --git a/cmd/k8s-cache/main.go b/cmd/k8s-cache/main.go new file mode 100644 index 000000000..7cf77dd04 --- /dev/null +++ b/cmd/k8s-cache/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" +) + +var ( + buildVersion = "unknown" + buildDate = "unknown" + app = "flp-cache" + configPath = flag.String("config", "", "path to a config file") + versionFlag = flag.Bool("v", false, "print version") + log = logrus.WithField("module", "main") +) + +type Config struct { + KubeConfigPath string `yaml:"kubeConfigPath"` + KafkaConfig api.EncodeKafka `yaml:"kafkaConfig"` + PProfPort int32 `yaml:"pprofPort"` // TODO: manage pprof + LogLevel string `yaml:"logLevel"` +} + +func main() { + flag.Parse() + + appVersion := fmt.Sprintf("%s [build version: %s, build date: %s]", app, buildVersion, buildDate) + if *versionFlag { + fmt.Println(appVersion) + os.Exit(0) + } + + cfg, err := readConfig(*configPath) + if err != nil { + log.WithError(err).Fatal("error reading config file") + } + + lvl, err := logrus.ParseLevel(cfg.LogLevel) + if err != nil { + log.Errorf("Log level %s not recognized, using info", cfg.LogLevel) + lvl = logrus.InfoLevel + } + logrus.SetLevel(lvl) + log.Infof("Starting %s at log level %s", appVersion, lvl) + log.Infof("Configuration: %#v", cfg) + + err = kubernetes.InitInformerDatasource(cfg.KubeConfigPath, &cfg.KafkaConfig) + if err != nil { + log.WithError(err).Fatal("error initializing Kubernetes & informers") + } + + stopCh := utils.SetupElegantExit() + <-stopCh +} + +func readConfig(path string) (*Config, error) { + var cfg Config + if len(path) == 0 { + return &cfg, nil + } + yamlFile, err := os.ReadFile(path) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(yamlFile, &cfg) + if err != nil { + return nil, err + } + + return &cfg, err +} diff --git a/contrib/docker/cache.Dockerfile b/contrib/docker/cache.Dockerfile new file mode 100644 index 000000000..45c2f0bf3 --- /dev/null +++ b/contrib/docker/cache.Dockerfile @@ -0,0 +1,29 @@ +# We do not use --platform feature to auto fill this ARG because of incompatibility between podman and docker +ARG TARGETPLATFORM=linux/amd64 +ARG BUILDPLATFORM=linux/amd64 +FROM --platform=$BUILDPLATFORM docker.io/library/golang:1.22 as builder + +ARG TARGETPLATFORM +ARG TARGETARCH=amd64 +WORKDIR /app + +# Copy source code +COPY go.mod . +COPY go.sum . +COPY Makefile . +COPY .mk/ .mk/ +COPY .bingo/ .bingo/ +COPY vendor/ vendor/ +COPY .git/ .git/ +COPY cmd/ cmd/ +COPY pkg/ pkg/ + +RUN git status --porcelain +RUN GOARCH=$TARGETARCH make build_k8s_cache + +# final stage +FROM --platform=$TARGETPLATFORM registry.access.redhat.com/ubi9/ubi-minimal:9.4 + +COPY --from=builder /app/k8s-cache /app/ + +ENTRYPOINT ["/app/k8s-cache"] diff --git a/docs/api.md b/docs/api.md index f4d096f52..0bf13c10d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -247,6 +247,32 @@ Following is the supported API format for network transformations: output: entry output field protocol: entry protocol field kubeConfigPath: path to kubeconfig file (optional) + kafkaCacheConfig: Kafka config for informers cache (optional) + brokers: list of kafka broker addresses + topic: kafka topic to listen on + groupid: separate groupid for each consumer on specified topic + groupBalancers: list of balancing strategies (range, roundRobin, rackAffinity) + startOffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition + batchReadTimeout: how often (in milliseconds) to process input + decoder: decoder to use (E.g. json or protobuf) + type: (enum) one of the following: + json: JSON decoder + protobuf: Protobuf decoder + batchMaxLen: the number of accumulated flows before being forwarded for processing + pullQueueCapacity: the capacity of the queue use to store pulled flows + pullMaxBytes: the maximum number of bytes being pulled from kafka + commitInterval: the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously. + tls: TLS client configuration (optional) + insecureSkipVerify: skip client verifying the server's certificate chain and host name + caCertPath: path to the CA certificate + userCertPath: path to the user certificate + userKeyPath: path to the user private key + sasl: SASL configuration (optional) + type: SASL type + plain: Plain SASL + scramSHA512: SCRAM/SHA512 SASL + clientIDPath: path to the client ID / SASL username + clientSecretPath: path to the client secret / SASL password servicesFile: path to services file (optional, default: /etc/services) protocolsFile: path to protocols file (optional, default: /etc/protocols) subnetLabels: configure subnet and IPs custom labels diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index b78b6665c..9b5d42c2c 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -18,12 +18,13 @@ package api type TransformNetwork struct { - Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"` - KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"` - ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"` - ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"` - SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"` - DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"` + Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"` + KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"` + KafkaCacheConfig *IngestKafka `yaml:"kafkaCacheConfig,omitempty" json:"kafkaCacheConfig,omitempty" doc:"Kafka config for informers cache (optional)"` + ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"` + ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"` + SubnetLabels []NetworkTransformSubnetLabel `yaml:"subnetLabels,omitempty" json:"subnetLabels,omitempty" doc:"configure subnet and IPs custom labels"` + DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"` } func (tn *TransformNetwork) GetServiceFiles() (string, string) { diff --git a/pkg/kafka/reader.go b/pkg/kafka/reader.go new file mode 100644 index 000000000..fb82523c7 --- /dev/null +++ b/pkg/kafka/reader.go @@ -0,0 +1,117 @@ +package kafka + +import ( + "errors" + "os" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + kafkago "github.com/segmentio/kafka-go" + "github.com/sirupsen/logrus" +) + +var klog = logrus.WithField("component", "kafka-reader") + +const defaultBatchReadTimeout = int64(1000) +const defaultKafkaBatchMaxLength = 500 +const defaultKafkaCommitInterval = 500 + +func NewReader(config *api.IngestKafka) (*kafkago.Reader, int, error) { + startOffsetString := config.StartOffset + var startOffset int64 + switch startOffsetString { + case "FirstOffset", "": + startOffset = kafkago.FirstOffset + case "LastOffset": + startOffset = kafkago.LastOffset + default: + startOffset = kafkago.FirstOffset + klog.Errorf("illegal value for StartOffset: %s; using default\n", startOffsetString) + } + klog.Debugf("startOffset = %v", startOffset) + groupBalancers := make([]kafkago.GroupBalancer, 0) + for _, gb := range config.GroupBalancers { + switch gb { + case "range": + groupBalancers = append(groupBalancers, &kafkago.RangeGroupBalancer{}) + case "roundRobin": + groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{}) + case "rackAffinity": + groupBalancers = append(groupBalancers, &kafkago.RackAffinityGroupBalancer{}) + default: + klog.Warningf("groupbalancers parameter missing") + groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{}) + } + } + + batchReadTimeout := defaultBatchReadTimeout + if config.BatchReadTimeout != 0 { + batchReadTimeout = config.BatchReadTimeout + } + klog.Debugf("batchReadTimeout = %d", batchReadTimeout) + + commitInterval := int64(defaultKafkaCommitInterval) + if config.CommitInterval != 0 { + commitInterval = config.CommitInterval + } + klog.Debugf("commitInterval = %d", config.CommitInterval) + + dialer := &kafkago.Dialer{ + Timeout: kafkago.DefaultDialer.Timeout, + DualStack: kafkago.DefaultDialer.DualStack, + } + if config.TLS != nil { + klog.Infof("Using TLS configuration: %v", config.TLS) + tlsConfig, err := config.TLS.Build() + if err != nil { + return nil, 0, err + } + dialer.TLS = tlsConfig + } + + if config.SASL != nil { + m, err := utils.SetupSASLMechanism(config.SASL) + if err != nil { + return nil, 0, err + } + dialer.SASLMechanism = m + } + + readerConfig := kafkago.ReaderConfig{ + Brokers: config.Brokers, + Topic: config.Topic, + GroupID: config.GroupID, + GroupBalancers: groupBalancers, + StartOffset: startOffset, + CommitInterval: time.Duration(commitInterval) * time.Millisecond, + Dialer: dialer, + } + + if readerConfig.GroupID == "" { + // Use hostname + readerConfig.GroupID = os.Getenv("HOSTNAME") + } + + if config.PullQueueCapacity > 0 { + readerConfig.QueueCapacity = config.PullQueueCapacity + } + + if config.PullMaxBytes > 0 { + readerConfig.MaxBytes = config.PullMaxBytes + } + + bml := defaultKafkaBatchMaxLength + if config.BatchMaxLen != 0 { + bml = config.BatchMaxLen + } + + klog.Debugf("reader config: %#v", readerConfig) + + kafkaReader := kafkago.NewReader(readerConfig) + if kafkaReader == nil { + return nil, 0, errors.New("NewIngestKafka: failed to create kafka-go reader") + } + + return kafkaReader, bml, nil +} diff --git a/pkg/kafka/writer.go b/pkg/kafka/writer.go new file mode 100644 index 000000000..b81d8f80d --- /dev/null +++ b/pkg/kafka/writer.go @@ -0,0 +1,77 @@ +package kafka + +import ( + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + kafkago "github.com/segmentio/kafka-go" + log "github.com/sirupsen/logrus" +) + +const ( + defaultReadTimeoutSeconds = int64(10) + defaultWriteTimeoutSeconds = int64(10) +) + +func NewWriter(config *api.EncodeKafka) (*kafkago.Writer, error) { + var balancer kafkago.Balancer + switch config.Balancer { + case api.KafkaRoundRobin: + balancer = &kafkago.RoundRobin{} + case api.KafkaLeastBytes: + balancer = &kafkago.LeastBytes{} + case api.KafkaHash: + balancer = &kafkago.Hash{} + case api.KafkaCrc32: + balancer = &kafkago.CRC32Balancer{} + case api.KafkaMurmur2: + balancer = &kafkago.Murmur2Balancer{} + default: + balancer = nil + } + + readTimeoutSecs := defaultReadTimeoutSeconds + if config.ReadTimeout != 0 { + readTimeoutSecs = config.ReadTimeout + } + + writeTimeoutSecs := defaultWriteTimeoutSeconds + if config.WriteTimeout != 0 { + writeTimeoutSecs = config.WriteTimeout + } + + transport := kafkago.Transport{} + if config.TLS != nil { + log.Infof("Using TLS configuration: %v", config.TLS) + tlsConfig, err := config.TLS.Build() + if err != nil { + return nil, err + } + transport.TLS = tlsConfig + } + + if config.SASL != nil { + m, err := utils.SetupSASLMechanism(config.SASL) + if err != nil { + return nil, err + } + transport.SASL = m + } + + kafkaWriter := kafkago.Writer{ + Addr: kafkago.TCP(config.Address), + Topic: config.Topic, + Balancer: balancer, + ReadTimeout: time.Duration(readTimeoutSecs) * time.Second, + WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second, + BatchSize: config.BatchSize, + BatchBytes: config.BatchBytes, + // Temporary fix may be we should implement a batching systems + // https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403 + BatchTimeout: time.Nanosecond, + Transport: &transport, + } + + return &kafkaWriter, nil +} diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index 2ead32be6..dcb16de27 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -19,23 +19,17 @@ package encode import ( "encoding/json" - "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/prometheus/client_golang/prometheus" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) -const ( - defaultReadTimeoutSeconds = int64(10) - defaultWriteTimeoutSeconds = int64(10) -) - type kafkaWriteMessage interface { WriteMessages(ctx context.Context, msgs ...kafkago.Message) error } @@ -78,68 +72,14 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E config = *params.Encode.Kafka } - var balancer kafkago.Balancer - switch config.Balancer { - case api.KafkaRoundRobin: - balancer = &kafkago.RoundRobin{} - case api.KafkaLeastBytes: - balancer = &kafkago.LeastBytes{} - case api.KafkaHash: - balancer = &kafkago.Hash{} - case api.KafkaCrc32: - balancer = &kafkago.CRC32Balancer{} - case api.KafkaMurmur2: - balancer = &kafkago.Murmur2Balancer{} - default: - balancer = nil - } - - readTimeoutSecs := defaultReadTimeoutSeconds - if config.ReadTimeout != 0 { - readTimeoutSecs = config.ReadTimeout - } - - writeTimeoutSecs := defaultWriteTimeoutSeconds - if config.WriteTimeout != 0 { - writeTimeoutSecs = config.WriteTimeout - } - - transport := kafkago.Transport{} - if config.TLS != nil { - log.Infof("Using TLS configuration: %v", config.TLS) - tlsConfig, err := config.TLS.Build() - if err != nil { - return nil, err - } - transport.TLS = tlsConfig - } - - if config.SASL != nil { - m, err := utils.SetupSASLMechanism(config.SASL) - if err != nil { - return nil, err - } - transport.SASL = m - } - - // connect to the kafka server - kafkaWriter := kafkago.Writer{ - Addr: kafkago.TCP(config.Address), - Topic: config.Topic, - Balancer: balancer, - ReadTimeout: time.Duration(readTimeoutSecs) * time.Second, - WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second, - BatchSize: config.BatchSize, - BatchBytes: config.BatchBytes, - // Temporary fix may be we should implement a batching systems - // https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403 - BatchTimeout: time.Nanosecond, - Transport: &transport, + kafkaWriter, err := kafka.NewWriter(&config) + if err != nil { + return nil, err } return &encodeKafka{ kafkaParams: config, - kafkaWriter: &kafkaWriter, + kafkaWriter: kafkaWriter, recordsWritten: opMetrics.CreateRecordsWrittenCounter(params.Name), }, nil } diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index c5dac331a..323960bc4 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -18,11 +18,11 @@ package ingest import ( - "errors" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" @@ -40,20 +40,14 @@ type kafkaReadMessage interface { } type ingestKafka struct { - kafkaReader kafkaReadMessage - decoder decode.Decoder - in chan []byte - exitChan <-chan struct{} - batchReadTimeout int64 - batchMaxLength int - metrics *metrics - canLogMessages bool + kafkaReader kafkaReadMessage + decoder decode.Decoder + in chan []byte + exitChan <-chan struct{} + metrics *metrics + canLogMessages bool } -const defaultBatchReadTimeout = int64(1000) -const defaultKafkaBatchMaxLength = 500 -const defaultKafkaCommitInterval = 500 - const kafkaStatsPeriod = 15 * time.Second // Ingest ingests entries from kafka topic @@ -175,125 +169,34 @@ func (k *ingestKafka) reportStats() { // NewIngestKafka create a new ingester // nolint:cyclop func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { - klog.Debugf("entering NewIngestKafka") - jsonIngestKafka := api.IngestKafka{} + config := &api.IngestKafka{} var ingestType string if params.Ingest != nil { ingestType = params.Ingest.Type if params.Ingest.Kafka != nil { - jsonIngestKafka = *params.Ingest.Kafka - } - } - - // connect to the kafka server - startOffsetString := jsonIngestKafka.StartOffset - var startOffset int64 - switch startOffsetString { - case "FirstOffset", "": - startOffset = kafkago.FirstOffset - case "LastOffset": - startOffset = kafkago.LastOffset - default: - startOffset = kafkago.FirstOffset - klog.Errorf("illegal value for StartOffset: %s\n", startOffsetString) - } - klog.Debugf("startOffset = %v", startOffset) - groupBalancers := make([]kafkago.GroupBalancer, 0) - for _, gb := range jsonIngestKafka.GroupBalancers { - switch gb { - case "range": - groupBalancers = append(groupBalancers, &kafkago.RangeGroupBalancer{}) - case "roundRobin": - groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{}) - case "rackAffinity": - groupBalancers = append(groupBalancers, &kafkago.RackAffinityGroupBalancer{}) - default: - klog.Warningf("groupbalancers parameter missing") - groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{}) - } - } - - batchReadTimeout := defaultBatchReadTimeout - if jsonIngestKafka.BatchReadTimeout != 0 { - batchReadTimeout = jsonIngestKafka.BatchReadTimeout - } - klog.Infof("batchReadTimeout = %d", batchReadTimeout) - - commitInterval := int64(defaultKafkaCommitInterval) - if jsonIngestKafka.CommitInterval != 0 { - commitInterval = jsonIngestKafka.CommitInterval - } - klog.Infof("commitInterval = %d", jsonIngestKafka.CommitInterval) - - dialer := &kafkago.Dialer{ - Timeout: kafkago.DefaultDialer.Timeout, - DualStack: kafkago.DefaultDialer.DualStack, - } - if jsonIngestKafka.TLS != nil { - klog.Infof("Using TLS configuration: %v", jsonIngestKafka.TLS) - tlsConfig, err := jsonIngestKafka.TLS.Build() - if err != nil { - return nil, err + config = params.Ingest.Kafka } - dialer.TLS = tlsConfig } - if jsonIngestKafka.SASL != nil { - m, err := utils.SetupSASLMechanism(jsonIngestKafka.SASL) - if err != nil { - return nil, err - } - dialer.SASLMechanism = m - } - - readerConfig := kafkago.ReaderConfig{ - Brokers: jsonIngestKafka.Brokers, - Topic: jsonIngestKafka.Topic, - GroupID: jsonIngestKafka.GroupID, - GroupBalancers: groupBalancers, - StartOffset: startOffset, - CommitInterval: time.Duration(commitInterval) * time.Millisecond, - Dialer: dialer, - } - - if jsonIngestKafka.PullQueueCapacity > 0 { - readerConfig.QueueCapacity = jsonIngestKafka.PullQueueCapacity - } - - if jsonIngestKafka.PullMaxBytes > 0 { - readerConfig.MaxBytes = jsonIngestKafka.PullMaxBytes - } - - klog.Debugf("reader config: %#v", readerConfig) - - kafkaReader := kafkago.NewReader(readerConfig) - if kafkaReader == nil { - errMsg := "NewIngestKafka: failed to create kafka-go reader" - klog.Errorf("%s", errMsg) - return nil, errors.New(errMsg) - } - - decoder, err := decode.GetDecoder(jsonIngestKafka.Decoder) + kafkaReader, bml, err := kafka.NewReader(config) if err != nil { return nil, err } - bml := defaultKafkaBatchMaxLength - if jsonIngestKafka.BatchMaxLen != 0 { - bml = jsonIngestKafka.BatchMaxLen + decoder, err := decode.GetDecoder(config.Decoder) + if err != nil { + return nil, err } in := make(chan []byte, 2*bml) metrics := newMetrics(opMetrics, params.Name, ingestType, func() int { return len(in) }) return &ingestKafka{ - kafkaReader: kafkaReader, - decoder: decoder, - exitChan: utils.ExitChannel(), - in: in, - batchMaxLength: bml, - batchReadTimeout: batchReadTimeout, - metrics: metrics, - canLogMessages: jsonIngestKafka.Decoder.Type == api.DecoderJSON, + kafkaReader: kafkaReader, + decoder: decoder, + exitChan: utils.ExitChannel(), + in: in, + metrics: metrics, + canLogMessages: config.Decoder.Type == api.DecoderJSON, }, nil } diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index 6af86d88e..bf08498e7 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -89,8 +89,6 @@ func Test_NewIngestKafka1(t *testing.T) { require.Equal(t, expectedBrokers, ingestKafka.kafkaReader.Config().Brokers) require.Equal(t, int64(-2), ingestKafka.kafkaReader.Config().StartOffset) require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers)) - require.Equal(t, int64(300), ingestKafka.batchReadTimeout) - require.Equal(t, int(500), ingestKafka.batchMaxLength) require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval) } @@ -103,8 +101,6 @@ func Test_NewIngestKafka2(t *testing.T) { require.Equal(t, expectedBrokers, ingestKafka.kafkaReader.Config().Brokers) require.Equal(t, int64(-1), ingestKafka.kafkaReader.Config().StartOffset) require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers)) - require.Equal(t, defaultBatchReadTimeout, ingestKafka.batchReadTimeout) - require.Equal(t, int(1000), ingestKafka.batchMaxLength) require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval) } diff --git a/pkg/pipeline/transform/kubernetes/datasource/datasource.go b/pkg/pipeline/transform/kubernetes/datasource/datasource.go new file mode 100644 index 000000000..b83a2dbcf --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/datasource/datasource.go @@ -0,0 +1,127 @@ +package datasource + +import ( + "context" + "sync" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("component", "transform.Network.Kubernetes") + +type Datasource struct { + Informers informers.InformersInterface + // We use map+mutex rather than sync.Map for better performance on writes, since the lock is acquired once to perform several writes. + kafkaIPCacheMut sync.RWMutex + kafkaIPCache map[string]model.ResourceMetaData + kafkaNodeNameCacheMut sync.RWMutex + kafkaNodeNameCache map[string]model.ResourceMetaData +} + +func NewInformerDatasource(kubeConfigPath string, kafkaConfig *api.EncodeKafka) (*Datasource, error) { + inf := &informers.Informers{} + if err := inf.InitFromConfig(kubeConfigPath, kafkaConfig); err != nil { + return nil, err + } + return &Datasource{Informers: inf}, nil +} + +func NewKafkaCacheDatasource(kafkaConfig *api.IngestKafka) (*Datasource, error) { + // Init Kafka reader + log.Debug("Initializing Kafka reader datasource") + kafkaReader, _, err := kafka.NewReader(kafkaConfig) + if err != nil { + return nil, err + } + + d := Datasource{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + exitChan := utils.ExitChannel() + go func() { + for { + select { + case <-exitChan: + log.Info("gracefully exiting") + return + default: + } + // Blocking + msg, err := kafkaReader.ReadMessage(context.Background()) + if err != nil { + log.Errorln(err) + continue + } + if len(msg.Value) > 0 { + content, err := model.MessageFromBytes(msg.Value) + if err != nil { + log.Errorln(err) + continue + } + log.Debugf("Kafka reader: got message %v", content) + d.updateCache(content) + } else { + log.Debug("Kafka reader: empty message") + } + } + }() + + return &d, nil +} + +func (d *Datasource) updateCache(msg *model.KafkaCacheMessage) { + switch msg.Operation { + case model.OperationAdd, model.OperationUpdate: + d.kafkaIPCacheMut.Lock() + for _, ip := range msg.Resource.IPs { + d.kafkaIPCache[ip] = *msg.Resource + } + d.kafkaIPCacheMut.Unlock() + if msg.Resource.Kind == model.KindNode { + d.kafkaNodeNameCacheMut.Lock() + d.kafkaNodeNameCache[msg.Resource.Name] = *msg.Resource + d.kafkaNodeNameCacheMut.Unlock() + } + case model.OperationDelete: + d.kafkaIPCacheMut.Lock() + for _, ip := range msg.Resource.IPs { + delete(d.kafkaIPCache, ip) + } + d.kafkaIPCacheMut.Unlock() + if msg.Resource.Kind == model.KindNode { + d.kafkaNodeNameCacheMut.Lock() + delete(d.kafkaNodeNameCache, msg.Resource.Name) + d.kafkaNodeNameCacheMut.Unlock() + } + } +} + +func (d *Datasource) GetByIP(ip string) *model.ResourceMetaData { + if d.Informers != nil { + return d.Informers.GetByIP(ip) + } + d.kafkaIPCacheMut.RLock() + defer d.kafkaIPCacheMut.RUnlock() + if obj, ok := d.kafkaIPCache[ip]; ok { + return &obj + } + return nil +} + +func (d *Datasource) GetNodeByName(name string) (*model.ResourceMetaData, error) { + if d.Informers != nil { + return d.Informers.GetNodeByName(name) + } + d.kafkaNodeNameCacheMut.RLock() + defer d.kafkaNodeNameCacheMut.RUnlock() + if obj, ok := d.kafkaNodeNameCache[name]; ok { + return &obj, nil + } + return nil, nil +} diff --git a/pkg/pipeline/transform/kubernetes/datasource/datasource_test.go b/pkg/pipeline/transform/kubernetes/datasource/datasource_test.go new file mode 100644 index 000000000..62d1d5836 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/datasource/datasource_test.go @@ -0,0 +1,123 @@ +package datasource + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + "github.com/stretchr/testify/require" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + node1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "node-1", + }, + Kind: model.KindNode, + IPs: []string{"1.2.3.4", "5.6.7.8"}, + } + pod1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + }, + Kind: model.KindPod, + IPs: []string{"10.0.0.1", "10.0.0.2"}, + } + svc1 = model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ + Name: "svc-1", + Namespace: "ns-1", + }, + Kind: model.KindService, + IPs: []string{"192.168.0.1"}, + } +) + +func TestCacheUpdate(t *testing.T) { + ds := Datasource{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + var err error + + res := ds.GetByIP("1.2.3.4") + require.Nil(t, res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Nil(t, res) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &node1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &pod1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &svc1, + }) + + res = ds.GetByIP("1.2.3.4") + require.Equal(t, node1, *res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Equal(t, node1, *res) + + res = ds.GetByIP("5.6.7.8") + require.Equal(t, node1, *res) + + res = ds.GetByIP("10.0.0.1") + require.Equal(t, pod1, *res) + + res = ds.GetByIP("192.168.0.1") + require.Equal(t, svc1, *res) + + svc2 := svc1 + svc2.Labels = map[string]string{"label": "value"} + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationUpdate, + Resource: &svc2, + }) + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationDelete, + Resource: &node1, + }) + + res = ds.GetByIP("192.168.0.1") + require.Equal(t, map[string]string{"label": "value"}, res.Labels) + + res = ds.GetByIP("1.2.3.4") + require.Nil(t, res) + res, err = ds.GetNodeByName("node-1") + require.NoError(t, err) + require.Nil(t, res) +} + +func BenchmarkPromEncode(b *testing.B) { + ds := Datasource{ + kafkaIPCache: make(map[string]model.ResourceMetaData), + kafkaNodeNameCache: make(map[string]model.ResourceMetaData), + } + + for i := 0; i < b.N; i++ { + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &node1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &pod1, + }) + + ds.updateCache(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: &svc1, + }) + } +} diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 30bc93e35..ebae15797 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -6,25 +6,45 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/sirupsen/logrus" ) -var informers inf.InformersInterface = &inf.Informers{} +var ds *datasource.Datasource -// For testing func MockInformers() { - informers = inf.NewInformersMock() + ds = &datasource.Datasource{Informers: informers.NewInformersMock()} } -func InitFromConfig(kubeConfigPath string) error { - return informers.InitFromConfig(kubeConfigPath) +func InitInformerDatasource(kubeConfigPath string, kafkaConfig *api.EncodeKafka) error { + var err error + if ds == nil { + ds, err = datasource.NewInformerDatasource(kubeConfigPath, kafkaConfig) + } + return err +} + +func InitKafkaCacheDatasource(kafkaConfig *api.IngestKafka) error { + var err error + ds, err = datasource.NewKafkaCacheDatasource(kafkaConfig) + return err } func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { - kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) - if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) + ip, ok := outputEntry[rule.Input] + if !ok { + return + } + strIP, ok := ip.(string) + if !ok { + logrus.Debugf("IP %v not a string", outputEntry[rule.Input]) + return + } + kubeInfo := ds.GetByIP(strIP) + if kubeInfo == nil { + logrus.Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) return } if rule.Assignee != "otel" { @@ -34,9 +54,9 @@ func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace } outputEntry[rule.Output+"_Name"] = kubeInfo.Name - outputEntry[rule.Output+"_Type"] = kubeInfo.Type - outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name - outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type + outputEntry[rule.Output+"_Type"] = kubeInfo.Kind + outputEntry[rule.Output+"_OwnerName"] = kubeInfo.OwnerName + outputEntry[rule.Output+"_OwnerType"] = kubeInfo.OwnerKind if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue @@ -56,21 +76,18 @@ func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { if kubeInfo.Namespace != "" { outputEntry[rule.Output+"k8s.namespace.name"] = kubeInfo.Namespace } - switch kubeInfo.Type { - case inf.TypeNode: + switch kubeInfo.Kind { + case model.KindNode: outputEntry[rule.Output+"k8s.node.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.node.uid"] = kubeInfo.UID - case inf.TypePod: + case model.KindPod: outputEntry[rule.Output+"k8s.pod.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.pod.uid"] = kubeInfo.UID - case inf.TypeService: + case model.KindService: outputEntry[rule.Output+"k8s.service.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.service.uid"] = kubeInfo.UID } outputEntry[rule.Output+"k8s.name"] = kubeInfo.Name - outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type - outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name - outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type + outputEntry[rule.Output+"k8s.type"] = kubeInfo.Kind + outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.OwnerName + outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.OwnerKind if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { outputEntry[rule.LabelsPrefix+"."+labelKey] = labelValue @@ -88,20 +105,20 @@ func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { const nodeZoneLabelName = "topology.kubernetes.io/zone" -func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo *inf.Info, zonePrefix string) { +func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo *model.ResourceMetaData, zonePrefix string) { if !rule.AddZone { //Nothing to do return } - switch kubeInfo.Type { - case inf.TypeNode: + switch kubeInfo.Kind { + case model.KindNode: zone, ok := kubeInfo.Labels[nodeZoneLabelName] if ok { outputEntry[rule.Output+zonePrefix] = zone } return - case inf.TypePod: - nodeInfo, err := informers.GetNodeInfo(kubeInfo.HostName) + case model.KindPod: + nodeInfo, err := ds.GetNodeByName(kubeInfo.HostName) if err != nil { logrus.WithError(err).Tracef("can't find nodes info for node %v", kubeInfo.HostName) return @@ -114,7 +131,7 @@ func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo *in } return - case inf.TypeService: + case model.KindService: //A service is not assigned to a dedicated zone, skipping return } @@ -131,9 +148,9 @@ func EnrichLayer(outputEntry config.GenericMap, rule *api.K8sInfraRule) { } func objectIsApp(addr string, rule *api.K8sInfraRule) bool { - obj, err := informers.GetInfo(addr) - if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr) + obj := ds.GetByIP(addr) + if obj == nil { + logrus.Tracef("can't find kubernetes info for IP %s", addr) return false } if len(obj.Namespace) == 0 { diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index e1339609e..9a5839706 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -5,20 +5,21 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource" inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/stretchr/testify/assert" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" ) -var info = map[string]*inf.Info{ +var info = map[string]*model.ResourceMetaData{ "1.2.3.4": nil, "10.0.0.1": { ObjectMeta: v1.ObjectMeta{ Name: "pod-1", Namespace: "ns-1", }, - Type: "Pod", + Kind: "Pod", HostName: "host-1", HostIP: "100.0.0.1", }, @@ -27,7 +28,7 @@ var info = map[string]*inf.Info{ Name: "pod-2", Namespace: "ns-2", }, - Type: "Pod", + Kind: "Pod", HostName: "host-2", HostIP: "100.0.0.2", }, @@ -36,11 +37,11 @@ var info = map[string]*inf.Info{ Name: "service-1", Namespace: "ns-1", }, - Type: "Service", + Kind: "Service", }, } -var nodes = map[string]*inf.Info{ +var nodes = map[string]*model.ResourceMetaData{ "host-1": { ObjectMeta: v1.ObjectMeta{ Name: "host-1", @@ -48,7 +49,7 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1a", }, }, - Type: "Node", + Kind: "Node", }, "host-2": { ObjectMeta: v1.ObjectMeta{ @@ -57,7 +58,7 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1b", }, }, - Type: "Node", + Kind: "Node", }, } @@ -81,7 +82,9 @@ var rules = api.NetworkTransformRules{ } func TestEnrich(t *testing.T) { - informers = inf.SetupStubs(info, nodes) + ds = &datasource.Datasource{ + Informers: inf.SetupStubs(info, nodes), + } // Pod to unknown entry := config.GenericMap{ @@ -182,7 +185,9 @@ var otelRules = api.NetworkTransformRules{ } func TestEnrich_Otel(t *testing.T) { - informers = inf.SetupStubs(info, nodes) + ds = &datasource.Datasource{ + Informers: inf.SetupStubs(info, nodes), + } // Pod to unknown entry := config.GenericMap{ @@ -200,7 +205,6 @@ func TestEnrich_Otel(t *testing.T) { "source.k8s.name": "pod-1", "source.k8s.namespace.name": "ns-1", "source.k8s.pod.name": "pod-1", - "source.k8s.pod.uid": types.UID(""), "source.k8s.owner.name": "", "source.k8s.owner.type": "", "source.k8s.type": "Pod", @@ -222,7 +226,6 @@ func TestEnrich_Otel(t *testing.T) { "destination.k8s.name": "pod-2", "destination.k8s.namespace.name": "ns-2", "destination.k8s.pod.name": "pod-2", - "destination.k8s.pod.uid": types.UID(""), "destination.k8s.owner.name": "", "destination.k8s.owner.type": "", "destination.k8s.type": "Pod", @@ -233,7 +236,6 @@ func TestEnrich_Otel(t *testing.T) { "source.k8s.name": "pod-1", "source.k8s.namespace.name": "ns-1", "source.k8s.pod.name": "pod-1", - "source.k8s.pod.uid": types.UID(""), "source.k8s.owner.name": "", "source.k8s.owner.type": "", "source.k8s.type": "Pod", @@ -253,7 +255,6 @@ func TestEnrich_Otel(t *testing.T) { "destination.k8s.name": "service-1", "destination.k8s.namespace.name": "ns-1", "destination.k8s.service.name": "service-1", - "destination.k8s.service.uid": types.UID(""), "destination.k8s.owner.name": "", "destination.k8s.owner.type": "", "destination.k8s.type": "Service", @@ -263,7 +264,6 @@ func TestEnrich_Otel(t *testing.T) { "source.k8s.name": "pod-2", "source.k8s.namespace.name": "ns-2", "source.k8s.pod.name": "pod-2", - "source.k8s.pod.uid": types.UID(""), "source.k8s.owner.name": "", "source.k8s.owner.type": "", "source.k8s.type": "Pod", @@ -272,7 +272,9 @@ func TestEnrich_Otel(t *testing.T) { } func TestEnrich_EmptyNamespace(t *testing.T) { - informers = inf.SetupStubs(info, nodes) + ds = &datasource.Datasource{ + Informers: inf.SetupStubs(info, nodes), + } // We need to check that, whether it returns NotFound or just an empty namespace, // there is no map entry for that namespace (an empty-valued map entry is not valid) @@ -289,14 +291,14 @@ func TestEnrich_EmptyNamespace(t *testing.T) { assert.NotContains(t, entry, "DstK8s_Namespace") } -var infoLayers = map[string]*inf.Info{ +var infoLayers = map[string]*model.ResourceMetaData{ "1.2.3.4": nil, "10.0.0.1": { ObjectMeta: v1.ObjectMeta{ Name: "prometheus", Namespace: "openshift-monitoring", }, - Type: "Pod", + Kind: "Pod", HostName: "host-1", HostIP: "100.0.0.1", }, @@ -305,7 +307,7 @@ var infoLayers = map[string]*inf.Info{ Name: "pod-2", Namespace: "ns-2", }, - Type: "Pod", + Kind: "Pod", HostName: "host-2", HostIP: "100.0.0.2", }, @@ -314,7 +316,7 @@ var infoLayers = map[string]*inf.Info{ Name: "flowlogs-pipeline-1", Namespace: "netobserv", }, - Type: "Pod", + Kind: "Pod", HostName: "host-2", HostIP: "100.0.0.2", }, @@ -323,25 +325,27 @@ var infoLayers = map[string]*inf.Info{ Name: "kubernetes", Namespace: "default", }, - Type: "Service", + Kind: "Service", }, "20.0.0.2": { ObjectMeta: v1.ObjectMeta{ Name: "my-service", Namespace: "my-ns", }, - Type: "Service", + Kind: "Service", }, "30.0.0.1": { ObjectMeta: v1.ObjectMeta{ Name: "node-x", }, - Type: "Node", + Kind: "Node", }, } func TestEnrichLayer(t *testing.T) { - informers = inf.SetupStubs(infoLayers, nodes) + ds = &datasource.Datasource{ + Informers: inf.SetupStubs(infoLayers, nodes), + } rule := api.NetworkTransformRule{ KubernetesInfra: &api.K8sInfraRule{ diff --git a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go index 1d41ad03d..8959fdff3 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go @@ -3,6 +3,8 @@ package informers import ( "errors" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" @@ -15,18 +17,19 @@ type Mock struct { func NewInformersMock() *Mock { inf := new(Mock) - inf.On("InitFromConfig", mock.Anything).Return(nil) + inf.On("InitFromConfig", mock.Anything, mock.Anything).Return(nil) return inf } -func (o *Mock) InitFromConfig(kubeConfigPath string) error { - args := o.Called(kubeConfigPath) +func (o *Mock) InitFromConfig(kubeConfigPath string, kafkaConfig *api.EncodeKafka) error { + args := o.Called(kubeConfigPath, kafkaConfig) return args.Error(0) } type IndexerMock struct { mock.Mock cache.Indexer + parentChecker func(*model.ResourceMetaData) } type InformerMock struct { @@ -55,45 +58,50 @@ func (m *InformerMock) GetIndexer() cache.Indexer { return args.Get(0).(cache.Indexer) } -func (m *IndexerMock) MockPod(ip, name, namespace, nodeIP string, owner *Owner) { - var ownerRef []metav1.OwnerReference - if owner != nil { - ownerRef = []metav1.OwnerReference{{ - Kind: owner.Type, - Name: owner.Name, - }} - } - m.On("ByIndex", IndexIP, ip).Return([]interface{}{&Info{ - Type: "Pod", +func (m *IndexerMock) MockPod(ip, name, namespace, nodeIP, ownerName, ownerKind string) { + res := model.ResourceMetaData{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - OwnerReferences: ownerRef, + Name: name, + Namespace: namespace, }, - HostIP: nodeIP, - }}, nil) + Kind: "Pod", + OwnerName: ownerName, + OwnerKind: ownerKind, + HostIP: nodeIP, + } + m.parentChecker(&res) + m.On("ByIndex", IndexIP, ip).Return([]interface{}{&res}, nil) } func (m *IndexerMock) MockNode(ip, name string) { - m.On("ByIndex", IndexIP, ip).Return([]interface{}{&Info{ - Type: "Node", - ObjectMeta: metav1.ObjectMeta{Name: name}, + m.On("ByIndex", IndexIP, ip).Return([]interface{}{&model.ResourceMetaData{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Kind: "Node", + OwnerKind: "Node", + OwnerName: name, }}, nil) } func (m *IndexerMock) MockService(ip, name, namespace string) { - m.On("ByIndex", IndexIP, ip).Return([]interface{}{&Info{ - Type: "Service", - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + m.On("ByIndex", IndexIP, ip).Return([]interface{}{&model.ResourceMetaData{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Kind: "Service", + OwnerKind: "Service", + OwnerName: name, }}, nil) } -func (m *IndexerMock) MockReplicaSet(name, namespace string, owner Owner) { +func (m *IndexerMock) MockReplicaSet(name, namespace, ownerName, ownerKind string) { m.On("GetByKey", namespace+"/"+name).Return(&metav1.ObjectMeta{ Name: name, OwnerReferences: []metav1.OwnerReference{{ - Kind: owner.Type, - Name: owner.Name, + Kind: ownerKind, + Name: ownerName, }}, }, true, nil) } @@ -104,7 +112,7 @@ func (m *IndexerMock) FallbackNotFound() { func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) { // pods informer - pods = &IndexerMock{} + pods = &IndexerMock{parentChecker: kd.checkParent} pim := InformerMock{} pim.On("GetIndexer").Return(pods) kd.pods = &pim @@ -128,30 +136,27 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) { type FakeInformers struct { InformersInterface - info map[string]*Info - nodes map[string]*Info + info map[string]*model.ResourceMetaData + nodes map[string]*model.ResourceMetaData } -func SetupStubs(info map[string]*Info, nodes map[string]*Info) *FakeInformers { +func SetupStubs(info map[string]*model.ResourceMetaData, nodes map[string]*model.ResourceMetaData) *FakeInformers { return &FakeInformers{ info: info, nodes: nodes, } } -func (f *FakeInformers) InitFromConfig(_ string) error { +func (f *FakeInformers) InitFromConfig(_ string, _ *api.EncodeKafka) error { return nil } -func (f *FakeInformers) GetInfo(n string) (*Info, error) { +func (f *FakeInformers) GetByIP(n string) *model.ResourceMetaData { i := f.info[n] - if i != nil { - return i, nil - } - return nil, errors.New("notFound") + return i } -func (f *FakeInformers) GetNodeInfo(n string) (*Info, error) { +func (f *FakeInformers) GetNodeByName(n string) (*model.ResourceMetaData, error) { i := f.nodes[n] if i != nil { return i, nil diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go index 4d3fd80c9..1c98f99b8 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -22,10 +22,13 @@ import ( "net" "time" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/kafka" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -40,18 +43,15 @@ const ( kubeConfigEnvVariable = "KUBECONFIG" syncTime = 10 * time.Minute IndexIP = "byIP" - TypeNode = "Node" - TypePod = "Pod" - TypeService = "Service" ) var log = logrus.WithField("component", "transform.Network.Kubernetes") //nolint:revive type InformersInterface interface { - GetInfo(string) (*Info, error) - GetNodeInfo(string) (*Info, error) - InitFromConfig(string) error + GetByIP(string) *model.ResourceMetaData + GetNodeByName(string) (*model.ResourceMetaData, error) + InitFromConfig(string, *api.EncodeKafka) error } type Informers struct { @@ -66,46 +66,24 @@ type Informers struct { mdStopChan chan struct{} } -type Owner struct { - Type string - Name string -} - -// Info contains precollected metadata for Pods, Nodes and Services. -// Not all the fields are populated for all the above types. To save -// memory, we just keep in memory the necessary data for each Type. -// For more information about which fields are set for each type, please -// refer to the instantiation function of the respective informers. -type Info struct { - // Informers need that internal object is an ObjectMeta instance - metav1.ObjectMeta - Type string - Owner Owner - HostName string - HostIP string - ips []string -} - var commonIndexers = map[string]cache.IndexFunc{ IndexIP: func(obj interface{}) ([]string, error) { - return obj.(*Info).ips, nil + return obj.(*model.ResourceMetaData).IPs, nil }, } -func (k *Informers) GetInfo(ip string) (*Info, error) { +func (k *Informers) GetByIP(ip string) *model.ResourceMetaData { if info, ok := k.fetchInformers(ip); ok { // Owner data might be discovered after the owned, so we fetch it // at the last moment - if info.Owner.Name == "" { - info.Owner = k.getOwner(info) - } - return info, nil + k.checkParent(info) + return info } - return nil, fmt.Errorf("informers can't find IP %s", ip) + return nil } -func (k *Informers) fetchInformers(ip string) (*Info, bool) { +func (k *Informers) fetchInformers(ip string) (*model.ResourceMetaData, bool) { if info, ok := infoForIP(k.pods.GetIndexer(), ip); ok { // it might happen that the Host is discovered after the Pod if info.HostName == "" { @@ -122,7 +100,7 @@ func (k *Informers) fetchInformers(ip string) (*Info, bool) { return nil, false } -func infoForIP(idx cache.Indexer, ip string) (*Info, bool) { +func infoForIP(idx cache.Indexer, ip string) (*model.ResourceMetaData, bool) { objs, err := idx.ByIndex(IndexIP, ip) if err != nil { log.WithError(err).WithField("ip", ip).Debug("error accessing index. Ignoring") @@ -131,48 +109,33 @@ func infoForIP(idx cache.Indexer, ip string) (*Info, bool) { if len(objs) == 0 { return nil, false } - return objs[0].(*Info), true + return objs[0].(*model.ResourceMetaData), true } -func (k *Informers) GetNodeInfo(name string) (*Info, error) { +func (k *Informers) GetNodeByName(name string) (*model.ResourceMetaData, error) { item, ok, err := k.nodes.GetIndexer().GetByKey(name) if err != nil { return nil, err } else if ok { - return item.(*Info), nil + return item.(*model.ResourceMetaData), nil } return nil, nil } -func (k *Informers) getOwner(info *Info) Owner { - if len(info.OwnerReferences) != 0 { - ownerReference := info.OwnerReferences[0] - if ownerReference.Kind != "ReplicaSet" { - return Owner{ - Name: ownerReference.Name, - Type: ownerReference.Kind, - } - } - - item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + ownerReference.Name) +func (k *Informers) checkParent(info *model.ResourceMetaData) { + if info.OwnerKind == "ReplicaSet" { + item, ok, err := k.replicaSets.GetIndexer().GetByKey(info.Namespace + "/" + info.OwnerName) if err != nil { - log.WithError(err).WithField("key", info.Namespace+"/"+ownerReference.Name). + log.WithError(err).WithField("key", info.Namespace+"/"+info.OwnerName). Debug("can't get ReplicaSet info from informer. Ignoring") } else if ok { rsInfo := item.(*metav1.ObjectMeta) if len(rsInfo.OwnerReferences) > 0 { - return Owner{ - Name: rsInfo.OwnerReferences[0].Name, - Type: rsInfo.OwnerReferences[0].Kind, - } + info.OwnerKind = rsInfo.OwnerReferences[0].Kind + info.OwnerName = rsInfo.OwnerReferences[0].Name } } } - // If no owner references found, return itself as owner - return Owner{ - Name: info.Name, - Type: info.Type, - } } func (k *Informers) getHostName(hostIP string) string { @@ -207,14 +170,16 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory) // CNI-dependent logic (must work regardless of whether the CNI is installed) ips = cni.AddOvnIPs(ips, node) - return &Info{ + return &model.ResourceMetaData{ ObjectMeta: metav1.ObjectMeta{ Name: node.Name, Namespace: "", Labels: node.Labels, }, - ips: ips, - Type: TypeNode, + Kind: model.KindNode, + OwnerName: node.Name, + OwnerKind: model.KindNode, + IPs: ips, // We duplicate HostIP and HostName information to simplify later filtering e.g. by // Host IP, where we want to get all the Pod flows by src/dst host, but also the actual // host-to-host flows by the same field. @@ -224,9 +189,6 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory) }); err != nil { return fmt.Errorf("can't set nodes transform: %w", err) } - if err := nodes.AddIndexers(commonIndexers); err != nil { - return fmt.Errorf("can't add %s indexer to Nodes informer: %w", IndexIP, err) - } k.nodes = nodes return nil } @@ -247,24 +209,29 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory) e ips = append(ips, ip.IP) } } - return &Info{ + + obj := model.ResourceMetaData{ ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - Labels: pod.Labels, - OwnerReferences: pod.OwnerReferences, + Name: pod.Name, + Namespace: pod.Namespace, + Labels: pod.Labels, }, - Type: TypePod, - HostIP: pod.Status.HostIP, - HostName: pod.Spec.NodeName, - ips: ips, - }, nil + Kind: model.KindPod, + OwnerName: pod.Name, + OwnerKind: model.KindPod, + HostIP: pod.Status.HostIP, + HostName: pod.Spec.NodeName, + IPs: ips, + } + if len(pod.OwnerReferences) > 0 { + obj.OwnerKind = pod.OwnerReferences[0].Kind + obj.OwnerName = pod.OwnerReferences[0].Name + } + k.checkParent(&obj) + return &obj, nil }); err != nil { return fmt.Errorf("can't set pods transform: %w", err) } - if err := pods.AddIndexers(commonIndexers); err != nil { - return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) - } k.pods = pods return nil @@ -286,21 +253,20 @@ func (k *Informers) initServiceInformer(informerFactory inf.SharedInformerFactor ips = append(ips, ip) } } - return &Info{ + return &model.ResourceMetaData{ ObjectMeta: metav1.ObjectMeta{ Name: svc.Name, Namespace: svc.Namespace, Labels: svc.Labels, }, - Type: TypeService, - ips: ips, + Kind: model.KindService, + OwnerName: svc.Name, + OwnerKind: model.KindService, + IPs: ips, }, nil }); err != nil { return fmt.Errorf("can't set services transform: %w", err) } - if err := services.AddIndexers(commonIndexers); err != nil { - return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) - } k.services = services return nil @@ -331,7 +297,7 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar return nil } -func (k *Informers) InitFromConfig(kubeConfigPath string) error { +func (k *Informers) InitFromConfig(kubeConfigPath string, kafkaConfig *api.EncodeKafka) error { // Initialization variables k.stopChan = make(chan struct{}) k.mdStopChan = make(chan struct{}) @@ -351,7 +317,7 @@ func (k *Informers) InitFromConfig(kubeConfigPath string) error { return err } - err = k.initInformers(kubeClient, metaKubeClient) + err = k.initInformers(kubeClient, metaKubeClient, kafkaConfig) if err != nil { return err } @@ -359,7 +325,7 @@ func (k *Informers) InitFromConfig(kubeConfigPath string) error { return nil } -func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, kafkaConfig *api.EncodeKafka) error { informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) err := k.initNodeInformer(informerFactory) @@ -379,15 +345,45 @@ func (k *Informers) initInformers(client kubernetes.Interface, metaClient metada return err } - log.Debugf("starting kubernetes informers, waiting for synchronization") - informerFactory.Start(k.stopChan) - informerFactory.WaitForCacheSync(k.stopChan) - log.Debugf("kubernetes informers started") + if kafkaConfig != nil { + log.Debugf("adding event handlers for Kafka") + kafkaWriter, err := kafka.NewWriter(kafkaConfig) + if err != nil { + return err + } + // Informers will publish updates to Kafka + for _, inf := range []cache.SharedIndexInformer{k.nodes, k.pods, k.services} { + // Note that the update handler is called whenever the resource was updated, even if the update doesn't affect + // the transformed data (model.ResourceMetaData). We may want to further optimize this later. On the flip side, this helps + // to keep everything in sync in case of missed events. + _, err := inf.AddEventHandler(getKafkaEventHandlers(kafkaWriter)) + if err != nil { + return err + } + } + } else { + // Informers expose an indexer + log.Debugf("adding indexers") + if err := k.nodes.AddIndexers(commonIndexers); err != nil { + return fmt.Errorf("can't add %s indexer to Nodes informer: %w", IndexIP, err) + } + if err := k.pods.AddIndexers(commonIndexers); err != nil { + return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) + } + if err := k.services.AddIndexers(commonIndexers); err != nil { + return fmt.Errorf("can't add %s indexer to Services informer: %w", IndexIP, err) + } + } log.Debugf("starting kubernetes metadata informers, waiting for synchronization") metadataInformerFactory.Start(k.mdStopChan) metadataInformerFactory.WaitForCacheSync(k.mdStopChan) log.Debugf("kubernetes metadata informers started") + + log.Debugf("starting kubernetes informers, waiting for synchronization") + informerFactory.Start(k.stopChan) + informerFactory.WaitForCacheSync(k.stopChan) + log.Debugf("kubernetes informers started") return nil } diff --git a/pkg/pipeline/transform/kubernetes/informers/informers_test.go b/pkg/pipeline/transform/kubernetes/informers/informers_test.go index 005897554..10cd5bf0d 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers_test.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers_test.go @@ -20,84 +20,84 @@ package informers import ( "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestGetInfo(t *testing.T) { kubeData := Informers{} pidx, hidx, sidx, ridx := SetupIndexerMocks(&kubeData) - pidx.MockPod("1.2.3.4", "pod1", "podNamespace", "10.0.0.1", nil) - pidx.MockPod("1.2.3.5", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) - pidx.FallbackNotFound() - ridx.MockReplicaSet("rs1", "podNamespace", Owner{Name: "dep1", Type: "Deployment"}) + ridx.MockReplicaSet("rs1", "podNamespace", "dep1", "Deployment") ridx.FallbackNotFound() + pidx.MockPod("1.2.3.4", "pod1", "podNamespace", "10.0.0.1", "pod1", "Pod") + pidx.MockPod("1.2.3.5", "pod2", "podNamespace", "10.0.0.1", "rs1", "ReplicaSet") + pidx.FallbackNotFound() sidx.MockService("1.2.3.100", "svc1", "svcNamespace") sidx.FallbackNotFound() hidx.MockNode("10.0.0.1", "node1") hidx.FallbackNotFound() // Test get orphan pod - info, err := kubeData.GetInfo("1.2.3.4") - require.NoError(t, err) + info := kubeData.GetByIP("1.2.3.4") + require.NotNil(t, info) - require.Equal(t, *info, Info{ - Type: "Pod", - ObjectMeta: metav1.ObjectMeta{ + require.Equal(t, model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ Name: "pod1", Namespace: "podNamespace", }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "pod1", Type: "Pod"}, - }) + Kind: "Pod", + HostName: "node1", + HostIP: "10.0.0.1", + OwnerName: "pod1", + OwnerKind: "Pod", + }, *info) // Test get pod owned - info, err = kubeData.GetInfo("1.2.3.5") - require.NoError(t, err) + info = kubeData.GetByIP("1.2.3.5") + require.NotNil(t, info) - require.Equal(t, *info, Info{ - Type: "Pod", - ObjectMeta: metav1.ObjectMeta{ + require.Equal(t, model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ Name: "pod2", Namespace: "podNamespace", - OwnerReferences: []metav1.OwnerReference{{ - Kind: "ReplicaSet", - Name: "rs1", - }}, }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "dep1", Type: "Deployment"}, - }) + Kind: "Pod", + HostName: "node1", + HostIP: "10.0.0.1", + OwnerName: "dep1", + OwnerKind: "Deployment", + }, *info) // Test get node - info, err = kubeData.GetInfo("10.0.0.1") - require.NoError(t, err) + info = kubeData.GetByIP("10.0.0.1") + require.NotNil(t, info) - require.Equal(t, *info, Info{ - Type: "Node", - ObjectMeta: metav1.ObjectMeta{ + require.Equal(t, model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ Name: "node1", }, - Owner: Owner{Name: "node1", Type: "Node"}, - }) + Kind: "Node", + OwnerName: "node1", + OwnerKind: "Node", + }, *info) // Test get service - info, err = kubeData.GetInfo("1.2.3.100") - require.NoError(t, err) + info = kubeData.GetByIP("1.2.3.100") + require.NotNil(t, info) - require.Equal(t, *info, Info{ - Type: "Service", - ObjectMeta: metav1.ObjectMeta{ + require.Equal(t, model.ResourceMetaData{ + ObjectMeta: v1.ObjectMeta{ Name: "svc1", Namespace: "svcNamespace", }, - Owner: Owner{Name: "svc1", Type: "Service"}, - }) + Kind: "Service", + OwnerName: "svc1", + OwnerKind: "Service", + }, *info) // Test no match - info, err = kubeData.GetInfo("1.2.3.200") - require.NotNil(t, err) + info = kubeData.GetByIP("1.2.3.200") require.Nil(t, info) } diff --git a/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go b/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go new file mode 100644 index 000000000..6b5f38ba3 --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/informers/kafka_cache_writer.go @@ -0,0 +1,52 @@ +package informers + +import ( + "context" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/model" + kafkago "github.com/segmentio/kafka-go" + "k8s.io/client-go/tools/cache" +) + +func getKafkaEventHandlers(kafka *kafkago.Writer) cache.ResourceEventHandler { + return &cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if res, ok := obj.(*model.ResourceMetaData); ok { + publish(&model.KafkaCacheMessage{ + Operation: model.OperationAdd, + Resource: res, + }, kafka) + } + }, + UpdateFunc: func(_, new any) { + if res, ok := new.(*model.ResourceMetaData); ok { + publish(&model.KafkaCacheMessage{ + Operation: model.OperationUpdate, + Resource: res, + }, kafka) + } + }, + DeleteFunc: func(obj any) { + if res, ok := obj.(*model.ResourceMetaData); ok { + publish(&model.KafkaCacheMessage{ + Operation: model.OperationDelete, + Resource: res, + }, kafka) + } + }, + } +} + +func publish(content *model.KafkaCacheMessage, kafka *kafkago.Writer) { + log.Debugf("Publishing to Kafka: %v", content.Resource) + b, err := content.ToBytes() + if err != nil { + log.Errorf("kafka publish, encoding error: %v", err) + return + } + msg := kafkago.Message{Value: b} + err = kafka.WriteMessages(context.Background(), msg) + if err != nil { + log.Errorf("kafka publish, write error: %v", err) + } +} diff --git a/pkg/pipeline/transform/kubernetes/model/model.go b/pkg/pipeline/transform/kubernetes/model/model.go new file mode 100644 index 000000000..304e858ec --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/model/model.go @@ -0,0 +1,62 @@ +package model + +import ( + "bytes" + "encoding/gob" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + KindNode = "Node" + KindPod = "Pod" + KindService = "Service" +) + +// ResourceMetaData contains precollected metadata for Pods, Nodes and Services. +// Not all the fields are populated for all the above types. To save +// memory, we just keep in memory the necessary data for each Type. +// For more information about which fields are set for each type, please +// refer to the instantiation function of the respective informers. +type ResourceMetaData struct { + // Informers need that internal object is an ObjectMeta instance + metav1.ObjectMeta + Kind string + OwnerName string + OwnerKind string + HostName string + HostIP string + IPs []string +} + +type Operation string + +const ( + OperationAdd Operation = "add" + OperationDelete Operation = "delete" + OperationUpdate Operation = "update" +) + +type KafkaCacheMessage struct { + Operation Operation + Resource *ResourceMetaData +} + +func (i *KafkaCacheMessage) ToBytes() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(i); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func MessageFromBytes(b []byte) (*KafkaCacheMessage, error) { + var msg KafkaCacheMessage + buf := bytes.NewReader(b) + dec := gob.NewDecoder(buf) + if err := dec.Decode(&msg); err != nil { + return nil, err + } + return &msg, nil +} diff --git a/pkg/pipeline/transform/kubernetes/model/model_test.go b/pkg/pipeline/transform/kubernetes/model/model_test.go new file mode 100644 index 000000000..c5599921f --- /dev/null +++ b/pkg/pipeline/transform/kubernetes/model/model_test.go @@ -0,0 +1,32 @@ +package model + +import ( + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEncodeDecode(t *testing.T) { + msg := KafkaCacheMessage{ + Operation: OperationAdd, + Resource: &ResourceMetaData{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "label-1": "value-1", + }, + }, + Kind: KindNode, + IPs: []string{"1.2.3.4", "4.5.6.7"}, + }, + } + + b, err := msg.ToBytes() + require.NoError(t, err) + + decoded, err := MessageFromBytes(b) + require.NoError(t, err) + + require.Equal(t, msg, *decoded) +} diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index dbff0a938..f6f0434d8 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -202,9 +202,16 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } if needToInitKubeData { - err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfigPath) - if err != nil { - return nil, err + if jsonNetworkTransform.KafkaCacheConfig != nil { + // Get kube data from Kafka rather than informers + if err := kubernetes.InitKafkaCacheDatasource(jsonNetworkTransform.KafkaCacheConfig); err != nil { + return nil, err + } + } else { + // Init informers + if err := kubernetes.InitInformerDatasource(jsonNetworkTransform.KubeConfigPath, nil); err != nil { + return nil, err + } } } diff --git a/pkg/pipeline/utils/exit.go b/pkg/pipeline/utils/exit.go index 7bbfb7cc2..667026a9d 100644 --- a/pkg/pipeline/utils/exit.go +++ b/pkg/pipeline/utils/exit.go @@ -42,8 +42,7 @@ func CloseExitChannel() { close(exitChannel) } -func SetupElegantExit() { - logrus.Debugf("entering SetupElegantExit") +func SetupElegantExit() (stopCh <-chan struct{}) { // handle elegant exit; create support for channels of go routines that want to exit cleanly exitChannel = make(chan struct{}) exitSigChan := make(chan os.Signal, 1) @@ -54,7 +53,6 @@ func SetupElegantExit() { sig := <-exitSigChan logrus.Debugf("received exit signal = %v", sig) close(exitChannel) - logrus.Debugf("exiting SetupElegantExit go function") }() - logrus.Debugf("exiting SetupElegantExit") + return exitChannel }