Skip to content

Commit

Permalink
NETOBSERV-386 IP categorization (#359)
Browse files Browse the repository at this point in the history
* NETOBSERV-386 IP categorization

Allows to configure IP ranges and assign them any name
IPs falling in those ranges are flagged with that name

* Use and update TimedCache for IP categories

- Cache categorized IPs to avoid too much IP parsing & matching
- Update TimedCache to use time API rather than int64
- Use lambda-style cleanup rather than an interface
  • Loading branch information
jotak authored Jan 9, 2023
1 parent fa0662e commit 6a8b986
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 45 deletions.
20 changes: 14 additions & 6 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package api

type TransformNetwork struct {
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
DirectionInfo DirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"`
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
IPCategories []NetworkTransformIPCategory `yaml:"ipCategories,omitempty" json:"ipCategories,omitempty" doc:"configure IP categories"`
DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
Expand All @@ -45,6 +46,7 @@ const (
OpAddService = "add_service"
OpAddKubernetes = "add_kubernetes"
OpReinterpretDirection = "reinterpret_direction"
OpAddIPCategory = "add_ip_category"
)

type TransformNetworkOperationEnum struct {
Expand All @@ -55,6 +57,7 @@ type TransformNetworkOperationEnum struct {
AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"`
AddKubernetes string `yaml:"add_kubernetes" json:"add_kubernetes" doc:"add output kubernetes fields from input"`
ReinterpretDirection string `yaml:"reinterpret_direction" json:"reinterpret_direction" doc:"reinterpret flow direction at a higher level than the interface"`
AddIPCategory string `yaml:"add_ip_category" json:"add_ip_category" doc:"categorize IPs based on known subnets configuration"`
}

func TransformNetworkOperationName(operation string) string {
Expand All @@ -69,7 +72,7 @@ type NetworkTransformRule struct {
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
}

type DirectionInfo struct {
type NetworkTransformDirectionInfo struct {
ReporterIPField string `yaml:"reporterIPField,omitempty" json:"reporterIPField,omitempty" doc:"field providing the reporter (agent) host IP"`
SrcHostField string `yaml:"srcHostField,omitempty" json:"srcHostField,omitempty" doc:"source host field"`
DstHostField string `yaml:"dstHostField,omitempty" json:"dstHostField,omitempty" doc:"destination host field"`
Expand All @@ -78,3 +81,8 @@ type DirectionInfo struct {
}

type NetworkTransformRules []NetworkTransformRule

// NetworkTransformIPCategory associates a user-defined category name with a
// set of IP ranges. Flows whose IP falls within any of the listed CIDRs are
// tagged with Name by the add_ip_category transform rule.
type NetworkTransformIPCategory struct {
CIDRs []string `yaml:"cidrs,omitempty" json:"cidrs,omitempty" doc:"list of CIDRs to match a category"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the category"`
}
12 changes: 6 additions & 6 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
log "github.com/sirupsen/logrus"
)

const defaultExpiryTime = 120
const defaultExpiryTime = 2 * time.Minute

type gaugeInfo struct {
gauge *prometheus.GaugeVec
Expand All @@ -57,7 +57,7 @@ type EncodeProm struct {
counters []counterInfo
histos []histoInfo
aggHistos []histoInfo
expiryTime int64
expiryTime time.Duration
mCache *utils.TimedCache
exitChan <-chan struct{}
server *http.Server
Expand Down Expand Up @@ -232,14 +232,14 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
}

func (e *EncodeProm) cleanupExpiredEntriesLoop() {
ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second)
ticker := time.NewTicker(e.expiryTime)
for {
select {
case <-e.exitChan:
log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
return
case <-ticker.C:
e.mCache.CleanupExpiredEntries(e.expiryTime, e)
e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup)
}
}
}
Expand Down Expand Up @@ -274,11 +274,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
cfg = *params.Encode.Prom
}

expiryTime := int64(cfg.ExpiryTime)
expiryTime := time.Duration(cfg.ExpiryTime) * time.Second
if expiryTime == 0 {
expiryTime = defaultExpiryTime
}
log.Debugf("expiryTime = %d", expiryTime)
log.Debugf("expiryTime = %v", expiryTime)

counters := []counterInfo{}
gauges := []gaugeInfo{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, 1, len(encodeProm.gauges))
require.Equal(t, 1, len(encodeProm.histos))
require.Equal(t, 1, len(encodeProm.aggHistos))
require.Equal(t, int64(1), encodeProm.expiryTime)
require.Equal(t, time.Second, encodeProm.expiryTime)
require.Equal(t, (*api.PromTLSConf)(nil), encodeProm.tlsConfig)

require.Equal(t, encodeProm.gauges[0].info.Name, "Bytes")
Expand All @@ -125,7 +125,7 @@ func Test_NewEncodeProm(t *testing.T) {

// wait a couple seconds so that the entry will expire
time.Sleep(2 * time.Second)
encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm)
encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm.Cleanup)
entriesMapLen = encodeProm.mCache.GetCacheLen()
require.Equal(t, 0, entriesMapLen)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -47,7 +48,7 @@ type Aggregate struct {
Definition api.AggregateDefinition
cache *utils.TimedCache
mutex *sync.Mutex
expiryTime int64
expiryTime time.Duration
}

type GroupState struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package aggregate
import (
"sync"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -38,7 +39,7 @@ func GetMockAggregate() Aggregate {
},
cache: utils.NewTimedCache(),
mutex: &sync.Mutex{},
expiryTime: 30,
expiryTime: 30 * time.Second,
}
return aggregate
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
log "github.com/sirupsen/logrus"
)

var defaultExpiryTime = 60 * 10 // 10 minutes
var defaultExpiryTime = 10 * time.Minute

type Aggregates struct {
Aggregates []Aggregate
expiryTime int64
expiryTime time.Duration
}

type Definitions []api.AggregateDefinition
Expand Down Expand Up @@ -72,7 +72,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi

func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
ticker := time.NewTicker(aggregates.expiryTime)
go func() {
for {
select {
Expand All @@ -86,18 +86,16 @@ func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
}

func (aggregates *Aggregates) cleanupExpiredEntries() {

for _, aggregate := range aggregates.Aggregates {
aggregate.mutex.Lock()
aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate)
aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate.Cleanup)
aggregate.mutex.Unlock()
}

}

func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) {
aggregates := Aggregates{
expiryTime: int64(defaultExpiryTime),
expiryTime: defaultExpiryTime,
}

for _, aggregateDefinition := range definitions {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_NewAggregatesFromConfig(t *testing.T) {

func Test_CleanupExpiredEntriesLoop(t *testing.T) {

defaultExpiryTime = 4 // expiration after 4 seconds
defaultExpiryTime = 4 * time.Second // expiration after 4 seconds
aggregates := initAggregates(t)
expectedAggregate := GetMockAggregate()
require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition)
Expand Down
58 changes: 55 additions & 3 deletions pkg/pipeline/transform/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,30 @@ import (
"os"
"regexp"
"strconv"
"time"

"github.com/Knetic/govaluate"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("component", "transform.Network")

type Network struct {
api.TransformNetwork
svcNames *netdb.ServiceNames
svcNames *netdb.ServiceNames
categories []subnetCategory
ipCatCache *utils.TimedCache
}

// subnetCategory is the runtime form of an api.NetworkTransformIPCategory:
// the category name together with its CIDRs pre-parsed into net.IPNet so
// that per-flow matching does not re-parse the configuration strings.
type subnetCategory struct {
cidrs []*net.IPNet
name string
}

func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bool) {
Expand Down Expand Up @@ -143,6 +152,15 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
}
case api.OpReinterpretDirection:
reinterpretDirection(outputEntry, &n.DirectionInfo)
case api.OpAddIPCategory:
if strIP, ok := outputEntry[rule.Input].(string); ok {
cat, ok := n.ipCatCache.GetCacheEntry(strIP)
if !ok {
cat = n.categorizeIP(net.ParseIP(strIP))
n.ipCatCache.UpdateCacheEntry(strIP, cat)
}
outputEntry[rule.Output] = cat
}

default:
log.Panicf("unknown type %s for transform.Network rule: %v", rule.Type, rule)
Expand All @@ -152,6 +170,19 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
return outputEntry, true
}

// categorizeIP returns the name of the first configured category that has a
// CIDR containing the given IP. It returns the empty string when ip is nil
// (e.g. the input string failed to parse) or when no category matches.
func (n *Network) categorizeIP(ip net.IP) string {
	if ip == nil {
		return ""
	}
	for _, category := range n.categories {
		for _, block := range category.cidrs {
			if block.Contains(ip) {
				return category.name
			}
		}
	}
	return ""
}

// NewTransformNetwork create a new transform
func NewTransformNetwork(params config.StageParam) (Transformer, error) {
var needToInitLocationDB = false
Expand All @@ -171,9 +202,13 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) {
case api.OpAddService:
needToInitNetworkServices = true
case api.OpReinterpretDirection:
if err := validatereinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil {
if err := validateReinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil {
return nil, err
}
case api.OpAddIPCategory:
if len(jsonNetworkTransform.IPCategories) == 0 {
return nil, fmt.Errorf("a rule '%s' was found, but there are no IP categories configured", api.OpAddIPCategory)
}
}
}

Expand Down Expand Up @@ -211,11 +246,28 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) {
}
}

