diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 8ab96d0f1..ea3f71074 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -18,11 +18,12 @@ package api type TransformNetwork struct { - Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"` - KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"` - ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"` - ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"` - DirectionInfo DirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"` + Rules NetworkTransformRules `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"` + KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"` + ServicesFile string `yaml:"servicesFile,omitempty" json:"servicesFile,omitempty" doc:"path to services file (optional, default: /etc/services)"` + ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"` + IPCategories []NetworkTransformIPCategory `yaml:"ipCategories,omitempty" json:"ipCategories,omitempty" doc:"configure IP categories"` + DirectionInfo NetworkTransformDirectionInfo `yaml:"directionInfo,omitempty" json:"directionInfo,omitempty" doc:"information to reinterpret flow direction (optional, to use with reinterpret_direction rule)"` } func (tn *TransformNetwork) GetServiceFiles() (string, string) { @@ -45,6 +46,7 @@ const ( OpAddService = "add_service" OpAddKubernetes = "add_kubernetes" OpReinterpretDirection = "reinterpret_direction" + OpAddIPCategory = "add_ip_category" ) type TransformNetworkOperationEnum struct { @@ -55,6 +57,7 @@ type TransformNetworkOperationEnum struct { AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"` AddKubernetes string `yaml:"add_kubernetes" json:"add_kubernetes" doc:"add output kubernetes fields from input"` ReinterpretDirection string `yaml:"reinterpret_direction" json:"reinterpret_direction" doc:"reinterpret flow direction at a higher level than the interface"` + AddIPCategory string `yaml:"add_ip_category" json:"add_ip_category" doc:"categorize IPs based on known subnets configuration"` } func TransformNetworkOperationName(operation string) string { @@ -69,7 +72,7 @@ type NetworkTransformRule struct { Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` } -type DirectionInfo struct { +type NetworkTransformDirectionInfo struct { ReporterIPField string `yaml:"reporterIPField,omitempty" json:"reporterIPField,omitempty" doc:"field providing the reporter (agent) host IP"` SrcHostField string `yaml:"srcHostField,omitempty" json:"srcHostField,omitempty" doc:"source host field"` DstHostField string `yaml:"dstHostField,omitempty" json:"dstHostField,omitempty" doc:"destination host field"` @@ -78,3 +81,8 @@ type DirectionInfo struct { } type NetworkTransformRules []NetworkTransformRule + +type NetworkTransformIPCategory struct { + CIDRs []string `yaml:"cidrs,omitempty" json:"cidrs,omitempty" doc:"list of CIDRs to match a category"` + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the category"` +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index aea1e8a29..971f87555 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -35,7 +35,7 @@ import ( log "github.com/sirupsen/logrus" ) -const defaultExpiryTime = 120 +const defaultExpiryTime = 2 * time.Minute type gaugeInfo struct { gauge *prometheus.GaugeVec @@ -57,7 +57,7 @@ type EncodeProm struct { counters []counterInfo histos []histoInfo aggHistos []histoInfo - expiryTime int64 + expiryTime time.Duration mCache *utils.TimedCache exitChan <-chan struct{} server *http.Server @@ -232,14 +232,14 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { } func (e *EncodeProm) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second) + ticker := time.NewTicker(e.expiryTime) for { select { case <-e.exitChan: log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") return case <-ticker.C: - e.mCache.CleanupExpiredEntries(e.expiryTime, e) + e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) } } } @@ -274,11 +274,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En cfg = *params.Encode.Prom } - expiryTime := int64(cfg.ExpiryTime) + expiryTime := time.Duration(cfg.ExpiryTime) * time.Second if expiryTime == 0 { expiryTime = defaultExpiryTime } - log.Debugf("expiryTime = %d", expiryTime) + log.Debugf("expiryTime = %v", expiryTime) counters := []counterInfo{} gauges := []gaugeInfo{} diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index e6816c1aa..f8d903a51 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -106,7 +106,7 @@ func Test_NewEncodeProm(t *testing.T) { require.Equal(t, 1, len(encodeProm.gauges)) require.Equal(t, 1, len(encodeProm.histos)) require.Equal(t, 1, len(encodeProm.aggHistos)) - require.Equal(t, int64(1), encodeProm.expiryTime) + require.Equal(t, time.Second, encodeProm.expiryTime) require.Equal(t, (*api.PromTLSConf)(nil), encodeProm.tlsConfig) require.Equal(t, encodeProm.gauges[0].info.Name, "Bytes") @@ -125,7 +125,7 @@ func Test_NewEncodeProm(t *testing.T) { // wait a couple seconds so that the entry will expire time.Sleep(2 * time.Second) - encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm) + encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm.Cleanup) entriesMapLen = encodeProm.mCache.GetCacheLen() require.Equal(t, 0, entriesMapLen) } diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 0428694af..9103c2f04 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -47,7 +48,7 @@ type Aggregate struct { Definition api.AggregateDefinition cache *utils.TimedCache mutex *sync.Mutex - expiryTime int64 + expiryTime time.Duration } type GroupState struct { diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go index bcbf7fb91..06c4dd58b 100644 --- a/pkg/pipeline/extract/aggregate/aggregate_test.go +++ b/pkg/pipeline/extract/aggregate/aggregate_test.go @@ -20,6 +20,7 @@ package aggregate import ( "sync" "testing" + "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -38,7 +39,7 @@ func GetMockAggregate() Aggregate { }, cache: utils.NewTimedCache(), mutex: &sync.Mutex{}, - expiryTime: 30, + expiryTime: 30 * time.Second, } return aggregate } diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index d7f580c62..efdfba7eb 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -27,11 +27,11 @@ import ( log "github.com/sirupsen/logrus" ) -var defaultExpiryTime = 60 * 10 // 10 minutes +var defaultExpiryTime = 10 * time.Minute type Aggregates struct { Aggregates []Aggregate - expiryTime int64 + expiryTime time.Duration } type Definitions []api.AggregateDefinition @@ -72,7 +72,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi func (aggregates *Aggregates) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second) + ticker := time.NewTicker(aggregates.expiryTime) go func() { for { select { @@ -86,18 +86,16 @@ func (aggregates *Aggregates) cleanupExpiredEntriesLoop() { } func (aggregates *Aggregates) cleanupExpiredEntries() { - for _, aggregate := range aggregates.Aggregates { aggregate.mutex.Lock() - aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate) + aggregate.cache.CleanupExpiredEntries(aggregate.expiryTime, aggregate.Cleanup) aggregate.mutex.Unlock() } - } func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) { aggregates := Aggregates{ - expiryTime: int64(defaultExpiryTime), + expiryTime: defaultExpiryTime, } for _, aggregateDefinition := range definitions { diff --git a/pkg/pipeline/extract/aggregate/aggregates_test.go b/pkg/pipeline/extract/aggregate/aggregates_test.go index 6d1a239b6..ea32d17b8 100644 --- a/pkg/pipeline/extract/aggregate/aggregates_test.go +++ b/pkg/pipeline/extract/aggregate/aggregates_test.go @@ -61,7 +61,7 @@ func Test_NewAggregatesFromConfig(t *testing.T) { func Test_CleanupExpiredEntriesLoop(t *testing.T) { - defaultExpiryTime = 4 // expiration after 4 seconds + defaultExpiryTime = 4 * time.Second // expiration after 4 seconds aggregates := initAggregates(t) expectedAggregate := GetMockAggregate() require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition) diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 71c21866b..d0d66c905 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -23,6 +23,7 @@ import ( "os" "regexp" "strconv" + "time" "github.com/Knetic/govaluate" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -30,6 +31,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/sirupsen/logrus" ) @@ -37,7 +39,14 @@ var log = logrus.WithField("component", "transform.Network") type Network struct { api.TransformNetwork - svcNames *netdb.ServiceNames + svcNames *netdb.ServiceNames + categories []subnetCategory + ipCatCache *utils.TimedCache +} + +type subnetCategory struct { + cidrs []*net.IPNet + name string } func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bool) { @@ -143,6 +152,15 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo } case api.OpReinterpretDirection: reinterpretDirection(outputEntry, &n.DirectionInfo) + case api.OpAddIPCategory: + if strIP, ok := outputEntry[rule.Input].(string); ok { + cat, ok := n.ipCatCache.GetCacheEntry(strIP) + if !ok { + cat = n.categorizeIP(net.ParseIP(strIP)) + n.ipCatCache.UpdateCacheEntry(strIP, cat) + } + outputEntry[rule.Output] = cat + } default: log.Panicf("unknown type %s for transform.Network rule: %v", rule.Type, rule) @@ -152,6 +170,19 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo return outputEntry, true } +func (n *Network) categorizeIP(ip net.IP) string { + if ip != nil { + for _, subnetCat := range n.categories { + for _, cidr := range subnetCat.cidrs { + if cidr.Contains(ip) { + return subnetCat.name + } + } + } + } + return "" +} + // NewTransformNetwork create a new transform func NewTransformNetwork(params config.StageParam) (Transformer, error) { var needToInitLocationDB = false @@ -171,9 +202,13 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { case api.OpAddService: needToInitNetworkServices = true case api.OpReinterpretDirection: - if err := validatereinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil { + if err := validateReinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil { return nil, err } + case api.OpAddIPCategory: + if len(jsonNetworkTransform.IPCategories) == 0 { + return nil, fmt.Errorf("a rule '%s' was found, but there are no IP categories configured", api.OpAddIPCategory) + } } } @@ -211,11 +246,28 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } } + var subnetCats []subnetCategory + for _, category := range jsonNetworkTransform.IPCategories { + var cidrs []*net.IPNet + for _, cidr := range category.CIDRs { + _, parsed, err := net.ParseCIDR(cidr) + if err != nil { + return nil, fmt.Errorf("category %s: fail to parse CIDR, %w", category.Name, err) + } + cidrs = append(cidrs, parsed) + } + if len(cidrs) > 0 { + subnetCats = append(subnetCats, subnetCategory{name: category.Name, cidrs: cidrs}) + } + } + return &Network{ TransformNetwork: api.TransformNetwork{ Rules: jsonNetworkTransform.Rules, DirectionInfo: jsonNetworkTransform.DirectionInfo, }, - svcNames: servicesDB, + svcNames: servicesDB, + categories: subnetCats, + ipCatCache: utils.NewQuietExpiringTimedCache(2 * time.Minute), }, nil } diff --git a/pkg/pipeline/transform/transform_network_direction.go b/pkg/pipeline/transform/transform_network_direction.go index 17054228f..d18c8b08d 100644 --- a/pkg/pipeline/transform/transform_network_direction.go +++ b/pkg/pipeline/transform/transform_network_direction.go @@ -12,7 +12,7 @@ const ( egress = 1 ) -func validatereinterpretDirectionConfig(info *api.DirectionInfo) error { +func validateReinterpretDirectionConfig(info *api.NetworkTransformDirectionInfo) error { if info.FlowDirectionField == "" { return fmt.Errorf("invalid config for transform.Network rule %s: missing FlowDirectionField", api.OpReinterpretDirection) } @@ -28,7 +28,7 @@ func validatereinterpretDirectionConfig(info *api.DirectionInfo) error { return nil } -func reinterpretDirection(output config.GenericMap, info *api.DirectionInfo) { +func reinterpretDirection(output config.GenericMap, info *api.NetworkTransformDirectionInfo) { if fd, ok := output[info.FlowDirectionField]; ok && len(info.IfDirectionField) > 0 { output[info.IfDirectionField] = fd } diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 7a717f705..d7b184ae7 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -368,13 +368,60 @@ func (*fakeKubeData) GetInfo(n string) (*kubernetes.Info, error) { } func Test_Categorize(t *testing.T) { + entry := config.GenericMap{ + "addr1": "10.1.2.3", + "addr2": "100.1.2.3", + "addr3": "100.2.3.4", + "addr4": "101.1.0.0", + } + cfg := config.StageParam{ + Transform: &config.Transform{ + Network: &api.TransformNetwork{ + Rules: []api.NetworkTransformRule{ + {Type: api.OpAddIPCategory, Input: "addr1", Output: "cat1"}, + {Type: api.OpAddIPCategory, Input: "addr2", Output: "cat2"}, + {Type: api.OpAddIPCategory, Input: "addr3", Output: "cat3"}, + {Type: api.OpAddIPCategory, Input: "addr4", Output: "cat4"}, + }, + IPCategories: []api.NetworkTransformIPCategory{{ + Name: "Pods overlay", + CIDRs: []string{"10.0.0.0/8"}, + }, { + Name: "MySite.com", + CIDRs: []string{"101.1.0.0/32", "100.1.0.0/16"}, + }, { + Name: "MyOtherSite.com", + CIDRs: []string{"100.2.3.10/32"}, + }}, + }, + }, + } + + tr, err := NewTransformNetwork(cfg) + require.NoError(t, err) + + output, ok := tr.Transform(entry) + require.True(t, ok) + require.Equal(t, config.GenericMap{ + "addr1": "10.1.2.3", + "cat1": "Pods overlay", + "addr2": "100.1.2.3", + "cat2": "MySite.com", + "addr3": "100.2.3.4", + "cat3": "", + "addr4": "101.1.0.0", + "cat4": "MySite.com", + }, output) +} + +func Test_ReinterpretDirection(t *testing.T) { cfg := config.StageParam{ Transform: &config.Transform{ Network: &api.TransformNetwork{ Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ ReporterIPField: "ReporterIP", SrcHostField: "SrcHostIP", DstHostField: "DstHostIP", @@ -511,7 +558,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ SrcHostField: "SrcHostIP", DstHostField: "DstHostIP", FlowDirectionField: "FlowDirection", @@ -529,7 +576,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ ReporterIPField: "ReporterIP", DstHostField: "DstHostIP", FlowDirectionField: "FlowDirection", @@ -547,7 +594,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ ReporterIPField: "ReporterIP", SrcHostField: "SrcHostIP", FlowDirectionField: "FlowDirection", @@ -565,7 +612,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ ReporterIPField: "ReporterIP", SrcHostField: "SrcHostIP", DstHostField: "DstHostIP", @@ -583,7 +630,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { Rules: []api.NetworkTransformRule{{ Type: "reinterpret_direction", }}, - DirectionInfo: api.DirectionInfo{ + DirectionInfo: api.NetworkTransformDirectionInfo{ ReporterIPField: "ReporterIP", SrcHostField: "SrcHostIP", DstHostField: "DstHostIP", diff --git a/pkg/pipeline/utils/timed_cache.go b/pkg/pipeline/utils/timed_cache.go index 034acac93..472314a08 100644 --- a/pkg/pipeline/utils/timed_cache.go +++ b/pkg/pipeline/utils/timed_cache.go @@ -30,13 +30,11 @@ var log = logrus.WithField("component", "utils.TimedCache") // Functions to manage an LRU cache with an expiry // When an item expires, allow a callback to allow the specific implementation to perform its particular cleanup -type CacheCallback interface { - Cleanup(entry interface{}) -} +type CacheCallback func(entry interface{}) type cacheEntry struct { key string - lastUpdatedTime int64 + lastUpdatedTime time.Time e *list.Element SourceEntry interface{} } @@ -63,7 +61,7 @@ func (tc *TimedCache) GetCacheEntry(key string) (interface{}, bool) { var uclog = log.WithField("method", "UpdateCacheEntry") func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) *cacheEntry { - nowInSecs := time.Now().Unix() + nowInSecs := time.Now() tc.mu.Lock() defer tc.mu.Unlock() cEntry, ok := tc.cacheMap[key] @@ -105,7 +103,7 @@ func (tc *TimedCache) Iterate(f func(key string, value interface{})) { } // CleanupExpiredEntries removes items from cache that were last touched more than expiryTime seconds ago -func (tc *TimedCache) CleanupExpiredEntries(expiryTime int64, callback CacheCallback) { +func (tc *TimedCache) CleanupExpiredEntries(expiry time.Duration, callback CacheCallback) { tc.mu.Lock() defer tc.mu.Unlock() @@ -115,8 +113,7 @@ func (tc *TimedCache) CleanupExpiredEntries(expiryTime int64, callback CacheCall }) clog.Debugf("cleaning up expried entries") - nowInSecs := time.Now().Unix() - expireTime := nowInSecs - expiryTime + expireTime := time.Now().Add(-expiry) deleted := 0 // go through the list until we reach recently used entries for { @@ -125,13 +122,13 @@ func (tc *TimedCache) CleanupExpiredEntries(expiryTime int64, callback CacheCall return } pCacheInfo := listEntry.Value.(*cacheEntry) - if pCacheInfo.lastUpdatedTime > expireTime { + if pCacheInfo.lastUpdatedTime.After(expireTime) { // no more expired items clog.Debugf("deleted %d expired entries", deleted) return } deleted++ - callback.Cleanup(pCacheInfo.SourceEntry) + callback(pCacheInfo.SourceEntry) delete(tc.cacheMap, pCacheInfo.key) tc.cacheList.Remove(listEntry) } @@ -144,3 +141,24 @@ func NewTimedCache() *TimedCache { } return l } + +func NewQuietExpiringTimedCache(expiry time.Duration) *TimedCache { + l := &TimedCache{ + cacheList: list.New(), + cacheMap: make(TimedCacheMap), + } + + ticker := time.NewTicker(expiry) + go func() { + for { + select { + case <-ExitChannel(): + return + case <-ticker.C: + l.CleanupExpiredEntries(expiry, func(entry interface{}) {}) + } + } + }() + + return l +}