Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blockfilter that allows sampling at a different rate #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ test:
chmod 755 ./pkg/disabler/testdata/return_one_after_X_runs.bash

go test -v ./pkg/inetdiager/
go test -v ./pkg/blockfilter/
go test -v ./pkg/xtcpnl/
go test -v ./pkg/disabler/
go test -v ./pkg/xtcpstater/
Expand All @@ -39,6 +40,7 @@ bench:
#go test -bench=. -run Trim

go test -v ./pkg/inetdiager/ -bench=.
go test -v ./pkg/blockfilter/ -bench=.
go test -v ./pkg/xtcpnl/ -bench=.
go test -v ./pkg/disabler/ -bench=.
go test -v ./pkg/xtcpstater/ -bench=.
Expand Down
42 changes: 41 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ There are three (x3) main message sampling/throttling points within `xtcp`:
- Intra PoP traffic
- Origin servers



The following diagram shows the sampling controls:

<img src="./docs/diagrams/xtcp_sampling.png" alt="xtcp_sampling diagram" width="75%" height="75%"/>
Expand All @@ -212,6 +214,44 @@ e.g. To select all sockets
xtcp --frequency 10ms -inetdiagerReportModulus 1 -samplingModulus 1
```

## filterBlocks and filterJson

To enable more controlled reporting, a filter can be specified. Along with a separately specificed
filter report modulus, this allows for traffic matching a particular filter to be reported at
a different rate, for example reporting fewer sockets for internal vs external communication. The
filter itself is specified providing a json file of IP ranges to be included in the filter of the
following format:

<group name>:
"V4" : [
{
"StartIp": <block starting address>,
"EndIP": <block ending address>
},
...
}
"V6" : [
{
"StartIp": <block starting address>,
"EndIP": <block ending address>
},
...
}


Note that multiple blocks of each address family can be specified. When enabling the filter, the
group name is specified by CLI flags. The full set of filtering flags are the following:

- eanbleFilter - enables the use of filtering.
- filterJson - Specifies the location of the filter json of the above format.
- filterGroup - Specifies the group name that will match the filter.
- inetdiagerFilterReportModulus - Specifies the filter modulus to be used for sockets which match the group.
- includeLoopback - Specifies whether of not loopback sockets should be included.





Outstanding security controls:
- NOT chrooted

Expand Down Expand Up @@ -247,4 +287,4 @@ Matthew Wodrich | For helping with early work on the "ss"" parsing, bef
Reed Morrison | Protobuf help and general golang structure advice
Corey Kasten | Protobuf help and general golang structure advice
Michael Ballard | Golang and bash fun
Marcel Flores | Data insights, sampling improvements, applicability of xtcp data
Marcel Flores | Data insights, sampling improvements, applicability of xtcp data
75 changes: 64 additions & 11 deletions bundle/scripts/xtcp_wrapper.bash
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#===============================================================================
#
# This wrapper is providing the ability to:
# 1. Inspect xconfig xtcp_disable to NOT start xtcp, instead just exiting cleanly, so systemd doesn't consider xtcp failed
# 2. Starts xtcp with CLI arguments based on xconfig xtcp_command
# 1. Inspect env var xtcp_disable to NOT start xtcp, instead just exiting cleanly, so systemd doesn't consider xtcp failed
# 2. Starts xtcp with CLI arguments based on env var xtcp_command

#------------------------------------------
# Function to send to standard output and syslog
Expand All @@ -27,7 +27,7 @@ echo_and_syslog () {

#------------------------------------------
# Quickly check if xtcp is running
XTCP_PID=$(/usr/bin/pgrep --full /home/vagrant/xtcp-opensource/bin/xtcp);
XTCP_PID="$(/usr/bin/pgrep --full /home/vagrant/xtcp-opensource/bin/xtcp)";
PGREP_STATUS=$?;
echo_and_syslog "$0 line:$LINENO PGREP_STATUS: $PGREP_STATUS, XTCP_PID: $XTCP_PID";

Expand All @@ -54,8 +54,8 @@ echo_and_syslog "$0 line:$LINENO PGREP_STATUS: $PGREP_STATUS, XTCP_PID: $XTCP_PI
# PGREP_STATUS==

#------------------------------------------
# Read xconfig to see if xtcp should be disabled
XTCP_DISABLED=$(/bin/echo $XTCP_DISABLED);
# Read env to see if xtcp should be disabled
XTCP_DISABLED="$(/bin/echo $XTCP_DISABLED)";
echo_and_syslog "$0 line:$LINENO xtcp disabled: $XTCP_DISABLED";

#------------------------------------------
Expand Down Expand Up @@ -88,15 +88,15 @@ else
fi

#-----------------------------------------------------------------------------------------------------------------
# Read in xconfigs with some sanity checking
# Read in env vars with some sanity checking

#------------------------------------------
# Read and sanity check xtcp_frequency
#
# Please note we could obviously do a lot better job of checking frequency
# But this is just a few checks to make sure it's vaguely safe to use
#
XTCP_FREQUENCY=$(/bin/echo $XTCP_FREQUENCY);
XTCP_FREQUENCY="$(/bin/echo $XTCP_FREQUENCY)";
echo_and_syslog "$0 line:$LINENO XTCP_FREQUENCY: $XTCP_FREQUENCY";

if [[ $XTCP_FREQUENCY == "default" ]]; then
Expand Down Expand Up @@ -131,7 +131,7 @@ if [[ ! "$XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR" =~ ^[0-9]+$ ]]; then
echo_and_syslog "$0 line:$LINENO XTCP_FREQUENCY must be numeric" "local0.error";
exit 1;
fi
XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM=$((XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR + 0))
XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM="$((XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR + 0))"
#--------------
# Numbers must be < 86400
if [[ $XTCP_FREQUENCY_EVERYTHING_BUT_LAST_CHAR_NUM -gt 86400 ]]; then
Expand All @@ -156,7 +156,7 @@ fi
#------------------------------------------
# Read and sanity check xtcp_sampling_modulus
#
XTCP_SAMPLING_MODULUS=$(/bin/echo $XTCP_SAMPLING_MODULUS);
XTCP_SAMPLING_MODULUS="$(/bin/echo $XTCP_SAMPLING_MODULUS)";
echo_and_syslog "$0 line:$LINENO XTCP_SAMPLING_MODULUS: $XTCP_SAMPLING_MODULUS";

if [[ $XTCP_SAMPLING_MODULUS == "default" ]]; then
Expand Down Expand Up @@ -214,6 +214,44 @@ if [ $XTCP_REPORT_MODULUS -lt 1 ]; then
exit 1;
fi

#------------------------------------------
# Read and sanity check xtcp_filter_report_modulus
#
XTCP_FILTER_REPORT_MODULUS="$(/bin/echo $XTCP_REPORT_MODULUS)";
echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS: $XTCP_FILTER_REPORT_MODULUS";

if [[ $XTCP_FILTER_REPORT_MODULUS == "default" ]]; then
XTCP_FILTER_REPORT_MODULUS=2000;
echo_and_syslog "$0 line:$LINENO Using default XTCP_FILTER_REPORT_MODULUS: $XTCP_FILTER_REPORT_MODULUS";
fi

#--------------
# Check modulus is only numeric
if [ $XTCP_FILTER_REPORT_MODULUS -ne $XTCP_FILTER_REPORT_MODULUS ]; then
echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS must be numeric:$XTCP_FILTER_REPORT_MODULUS " "local0.error";
exit 1;
fi
#--------------
# The filter modulus may be quite high, so we skip the max value check.
# Must be greater than zero >0
if [ $XTCP_FILTER_REPORT_MODULUS -lt 1 ]; then
echo_and_syslog "$0 line:$LINENO XTCP_FILTER_REPORT_MODULUS must >= 1:$XTCP_FILTER_REPORT_MODULUS" "local0.error";
exit 1;
fi

#------------------------------------------
# Load the list of pop-local IPs from pops.json
XTCP_FILTER_JSON="$(/bin/echo $XTCP_FILTER_JSON)";

#------------------------------------------
# Read in the pop name
XTCP_FILTER_GROUP="$(/bin/echo $XTCP_FILTER_GROUP)";

#------------------------------------------
# Read env var to see if fitlering is enabled
XTCP_ENABLE_FILTER="$(/bin/echo $XTCP_ENABLE_FILTER)";
echo_and_syslog "$0 line:$LINENO xtcp filter enabled: $XTCP_ENABLE_FILTER";

#NSQ
XTCP_NSQ=$(/bin/echo $XTCP_NSQ);
echo_and_syslog "$0 line:$LINENO xtcp nsq: $XTCP_NSQ";
Expand All @@ -236,9 +274,24 @@ EXEC_COMMAND_ARRAY[4]="$XTCP_SAMPLING_MODULUS";
EXEC_COMMAND_ARRAY[5]="-inetdiagerReportModulus";
EXEC_COMMAND_ARRAY[6]="$XTCP_REPORT_MODULUS";

# Counter to keep track of optional indices
ind=7

if [[ $XTCP_ENABLE_FILTER != "" ]]; then
EXEC_COMMAND_ARRAY[7]="-enableFilter";
EXEC_COMMAND_ARRAY[8]="-inetdiagerFilterReportModulus";
EXEC_COMMAND_ARRAY[9]="$XTCP_FILTER_REPORT_MODULUS";
EXEC_COMMAND_ARRAY[10]="-filterJson";
EXEC_COMMAND_ARRAY[11]="$XTCP_FILTER_JSON";
EXEC_COMMAND_ARRAY[12]="-filterGroup";
EXEC_COMMAND_ARRAY[13]="$XTCP_FILTER_GROUP";
ind=14
fi


if [[ $XTCP_NSQ != "" ]]; then
EXEC_COMMAND_ARRAY[7]="-nsq";
EXEC_COMMAND_ARRAY[8]="$XTCP_NSQ";
EXEC_COMMAND_ARRAY[$ind]="-nsq";
EXEC_COMMAND_ARRAY[$((ind+1))]="$XTCP_NSQ";
fi

# Print out what we have.
Expand Down
19 changes: 19 additions & 0 deletions cmd/xtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func main() {
samplingModulus := flag.Int("samplingModulus", 2, "samplingModulus. Netlinker will sample every Xth inetdiag messages to send to inetdiager. Default 2") //TODO make default 1
// CLI standard out reporting modulus. e.g. report every x inetd messages
inetdiagerReportModulus := flag.Int("inetdiagerReportModulus", 2000, "inetdiagerReportModulus. Report every X inetd messages to Kafka. Default 2000") //TODO make default 1000
inetdiagerFilterReportModulus := flag.Int("inetdiagerFilterReportModulus", 2000, "inetdiagerFilterReportModulus. Report every X inetd messages that matches the filter to Kafka. Default 2000")

inetdiagerStatsRatio := flag.Float64("inetdiagerStatsRatio", 0.9, "inetdiagerStatsRatio controls the how often the inetdiagers send summary stats, which is as a percentage of the pollingFrequencySeconds. Default = 0.9 (90% of pollingFrequencySeconds)")

// UDP send destination
Expand Down Expand Up @@ -129,6 +131,13 @@ func main() {
xTCPStaterFrequency := flag.Duration("xTCPStaterFrequencySeconds", 60*time.Second, "XTCP stater reporting frequency. Default 60 seconds")
xTCPStaterSystemctlPath := flag.String("xTCPStaterSystemctlPath", "/bin/systemctl", "Full system path to systemctl. Default \"/bin/systemctl\"")
xTCPStaterPsPath := flag.String("xTCPStaterPsPath", "/bin/ps", "Full system path to ps. Default \"/bin/ps\"")
// Controls to include or disclude loopbacks socks
includeLoopback := flag.Bool("includeLoopback", false, "Include loopback in collection. Default: false")

// Controls for the pop local block filters
enableFilter := flag.Bool("enableFilter", false, "Subsample sockets that match the filter blocks. Default: false")
filterJson := flag.String("filterJson", "", "Json definition of the filter groups.")
filterGroup := flag.String("filterGroup", "", "Name of filter group used in top level of filterJson.")

version := flag.Bool("version", false, "show version")
defaults := flag.Bool("defaults", false, "show default configuration")
Expand Down Expand Up @@ -165,6 +174,7 @@ func main() {
fmt.Println("*netlinkerChSize:", *netlinkerChSize)
fmt.Println("*samplingModulus:", *samplingModulus)
fmt.Println("*inetdiagerReportModulus:", *inetdiagerReportModulus)
fmt.Println("*inetdiagerFilterReportModulus:", *inetdiagerFilterReportModulus)
fmt.Println("*inetdiagerStatsRatio:", *inetdiagerStatsRatio)
fmt.Println("*udpSendDest:", *udpSendDest)
fmt.Println("*promListen:", *promListen)
Expand All @@ -184,6 +194,10 @@ func main() {
fmt.Println("*xTCPStaterFrequency:", *xTCPStaterFrequency)
fmt.Println("*xTCPStaterSystemctlPath:", *xTCPStaterSystemctlPath)
fmt.Println("*xTCPStaterPsPath:", *xTCPStaterPsPath)
fmt.Println("*includeLoopback:", *includeLoopback)
fmt.Println("*enableFilter:", *enableFilter)
fmt.Println("*filterJson:", *filterJson)
fmt.Println("*filterGroup:", *filterGroup)
fmt.Println("*nsq:", *nsq)
}
os.Exit(0)
Expand Down Expand Up @@ -223,6 +237,7 @@ func main() {
cliFlags.NetlinkerChSize = netlinkerChSize
cliFlags.SamplingModulus = samplingModulus
cliFlags.InetdiagerReportModulus = inetdiagerReportModulus
cliFlags.InetdiagerFilterReportModulus = inetdiagerFilterReportModulus
cliFlags.InetdiagerStatsRatio = inetdiagerStatsRatio
cliFlags.GoMaxProcs = goMaxProcs
cliFlags.UDPSendDest = udpSendDest
Expand All @@ -242,6 +257,10 @@ func main() {
cliFlags.XTCPStaterFrequency = xTCPStaterFrequency
cliFlags.XTCPStaterSystemctlPath = xTCPStaterSystemctlPath
cliFlags.XTCPStaterPsPath = xTCPStaterPsPath
cliFlags.IncludeLoopback = includeLoopback
cliFlags.EnableFilter = enableFilter
cliFlags.FilterJson = filterJson
cliFlags.FilterGroup = filterGroup
cliFlags.NSQ = nsq

// Start background polling job to cleanly exit if the return code of executing 'disablerCommand' is "1"
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.16

replace github.com/Edgio/xtcp/pkg/misc => ./pkg/misc

replace github.com/Edgio/xtcp/pkg/blockfilter => ./pkg/blockfilter

replace github.com/Edgio/xtcp/pkg/cliflags => ./pkg/cliflags

replace github.com/Edgio/xtcp/pkg/xtcppb => ./pkg/xtcppb
Expand Down
2 changes: 2 additions & 0 deletions go.mod.replace
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ replace github.com/Edgio/xtcp/pkg/netlinker => ./pkg/netlinker

replace github.com/Edgio/xtcp/pkg/inetdiager => ./pkg/inetdiager

replace github.com/Edgio/xtcp/pkg/blockfilter => ./pkg/blockfilter

replace github.com/Edgio/xtcp/pkg/pollerstater => ./pkg/pollerstater

replace github.com/Edgio/xtcp/pkg/inetdiagerstater => ./pkg/inetdiagerstater
Expand Down
Loading