diff --git a/Makefile b/Makefile index 77f963f..dfc9bea 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ test: chmod 755 ./pkg/disabler/testdata/return_one_after_X_runs.bash go test -v ./pkg/inetdiager/ + go test -v ./pkg/blockfilter/ go test -v ./pkg/xtcpnl/ go test -v ./pkg/disabler/ go test -v ./pkg/xtcpstater/ @@ -39,6 +40,7 @@ bench: #go test -bench=. -run Trim go test -v ./pkg/inetdiager/ -bench=. + go test -v ./pkg/blockfilter/ -bench=. go test -v ./pkg/xtcpnl/ -bench=. go test -v ./pkg/disabler/ -bench=. go test -v ./pkg/xtcpstater/ -bench=. diff --git a/README.md b/README.md index 6c3915e..f27a397 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,8 @@ There are three (x3) main message sampling/throttling points within `xtcp`: - Intra PoP traffic - Origin servers + + The following diagram shows the sampling controls: xtcp_sampling diagram @@ -212,6 +214,44 @@ e.g. To select all sockets xtcp --frequency 10ms -inetdiagerReportModulus 1 -samplingModulus 1 ``` +## filterBlocks and filterJson + +To enable more controlled reporting, a filter can be specified. Along with a separately specificed +filter report modulus, this allows for traffic matching a particular filter to be reported at +a different rate, for example reporting fewer sockets for internal vs external communication. The +filter itself is specified providing a json file of IP ranges to be included in the filter of the +following format: + +: + "V4" : [ + { + "StartIp": , + "EndIP": + }, + ... + } + "V6" : [ + { + "StartIp": , + "EndIP": + }, + ... + } + + +Note that multiple blocks of each address family can be specified. When enabling the filter, the +group name is specified by CLI flags. The full set of filtering flags are the following: + +- eanbleFilter - enables the use of filtering. +- filterJson - Specifies the location of the filter json of the above format. +- filterGroup - Specifies the group name that will match the filter. +- inetdiagerFilterReportModulus - Specifies the filter modulus to be used for sockets which match the group. +- includeLoopback - Specifies whether of not loopback sockets should be included. + + + + + Outstanding security controls: - NOT chrooted @@ -247,4 +287,4 @@ Matthew Wodrich | For helping with early work on the "ss"" parsing, bef Reed Morrison | Protobuf help and general golang structure advice Corey Kasten | Protobuf help and general golang structure advice Michael Ballard | Golang and bash fun -Marcel Flores | Data insights, sampling improvements, applicability of xtcp data \ No newline at end of file +Marcel Flores | Data insights, sampling improvements, applicability of xtcp data diff --git a/bundle/scripts/xtcp_wrapper.bash b/bundle/scripts/xtcp_wrapper.bash index 1dea3bc..66a800c 100755 --- a/bundle/scripts/xtcp_wrapper.bash +++ b/bundle/scripts/xtcp_wrapper.bash @@ -4,8 +4,8 @@ #=============================================================================== # # This wrapper is providing the ability to: -# 1. Inspect xconfig xtcp_disable to NOT start xtcp, instead just exiting cleanly, so systemd doesn't consider xtcp failed -# 2. Starts xtcp with CLI arguments based on xconfig xtcp_command +# 1. Inspect env var xtcp_disable to NOT start xtcp, instead just exiting cleanly, so systemd doesn't consider xtcp failed +# 2. Starts xtcp with CLI arguments based on env var xtcp_command #------------------------------------------ # Function to send to standard output and syslog @@ -27,7 +27,7 @@ echo_and_syslog () { #------------------------------------------ # Quickly check if xtcp is running -XTCP_PID=$(/usr/bin/pgrep --full /home/vagrant/xtcp-opensource/bin/xtcp); +XTCP_PID="$(/usr/bin/pgrep --full /home/vagrant/xtcp-opensource/bin/xtcp)"; PGREP_STATUS=$?; echo_and_syslog "$0 line:$LINENO PGREP_STATUS: $PGREP_STATUS, XTCP_PID: $XTCP_PID"; @@ -54,8 +54,8 @@ echo_and_syslog "$0 line:$LINENO PGREP_STATUS: $PGREP_STATUS, XTCP_PID: $XTCP_PI # PGREP_STATUS== #------------------------------------------ -# Read xconfig to see if xtcp should be disabled -XTCP_DISABLED=$(/bin/echo $XTCP_DISABLED); +# Read env to see if xtcp should be disabled +XTCP_DISABLED="$(/bin/echo $XTCP_DISABLED)"; echo_and_syslog "$0 line:$LINENO xtcp disabled: $XTCP_DISABLED"; #------------------------------------------ @@ -88,7 +88,7 @@ else fi #----------------------------------------------------------------------------------------------------------------- -# Read in xconfigs with some sanity checking +# Read in env vars with some sanity checking #------------------------------------------ # Read and sanity check xtcp_frequency @@ -96,7 +96,7 @@ fi # Please note we could obviously do a lot better job of checking frequency # But this is just a few checks to make sure it's vaguely safe to use # -XTCP_FREQUENCY=$(/bin/echo $XTCP_FREQUENCY); +XTCP_FREQUENCY="$(/bin/echo $XTCP_FREQUENCY)"; echo_and_syslog "$0 line:$LINENO XTCP_FREQUENCY: $XTCP_FREQUENCY"; if [[ $XTCP_FREQUENCY == "default" ]]; then @@ -131,7 +131,7 @@ if [[ ! "$XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR" =~ ^[0-9]+$ ]]; then echo_and_syslog "$0 line:$LINENO XTCP_FREQUENCY must be numeric" "local0.error"; exit 1; fi -XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM=$((XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR + 0)) +XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM="$((XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR + 0))" #-------------- # Numbers must be < 86400 if [[ $XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM -gt 86400 ]]; then @@ -156,7 +156,7 @@ fi #------------------------------------------ # Read and sanity check xtcp_sampling_modulus # -XTCP_SAMPLING_MODULUS=$(/bin/echo $XTCP_SAMPLING_MODULUS); +XTCP_SAMPLING_MODULUS="$(/bin/echo $XTCP_SAMPLING_MODULUS)"; echo_and_syslog "$0 line:$LINENO XTCP_SAMPLING_MODULUS: $XTCP_SAMPLING_MODULUS"; if [[ $XTCP_SAMPLING_MODULUS == "default" ]]; then @@ -214,6 +214,44 @@ if [ $XTCP_REPORT_MODULUS -lt 1 ]; then exit 1; fi +#------------------------------------------ +# Read and sanity check xtcp_filter_report_modulus +# +XTCP_FILTER_REPORT_MODULUS="$(/bin/echo $XTCP_REPORT_MODULUS)"; +echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS: $XTCP_FILTER_REPORT_MODULUS"; + +if [[ $XTCP_FILTER_REPORT_MODULUS == "default" ]]; then + XTCP_FILTER_REPORT_MODULUS=2000; + echo_and_syslog "$0 line:$LINENO Using default XTCP_FILTER_REPORT_MODULUS: $XTCP_FILTER_REPORT_MODULUS"; +fi + +#-------------- +# Check modulus is only numeric +if [ $XTCP_FILTER_REPORT_MODULUS -ne $XTCP_FILTER_REPORT_MODULUS ]; then + echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS must be numeric:$XTCP_FILTER_REPORT_MODULUS " "local0.error"; + exit 1; +fi +#-------------- +# The filter modulus may be quite high, so we skip the max value check. +# Must be greater than zero >0 +if [ $XTCP_FILTER_REPORT_MODULUS -lt 1 ]; then + echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS must >= 1:$XTCP_FILTER_REPORT_MODULUS" "local0.error"; + exit 1; +fi + +#------------------------------------------ +# Load the list of pop-local IPs from pops.json +XTCP_FILTER_JSON="$(/bin/echo $XTCP_FILTER_JSON)"; + +#------------------------------------------ +# Read in the pop name +XTCP_FILTER_GROUP="$(/bin/echo $XTCP_FILTER_GROUP)"; + +#------------------------------------------ +# Read env var to see if fitlering is enabled +XTCP_ENABLE_FILTER="$(/bin/echo $XTCP_ENABLE_FILTER)"; +echo_and_syslog "$0 line:$LINENO xtcp filter enabled: $XTCP_ENABLE_FILTER"; + #NSQ XTCP_NSQ=$(/bin/echo $XTCP_NSQ); echo_and_syslog "$0 line:$LINENO xtcp nsq: $XTCP_NSQ"; @@ -236,9 +274,24 @@ EXEC_COMMAND_ARRAY[4]="$XTCP_SAMPLING_MODULUS"; EXEC_COMMAND_ARRAY[5]="-inetdiagerReportModulus"; EXEC_COMMAND_ARRAY[6]="$XTCP_REPORT_MODULUS"; +# Counter to keep track of optional indices +ind=7 + +if [[ $XTCP_ENABLE_FILTER != "" ]]; then + EXEC_COMMAND_ARRAY[7]="-enableFilter"; + EXEC_COMMAND_ARRAY[8]="-inetdiagerFilterReportModulus"; + EXEC_COMMAND_ARRAY[9]="$XTCP_FILTER_REPORT_MODULUS"; + EXEC_COMMAND_ARRAY[10]="-filterJson"; + EXEC_COMMAND_ARRAY[11]="$XTCP_FILTER_JSON"; + EXEC_COMMAND_ARRAY[12]="-filterGroup"; + EXEC_COMMAND_ARRAY[13]="$XTCP_FILTER_GROUP"; + ind=14 +fi + + if [[ $XTCP_NSQ != "" ]]; then - EXEC_COMMAND_ARRAY[7]="-nsq"; - EXEC_COMMAND_ARRAY[8]="$XTCP_NSQ"; + EXEC_COMMAND_ARRAY[$ind]="-nsq"; + EXEC_COMMAND_ARRAY[$((ind+1))]="$XTCP_NSQ"; fi # Print out what we have. diff --git a/cmd/xtcp.go b/cmd/xtcp.go index 6179274..313abde 100644 --- a/cmd/xtcp.go +++ b/cmd/xtcp.go @@ -74,6 +74,8 @@ func main() { samplingModulus := flag.Int("samplingModulus", 2, "samplingModulus. Netlinker will sample every Xth inetdiag messages to send to inetdiager. Default 2") //TODO make default 1 // CLI standard out reporting modulus. e.g. report every x inetd messages inetdiagerReportModulus := flag.Int("inetdiagerReportModulus", 2000, "inetdiagerReportModulus. Report every X inetd messages to Kafka. Default 2000") //TODO make default 1000 + inetdiagerFilterReportModulus := flag.Int("inetdiagerFilterReportModulus", 2000, "inetdiagerFilterReportModulus. Report every X inetd messages that matches the filter to Kafka. Default 2000") + inetdiagerStatsRatio := flag.Float64("inetdiagerStatsRatio", 0.9, "inetdiagerStatsRatio controls the how often the inetdiagers send summary stats, which is as a percentage of the pollingFrequencySeconds. Default = 0.9 (90% of pollingFrequencySeconds)") // UDP send destination @@ -129,6 +131,13 @@ func main() { xTCPStaterFrequency := flag.Duration("xTCPStaterFrequencySeconds", 60*time.Second, "XTCP stater reporting frequency. Default 60 seconds") xTCPStaterSystemctlPath := flag.String("xTCPStaterSystemctlPath", "/bin/systemctl", "Full system path to systemctl. Default \"/bin/systemctl\"") xTCPStaterPsPath := flag.String("xTCPStaterPsPath", "/bin/ps", "Full system path to ps. Default \"/bin/ps\"") + // Controls to include or disclude loopbacks socks + includeLoopback := flag.Bool("includeLoopback", false, "Include loopback in collection. Default: false") + + // Controls for the pop local block filters + enableFilter := flag.Bool("enableFilter", false, "Subsample sockets that match the filter blocks. Default: false") + filterJson := flag.String("filterJson", "", "Json definition of the filter groups.") + filterGroup := flag.String("filterGroup", "", "Name of filter group used in top level of filterJson.") version := flag.Bool("version", false, "show version") defaults := flag.Bool("defaults", false, "show default configuration") @@ -165,6 +174,7 @@ func main() { fmt.Println("*netlinkerChSize:", *netlinkerChSize) fmt.Println("*samplingModulus:", *samplingModulus) fmt.Println("*inetdiagerReportModulus:", *inetdiagerReportModulus) + fmt.Println("*inetdiagerFilterReportModulus:", *inetdiagerFilterReportModulus) fmt.Println("*inetdiagerStatsRatio:", *inetdiagerStatsRatio) fmt.Println("*udpSendDest:", *udpSendDest) fmt.Println("*promListen:", *promListen) @@ -184,6 +194,10 @@ func main() { fmt.Println("*xTCPStaterFrequency:", *xTCPStaterFrequency) fmt.Println("*xTCPStaterSystemctlPath:", *xTCPStaterSystemctlPath) fmt.Println("*xTCPStaterPsPath:", *xTCPStaterPsPath) + fmt.Println("*includeLoopback:", *includeLoopback) + fmt.Println("*enableFilter:", *enableFilter) + fmt.Println("*filterJson:", *filterJson) + fmt.Println("*filterGroup:", *filterGroup) fmt.Println("*nsq:", *nsq) } os.Exit(0) @@ -223,6 +237,7 @@ func main() { cliFlags.NetlinkerChSize = netlinkerChSize cliFlags.SamplingModulus = samplingModulus cliFlags.InetdiagerReportModulus = inetdiagerReportModulus + cliFlags.InetdiagerFilterReportModulus = inetdiagerFilterReportModulus cliFlags.InetdiagerStatsRatio = inetdiagerStatsRatio cliFlags.GoMaxProcs = goMaxProcs cliFlags.UDPSendDest = udpSendDest @@ -242,6 +257,10 @@ func main() { cliFlags.XTCPStaterFrequency = xTCPStaterFrequency cliFlags.XTCPStaterSystemctlPath = xTCPStaterSystemctlPath cliFlags.XTCPStaterPsPath = xTCPStaterPsPath + cliFlags.IncludeLoopback = includeLoopback + cliFlags.EnableFilter = enableFilter + cliFlags.FilterJson = filterJson + cliFlags.FilterGroup = filterGroup cliFlags.NSQ = nsq // Start background polling job to cleanly exit if the return code of executing 'disablerCommand' is "1" diff --git a/go.mod b/go.mod index 1a808a5..9bf2e25 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.16 replace github.com/Edgio/xtcp/pkg/misc => ./pkg/misc +replace github.com/Edgio/xtcp/pkg/blockfilter => ./pkg/blockfilter + replace github.com/Edgio/xtcp/pkg/cliflags => ./pkg/cliflags replace github.com/Edgio/xtcp/pkg/xtcppb => ./pkg/xtcppb diff --git a/go.mod.replace b/go.mod.replace index 1a4c262..2cf1a37 100644 --- a/go.mod.replace +++ b/go.mod.replace @@ -11,6 +11,8 @@ replace github.com/Edgio/xtcp/pkg/netlinker => ./pkg/netlinker replace github.com/Edgio/xtcp/pkg/inetdiager => ./pkg/inetdiager +replace github.com/Edgio/xtcp/pkg/blockfilter => ./pkg/blockfilter + replace github.com/Edgio/xtcp/pkg/pollerstater => ./pkg/pollerstater replace github.com/Edgio/xtcp/pkg/inetdiagerstater => ./pkg/inetdiagerstater diff --git a/pkg/blockfilter/blockfilter.go b/pkg/blockfilter/blockfilter.go new file mode 100644 index 0000000..0f52c4c --- /dev/null +++ b/pkg/blockfilter/blockfilter.go @@ -0,0 +1,163 @@ +// Package blockfilter implements a filter that subsamples matching traffic from the sampled data stream +// using data availible in a filter json + +package blockfilter + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "os" +) + +const ( + debugLevel int = 10 +) + +// A container struct so we can swap out the inside list if we need to +type NetBlocks struct { + BlockList []*net.IPNet +} + +// Given an IP check if its talking to a host in the filter +func IsFilter(ip net.IP, filterBlocks *NetBlocks) bool { + // If we didn't get set of blocks, just default out + if filterBlocks == nil { + return false + } + + // Check to see if its in our set of filter blocks + for _, currBlock := range filterBlocks.BlockList { + if currBlock.Contains(ip) { + return true + } + } + return false +} + +// Given a specific group, load the appropriate IP networks for the group from +// the filter json. +func LoadGroupNetworks(filterJson string, group string) *NetBlocks { + var ipNets []*net.IPNet + + // Open our jsonFile + jsonFile, err := os.Open(filterJson) + // if we os.Open returns an error then handle it + if err != nil { + fmt.Println(err) + } + // defer the closing of our jsonFile so that we can parse it later on + defer jsonFile.Close() + + // Read the json + byteValue, _ := ioutil.ReadAll(jsonFile) + var result map[string]interface{} + json.Unmarshal([]byte(byteValue), &result) + + // We need to assert the inside level to deal with the interface + groupEntry, ok := result[group].(map[string]interface{}) + if !ok { + if debugLevel > 100 { + fmt.Println("Failed to assert json format at the group level!") + } + return nil + } + + // Let's get the v4 networks from our current group + v4Ranges := extractRanges(groupEntry["V4"]) + ipNets = append(ipNets, v4Ranges...) + // Now let's get the v6 + v6Ranges := extractRanges(groupEntry["V6"]) + ipNets = append(ipNets, v6Ranges...) + + blocks := NetBlocks{ipNets} + + //Dump them out so we can see what it read + if debugLevel > 100 { + for _, network := range blocks.BlockList { + fmt.Println(network) + } + } + + return &blocks +} + +// Given a list of ranges from the Json, extract and convert them +func extractRanges(rawRange interface{}) []*net.IPNet { + var ipNets []*net.IPNet + + // First, let's do a type assertion + rangeList, ok := rawRange.([]interface{}) + if !ok { + return nil + } + + // Now let's just spin through all the types + for _, rangeMap := range rangeList { + // Type asert the range set + netMap, ok := rangeMap.(map[string]interface{}) + if !ok { + return nil + } + // Type assert each of the strings + start, ok := netMap["StartIp"].(string) + if !ok { + return nil + } + end, ok := netMap["EndIp"].(string) + if !ok { + return nil + } + // Now we have the end points, go ahead and pass that to the build net + // which will compute the appropriuate cidr + newNet := buildIPNet(start, end) + if newNet != nil { + ipNets = append(ipNets, newNet) + } + } + return ipNets +} + +// Builld a network with the start and stop objects, filtering any junk +func buildIPNet(start string, end string) *net.IPNet { + // If its RFC 1918 just blank + ipStart := net.ParseIP(start) + ipEnd := net.ParseIP(end) + + // The filter will ignore private blocks, to avoid wasting time checking on a case that + // In the Prod case, these shouldn't happen in regular communications anyway. + if ipStart.IsPrivate() { + return nil + } + + return RangeToCIDR(ipStart, ipEnd) +} + +// Given two IP addresses, especially the start and end points of a range, find +// the smallest CIDR that contains both of them. +// This is adapted from this discussion: https://groups.google.com/g/golang-nuts/c/rJvVwk4jwjQ?pli=1 +func RangeToCIDR(ip1 net.IP, ip2 net.IP) *net.IPNet { + // Set the max length according to the IP version. + // THis trick uses the fact that to4 only works for v4 + var maxLen int + if ip1.To4() != nil { + maxLen = 32 + } else { + maxLen = 128 + } + + // Just iterate with larger and larger subnets until we get one that includes both + for l := maxLen; l >= 0; l-- { + // Build a net object so we can just use contains + mask := net.CIDRMask(l, maxLen) + na := ip1.Mask(mask) + n := net.IPNet{IP: na, Mask: mask} + // If we hit a size that has ip2, we are done return it and leave + if n.Contains(ip2) { + return &n + } + } + // I dont think this should exactly be possible, but you never know! + return nil +} diff --git a/pkg/blockfilter/blockfilter_test.go b/pkg/blockfilter/blockfilter_test.go new file mode 100644 index 0000000..9f2c56a --- /dev/null +++ b/pkg/blockfilter/blockfilter_test.go @@ -0,0 +1,96 @@ +// Tests for blockfilter +package blockfilter_test + +import ( + "net" + "testing" + + "github.com/Edgio/xtcp/pkg/blockfilter" +) + +func TestLoadGroupNetworks(t *testing.T) { + var tests = []struct { + file string + group string + expected *blockfilter.NetBlocks + }{ + {"badjson", "aaa", nil}, + {"testdata/test.json", "aab", nil}, + } + + for _, test := range tests { + output := blockfilter.LoadGroupNetworks(test.file, test.group) + if output != test.expected { + t.Error("Test Failed: input {} {}, expected {}, recieved {}", test.file, test.group, test.expected, output) + } + } +} + +func TestIsFilterEmpty(t *testing.T) { + // Test to see how it behaves if we give it a nil lookup blockl + var tests = []struct { + ip net.IP + expected bool + }{ + {net.IP{127, 0, 0, 1}, false}, // localhost + {net.IP{128, 0, 0, 1}, false}, // Random block + } + for _, test := range tests { + output := blockfilter.IsFilter(test.ip, nil) + if output != test.expected { + t.Error("Test Failed: input {}, expected {}, recieved {}", test.ip, test.expected, output) + } + } +} + +func TestIsFilter(t *testing.T) { + // Load up the sample json for comoparison + test_blocks := blockfilter.LoadGroupNetworks("testdata/test.json", "aaa") + + var tests = []struct { + ip net.IP + expected bool + }{ + {net.IP{127, 0, 0, 1}, false}, // localhost + {net.IP{128, 0, 0, 1}, false}, // Random block + {net.IP{192, 229, 217, 10}, true}, // Included v4 + {net.IP{192, 229, 218, 10}, false}, // off-by-1 v4 + {net.IP{10, 30, 31, 11}, false}, // Another v4, this will be excluded because 1918 + {net.IP{0x26, 0x06, 0x28, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, true}, //Included v6 + {net.IP{0x26, 0x06, 0x29, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, false}, // Off by 1 v6 + } + + for _, test := range tests { + output := blockfilter.IsFilter(test.ip, test_blocks) + if output != test.expected { + t.Error("Test Failed: input {}, expected {}, recieved {}", test.ip, test.expected, output) + } + } + +} + +func TestRangeToCIDR(t *testing.T) { + var tests = []struct { + start net.IP + end net.IP + expected string + }{ + {net.IP{192, 0, 0, 1}, net.IP{192, 0, 0, 1}, "192.0.0.1/32"}, + {net.IP{192, 0, 0, 1}, net.IP{192, 0, 0, 2}, "192.0.0.0/30"}, + {net.IP{192, 0, 0, 1}, net.IP{192, 0, 1, 0}, "192.0.0.0/23"}, + {net.IP{0x26, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + net.IP{0x26, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + "2606::1/128"}, + {net.IP{0x26, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + net.IP{0x26, 0x06, 0x00, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + "2606::/48"}, + } + + for _, test := range tests { + output := blockfilter.RangeToCIDR(test.start, test.end) + if output.String() != test.expected { + t.Error("Test Failed: input {} {}, expected {}, recieved {}", test.start, test.end, test.expected, output) + } + } + +} diff --git a/pkg/blockfilter/testdata/test.json b/pkg/blockfilter/testdata/test.json new file mode 100644 index 0000000..752b487 --- /dev/null +++ b/pkg/blockfilter/testdata/test.json @@ -0,0 +1,24 @@ +{ + "aaa" : { + "V6" : [ + { + "EndIp" : "2606:2800:0000:ffff:ffff:ffff:ffff:ffff", + "StartIp" : "2606:2800:0000:0000:0000:0000:0000:0000" + } + ], + "V4" : [ + { + "EndIp" : "10.30.31.255", + "StartIp" : "10.30.31.0" + }, + { + "EndIp" : "192.229.217.255", + "StartIp" : "192.229.217.0" + }, + { + "EndIp" : "10.0.2.255", + "StartIp" : "10.0.2.0" + } + ] + } +} diff --git a/pkg/cliflags/cliflags.go b/pkg/cliflags/cliflags.go index 868163b..7110203 100644 --- a/pkg/cliflags/cliflags.go +++ b/pkg/cliflags/cliflags.go @@ -9,50 +9,54 @@ import "time" // CliFlags struct to make it easier to pass all the cli flags type CliFlags struct { - No4 *bool - No6 *bool - Timeout *int64 - PollingFrequency *time.Duration - PollingSafetyBuffer *float64 - MaxLoops *int - ShutdownWorkers *bool - Netlinkers4 *int - Netlinkers6 *int - Inetdiagers4 *int - Inetdiagers6 *int - Single *bool - NlmsgSeq *int - PacketSize *int - PacketSizeMply *int - NetlinkerChSize *int - SamplingModulus *int - InetdiagerReportModulus *int - InetdiagerStatsRatio *float64 - GoMaxProcs *int - UDPSendDest *string - PromListen *string - PromPath *string - PromPollerChSize *int - PromNetlinkerChSize *int - PromInetdiagerChSize *int - StatsdDst *string - NoStatsd *bool - HappyPollerReportModulus *int - HappyIstaterReportModulus *int - NoDisabler *bool - DisablerFrequency *time.Duration - DisablerCommand *string - DisablerArgument1 *string - DisablerArgument2 *string - XTCPStaterFrequency *time.Duration - XTCPStaterSystemctlPath *string - XTCPStaterPsPath *string - NoLLDPer *bool - LLDPOutputhPath *string - NoTrie *bool - TrieCSV4 *string - TrieCSV6 *string - NoLoopback *bool - IPPath *string - NSQ *string + No4 *bool + No6 *bool + Timeout *int64 + PollingFrequency *time.Duration + PollingSafetyBuffer *float64 + MaxLoops *int + ShutdownWorkers *bool + Netlinkers4 *int + Netlinkers6 *int + Inetdiagers4 *int + Inetdiagers6 *int + Single *bool + NlmsgSeq *int + PacketSize *int + PacketSizeMply *int + NetlinkerChSize *int + SamplingModulus *int + InetdiagerReportModulus *int + InetdiagerFilterReportModulus *int + InetdiagerStatsRatio *float64 + GoMaxProcs *int + UDPSendDest *string + PromListen *string + PromPath *string + PromPollerChSize *int + PromNetlinkerChSize *int + PromInetdiagerChSize *int + StatsdDst *string + NoStatsd *bool + HappyPollerReportModulus *int + HappyIstaterReportModulus *int + NoDisabler *bool + DisablerFrequency *time.Duration + DisablerCommand *string + DisablerArgument1 *string + DisablerArgument2 *string + XTCPStaterFrequency *time.Duration + XTCPStaterSystemctlPath *string + XTCPStaterPsPath *string + NoLLDPer *bool + LLDPOutputhPath *string + NoTrie *bool + TrieCSV4 *string + TrieCSV6 *string + IncludeLoopback *bool + IPPath *string + NSQ *string + FilterJson *string + FilterGroup *string + EnableFilter *bool } diff --git a/pkg/inetdiager/inetdiager.go b/pkg/inetdiager/inetdiager.go index 45ca1b5..4b64ee1 100644 --- a/pkg/inetdiager/inetdiager.go +++ b/pkg/inetdiager/inetdiager.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "github.com/Edgio/xtcp/pkg/blockfilter" "github.com/Edgio/xtcp/pkg/cliflags" "github.com/Edgio/xtcp/pkg/inetdiag" "github.com/Edgio/xtcp/pkg/inetdiagerstater" @@ -465,49 +466,8 @@ func processNetlinkAttributes(id int, af *uint8, inetdiagMsgReader *bytes.Reader fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_SHUTDOWN\t*shutdownState:", *shutdownState) } break - //--- NOT INET_DIAG_DCINFO - no body uses this UDP protocol - case 9: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_DCINFO", "\tERROR!! TODO Fix me") - } - break - //INET_DIAG_PROTOCOL - case 10: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_PROTOCOL", "\tERROR!! TODO Fix me") - } - break - //INET_DIAG_SKV6ONLY + //INET_DIAG_SKV6ONLY, case 11 // TODO per the comment in INET_DIAG_TCLASS above, need to handle this case for IPv6 LISTEN and CLOSE sockets - case 11: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_SKV6ONLY", "\tERROR!! TODO Fix me") - } - break - //INET_DIAG_LOCALS - case 12: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_LOCALS", "\tERROR!! TODO Fix me") - } - break - //INET_DIAG_PEERS - case 13: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_PEERS", "\tERROR!! TODO Fix me") - } - break - //INET_DIAG_PAD - case 14: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_PAD", "\tERROR!! TODO Fix me") - } - break //INET_DIAG_MARK case 15: inetdiagMsgComplete, attributesBytesRead = binaryReadWithErrorHandling(id, "INET_DIAG_MARK", inetdiagMsgReader, mark, netlinkAttributeDataLength, af) @@ -527,17 +487,11 @@ func processNetlinkAttributes(id int, af *uint8, inetdiagMsgReader *bytes.Reader if debugLevel > 100 { fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_TOS\t*typeOfService:", *typeOfService) } - break - case 18: - inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tINET_DIAG_LOCALS", "\tERROR!! TODO Fix me") - } - break + break default: inetdiagMsgComplete, attributesBytesRead = notDecodingThisAttributeTypeYet() - if debugLevel > 10 { - fmt.Println("inetdiager:", id, "\taf:", *af, "\tnlattr.NlaType default??", nlattr.NlaType, "\tERROR!! TODO Fix me") + if debugLevel > 100 { + fmt.Println("inetdiager:", id, "\taf:", *af, "\tUnparsed nlattr.NlaType default??", nlattr.NlaType) } break } @@ -619,7 +573,7 @@ func sendToNSQ(topic string, message []byte, nsqServer string) error { // Inetdiager is the worker which recieves the Inetdiag messages from the netlinker // This functino does the heavy lifting in terms of parsing the inetdiag messages // currently we don't need the netlinkerDone channel, but we will once this function passes downstream -func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessage, wg *sync.WaitGroup, hostname string, cliFlags cliflags.CliFlags, inetdiagerStaterCh chan<- inetdiagerstater.InetdiagerStatsWrapper) { +func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessage, wg *sync.WaitGroup, hostname string, cliFlags cliflags.CliFlags, inetdiagerStaterCh chan<- inetdiagerstater.InetdiagerStatsWrapper, filterBlocks *blockfilter.NetBlocks) { //defer close(out) defer wg.Done() @@ -628,6 +582,7 @@ func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessag //var nlattr inetdiag.Nlattr var inetdiagMsgCount int + var inetdiagMsgFilterCount int var inetdiagMsgInSize int var inetdiagMsgBytesRemaining int var inetdiagMsgBytesRead int @@ -703,6 +658,7 @@ func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessag Stats: inetdiagerstater.InetdiagerStats{ InetdiagMsgInSizeTotal: inetdiagMsgInSizeTotal, InetdiagMsgCount: inetdiagMsgCount, + InetdiagMsgFilterCount: inetdiagMsgFilterCount, InetdiagMsgBytesReadTotal: inetdiagMsgBytesReadTotal, PadBufferTotal: padBufferTotal, UDPWritesTotal: udpWritesTotal, @@ -790,12 +746,21 @@ func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessag inetdiagMsgBytesReadTotal += bytesRead padBufferTotal += padBufferSize + // Check to see if it was coming from the loopback, if so, just skip + if !(*cliFlags.IncludeLoopback) && sourceIP.IsLoopback() { + continue + } + + // Check it against the filter, to see if we should sample it at the regular modulus or the filter modulus + filterMatch := *cliFlags.EnableFilter && blockfilter.IsFilter(destinationIP, filterBlocks) + // cli reporting frequency based on constant, as a variable to be able to pass to buildProto if debugLevel > 100 { fmt.Println("inetdiager:", id, "\taf:", *af, "\tinetdiagMsgCount:", inetdiagMsgCount, "\t*cliFlags.inetdiagerReportModulus:", *cliFlags.InetdiagerReportModulus, "\tmodulus:", inetdiagMsgCount%(*cliFlags.InetdiagerReportModulus)) } - if *cliFlags.InetdiagerReportModulus == 1 || inetdiagMsgCount%*cliFlags.InetdiagerReportModulus == 1 { + // Depending on the filter state, check against the appropriate report modulus + if (filterMatch && inetdiagMsgFilterCount%*cliFlags.InetdiagerFilterReportModulus == 0) || (!filterMatch && inetdiagMsgCount%*cliFlags.InetdiagerReportModulus == 0) { if debugLevel > 100 { fmt.Println("inetdiager:", id, "\taf:", *af, "\tinetdiagMsgCount:", inetdiagMsgCount, "\tinetdiagMsgBytesReadTotal(M):", inetdiagMsgBytesReadTotal/10^6) @@ -845,7 +810,13 @@ func Inetdiager(id int, af *uint8, in <-chan netlinker.TimeSpecandInetDiagMessag fmt.Println(XtcpRecordJSON) } } - inetdiagMsgCount++ + // Make sure we advance the right counter + if filterMatch { + inetdiagMsgFilterCount++ + } else { + inetdiagMsgCount++ + } + } //for inetdiagMsgComplete := false; !inetdiagMsgComplete && inetdiagMsgBytesRemaining > 0; { } diff --git a/pkg/inetdiagerstater/inetdiagerstater.go b/pkg/inetdiagerstater/inetdiagerstater.go index 16c8504..b3bc041 100644 --- a/pkg/inetdiagerstater/inetdiagerstater.go +++ b/pkg/inetdiagerstater/inetdiagerstater.go @@ -37,6 +37,7 @@ type InetdiagerStatsWrapper struct { type InetdiagerStats struct { InetdiagMsgInSizeTotal int InetdiagMsgCount int + InetdiagMsgFilterCount int InetdiagMsgBytesReadTotal int PadBufferTotal int UDPWritesTotal int @@ -68,6 +69,16 @@ func InetdiagerStater(in <-chan InetdiagerStatsWrapper, cliFlags cliflags.CliFla ) inetdiagerReportModulus.Set(float64(*cliFlags.InetdiagerReportModulus)) + inetdiagerFilterReportModulus := promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: "xtcp", + Subsystem: "inetdiager", + Name: "filter_report_modulus", + Help: "inetdiager FilterReportModulus will sample every X inetdiag messages which match the block filters to send to Kafka.", + }, + ) + inetdiagerFilterReportModulus.Set(float64(*cliFlags.InetdiagerFilterReportModulus)) + inetdiagerStatsRatio := promauto.NewGauge( prometheus.GaugeOpts{ Namespace: "xtcp", @@ -98,6 +109,15 @@ func InetdiagerStater(in <-chan InetdiagerStatsWrapper, cliFlags cliflags.CliFla }, []string{"af", "id"}, ) + inetdiagerMsgFilters := promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "xtcp", + Subsystem: "inetdiager", + Name: "msgsFilter", + Help: "inetdiager messages read from the netlinker channel, by address family, by worker id that matched the filter", + }, + []string{"af", "id"}, + ) inetdiagerRead := promauto.NewCounterVec( prometheus.CounterOpts{ Namespace: "xtcp", @@ -275,6 +295,7 @@ func InetdiagerStater(in <-chan InetdiagerStatsWrapper, cliFlags cliflags.CliFla // Calculate differences diffStats.InetdiagMsgInSizeTotal = inetdiagerStatsWrapper.Stats.InetdiagMsgInSizeTotal - oldStats.InetdiagMsgInSizeTotal diffStats.InetdiagMsgCount = inetdiagerStatsWrapper.Stats.InetdiagMsgCount - oldStats.InetdiagMsgCount + diffStats.InetdiagMsgFilterCount = inetdiagerStatsWrapper.Stats.InetdiagMsgFilterCount - oldStats.InetdiagMsgFilterCount diffStats.InetdiagMsgBytesReadTotal = inetdiagerStatsWrapper.Stats.InetdiagMsgBytesReadTotal - oldStats.InetdiagMsgBytesReadTotal diffStats.PadBufferTotal = inetdiagerStatsWrapper.Stats.PadBufferTotal - oldStats.PadBufferTotal diffStats.UDPWritesTotal = inetdiagerStatsWrapper.Stats.UDPWritesTotal - oldStats.UDPWritesTotal @@ -290,6 +311,7 @@ func InetdiagerStater(in <-chan InetdiagerStatsWrapper, cliFlags cliflags.CliFla inetdiagerIn.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.InetdiagMsgInSizeTotal)) inetdiagerMsgs.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.InetdiagMsgCount)) + inetdiagerMsgFilters.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.InetdiagMsgFilterCount)) inetdiagerRead.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.InetdiagMsgBytesReadTotal)) inetdiagerPad.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.PadBufferTotal)) inetdiagerUDPs.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.UDPWritesTotal)) @@ -297,9 +319,11 @@ func InetdiagerStater(in <-chan InetdiagerStatsWrapper, cliFlags cliflags.CliFla inetdiagerUDPErrors.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.UDPErrorsTotal)) inetdiagerStatsBlocked.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af], strconv.FormatInt(int64(inetdiagerStatsWrapper.ID), 10)).Add(float64(diffStats.StatsBlocked)) + // Sum all tyeps of messages for the total inetdiagerMsgsTotal.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af]).Add(float64(diffStats.InetdiagMsgCount)) + inetdiagerMsgsTotal.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af]).Add(float64(diffStats.InetdiagMsgFilterCount)) inetdiagerUDPsTotal.WithLabelValues(kernelEnumToString[inetdiagerStatsWrapper.Af]).Add(float64(diffStats.UDPWritesTotal)) - totalInetdiagerMsgs += diffStats.InetdiagMsgCount + totalInetdiagerMsgs += (diffStats.InetdiagMsgCount + diffStats.InetdiagMsgFilterCount) totalInetdiagerUDPs += diffStats.UDPWritesTotal // This modulus is a little tricky, cos it's looking up the number of inetdiagers by address family diff --git a/pkg/poller/poller.go b/pkg/poller/poller.go index 0211626..344980f 100644 --- a/pkg/poller/poller.go +++ b/pkg/poller/poller.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "github.com/Edgio/xtcp/pkg/blockfilter" "github.com/Edgio/xtcp/pkg/cliflags" "github.com/Edgio/xtcp/pkg/inetdiager" "github.com/Edgio/xtcp/pkg/inetdiagerstater" @@ -110,6 +111,14 @@ func Poller(af uint8, hostname *string, cliFlags cliflags.CliFlags, wg *sync.Wai // Prometheus variables var currentPollerStats pollerstater.PollerStats + // Let's get our filter blocks ready, so we can have the inetdiager check them in the loop + var localFilter *blockfilter.NetBlocks + if *cliFlags.EnableFilter { + localFilter = blockfilter.LoadGroupNetworks(*cliFlags.FilterJson, *cliFlags.FilterGroup) + } else { + localFilter = nil + } + // Initialize sockets and netlink request binary blobs // Build the binary blobs of the netlink inet diag dump requests, one for each address family @@ -150,7 +159,7 @@ func Poller(af uint8, hostname *string, cliFlags cliflags.CliFlags, wg *sync.Wai // startup the workers in reverse pipeline order for inetdiagerID := 0; inetdiagerID < *afToInetdiagers[af]; inetdiagerID++ { inetdiagerWG.Add(1) - go inetdiager.Inetdiager(inetdiagerID, &af, netlinkerCh, &inetdiagerWG, *hostname, cliFlags, inetdiagerStaterCh) + go inetdiager.Inetdiager(inetdiagerID, &af, netlinkerCh, &inetdiagerWG, *hostname, cliFlags, inetdiagerStaterCh, localFilter) if debugLevel > 100 { fmt.Println("poller af:", misc.KernelEnumToString[af], "\tinetdiagerID started:", inetdiagerID) }