var subnetCats []subnetCategory
for _, category := range jsonNetworkTransform.IPCategories {
var cidrs []*net.IPNet
for _, cidr := range category.CIDRs {
_, parsed, err := net.ParseCIDR(cidr)
if err != nil {
return nil, fmt.Errorf("category %s: fail to parse CIDR, %w", category.Name, err)
}
cidrs = append(cidrs, parsed)
}
if len(cidrs) > 0 {
subnetCats = append(subnetCats, subnetCategory{name: category.Name, cidrs: cidrs})
}
}

return &Network{
TransformNetwork: api.TransformNetwork{
Rules: jsonNetworkTransform.Rules,
DirectionInfo: jsonNetworkTransform.DirectionInfo,
},
svcNames: servicesDB,
svcNames: servicesDB,
categories: subnetCats,
ipCatCache: utils.NewQuietExpiringTimedCache(2 * time.Minute),
}, nil
}
4 changes: 2 additions & 2 deletions pkg/pipeline/transform/transform_network_direction.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
egress = 1
)

func validatereinterpretDirectionConfig(info *api.DirectionInfo) error {
func validateReinterpretDirectionConfig(info *api.NetworkTransformDirectionInfo) error {
if info.FlowDirectionField == "" {
return fmt.Errorf("invalid config for transform.Network rule %s: missing FlowDirectionField", api.OpReinterpretDirection)
}
Expand All @@ -28,7 +28,7 @@ func validatereinterpretDirectionConfig(info *api.DirectionInfo) error {
return nil
}

func reinterpretDirection(output config.GenericMap, info *api.DirectionInfo) {
func reinterpretDirection(output config.GenericMap, info *api.NetworkTransformDirectionInfo) {
if fd, ok := output[info.FlowDirectionField]; ok && len(info.IfDirectionField) > 0 {
output[info.IfDirectionField] = fd
}
Expand Down
Loading

0 comments on commit 6a8b986

Please sign in to comment.