Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for user-specified file with connections to add #372

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/nettop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func getVerbosity(args *inArgs) analyzer.Verbosity {
// (or NetworkPolicies to allow only this connectivity)
func detectTopology(args *inArgs) error {
logger := analyzer.NewDefaultLoggerWithVerbosity(getVerbosity(args))
synth := analyzer.NewPoliciesSynthesizer(analyzer.WithLogger(logger), analyzer.WithDNSPort(*args.DNSPort))
opts := []analyzer.PoliciesSynthesizerOption{analyzer.WithLogger(logger), analyzer.WithDNSPort(*args.DNSPort)}
if *args.connsFile != "" {
opts = append(opts, analyzer.WithConnectionsFile(*args.connsFile))
}
synth := analyzer.NewPoliciesSynthesizer(opts...)

var content interface{}
if args.SynthNetpols != nil && *args.SynthNetpols {
Expand Down
2 changes: 1 addition & 1 deletion cmd/nettop/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var (
true,
[]string{"-v"},
false,
[]string{"sockshop", "expected_netpol_output.json"},
[]string{"sockshop", "expected_netpols", "expected_netpol_output.json"},
},
{
"NetpolsK8sWordpress",
Expand Down
2 changes: 2 additions & 0 deletions cmd/nettop/parse_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type inArgs struct {
OutputFile *string
OutputFormat *string
DNSPort *int
connsFile *string
SynthNetpols *bool
Quiet *bool
Verbose *bool
Expand All @@ -47,6 +48,7 @@ func parseInArgs(cmdlineArgs []string) (*inArgs, error) {
args.OutputFormat = flagset.String("format", jsonFormat, "output format; must be either \"json\" or \"yaml\"")
args.SynthNetpols = flagset.Bool("netpols", false, "whether to synthesize NetworkPolicies to allow only the discovered connections")
args.DNSPort = flagset.Int("dnsport", analyzer.DefaultDNSPort, "DNS port to be used in egress rules of synthesized NetworkPolicies")
args.connsFile = flagset.String("conns", "", "a file specifying connections to enable")
args.Quiet = flagset.Bool("q", false, "runs quietly, reports only severe errors and results")
args.Verbose = flagset.Bool("v", false, "runs with more informative messages printed to log")
err := flagset.Parse(cmdlineArgs)
Expand Down
225 changes: 215 additions & 10 deletions pkg/analyzer/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,33 @@ SPDX-License-Identifier: Apache-2.0
package analyzer

import (
"bufio"
"fmt"
"os"
"strings"

"k8s.io/apimachinery/pkg/util/validation"
)

type connectionExtractor struct {
resources []*Resource
links []*Service
logger Logger
}

// This function is at the core of the topology analysis
// For each resource, it finds other resources that may use it and compiles a list of connections holding these dependencies
func discoverConnections(resources []*Resource, links []*Service, logger Logger) []*Connections {
func (ce *connectionExtractor) discoverConnections() []*Connections {
connections := []*Connections{}
for _, destRes := range resources {
deploymentServices := findServices(destRes, links)
logger.Debugf("services matched to %v: %v", destRes.Resource.Name, deploymentServices)
for _, destRes := range ce.resources {
deploymentServices := ce.findServices(destRes)
ce.logger.Debugf("services matched to %v: %v", destRes.Resource.Name, deploymentServices)
for _, svc := range deploymentServices {
srcRes := findSource(resources, svc)
srcRes := ce.findSource(svc)
if len(srcRes) > 0 {
for _, r := range srcRes {
if !r.equals(destRes) {
logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
ce.logger.Debugf("source: %s target: %s link: %s", r.Resource.Name, destRes.Resource.Name, svc.Resource.Name)
connections = append(connections, &Connections{Source: r, Target: destRes, Link: svc})
}
}
Expand Down Expand Up @@ -51,9 +62,9 @@ func areSelectorsContained(selectors1 map[string]string, selectors2 []string) bo
}

// findServices returns a list of services that may be in front of a given workload resource
func findServices(resource *Resource, links []*Service) []*Service {
func (ce *connectionExtractor) findServices(resource *Resource) []*Service {
var matchedSvc []*Service
for _, link := range links {
for _, link := range ce.links {
if link.Resource.Namespace != resource.Resource.Namespace {
continue
}
Expand All @@ -68,9 +79,9 @@ func findServices(resource *Resource, links []*Service) []*Service {
}

// findSource returns a list of resources that are likely trying to connect to the given service
func findSource(resources []*Resource, service *Service) []*Resource {
func (ce *connectionExtractor) findSource(service *Service) []*Resource {
tRes := []*Resource{}
for _, resource := range resources {
for _, resource := range ce.resources {
serviceAddresses := getPossibleServiceAddresses(service, resource)
foundSrc := *resource // We copy the resource so we can specify the ports used by the source found
matched := false
Expand Down Expand Up @@ -122,3 +133,197 @@ func envValueMatchesService(envVal string, service *Service, serviceAddresses []
}
return false, SvcNetworkAttr{}
}

const (
srcDstDelim = "->"
endpointsPortDelim = "|"
commentToken = "#"
wildcardToken = "_"
strongWildcardToken = "*"
endpointParts = 3
)

type workloadAndService struct {
resource *Resource
service *Service
}

func (ce *connectionExtractor) connectionsFromFile(filename string) ([]*Connections, error) {
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()

conns := []*Connections{}

scanner := bufio.NewScanner(file)
lineNum := 0
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
lineNum += 1
if line == "" || strings.HasPrefix(line, commentToken) {
continue
}
lineConns, err := ce.parseConnectionLine(line, lineNum)
if err != nil {
return nil, err
}
conns = append(conns, lineConns...)
}

if err := scanner.Err(); err != nil {
return nil, err
}

return conns, nil
}

func (ce *connectionExtractor) parseConnectionLine(line string, lineNum int) ([]*Connections, error) {
// Take only the part before # starts a comment
parts := strings.Split(line, commentToken)
if len(parts) == 0 {
return nil, syntaxError("unexpected comment", lineNum)
}

line = parts[0]

parts = strings.Split(line, srcDstDelim)
if len(parts) != 2 {
return nil, syntaxError("connection line must have exactly one -> separator", lineNum)
}

src := strings.TrimSpace(parts[0])
srcWorkloads, err := ce.parseEndpoints(src, lineNum)
if err != nil {
return nil, err
}

parts = strings.Split(parts[1], endpointsPortDelim)
if len(parts) == 0 {
return nil, syntaxError("missing destination", lineNum)
}
if len(parts) > 2 {
return nil, syntaxError("connection line must have at most one | separator", lineNum)
}
dst := strings.TrimSpace(parts[0])
dstWorkloads, err := ce.parseEndpoints(dst, lineNum)
if err != nil {
return nil, err
}

conns := []*Connections{}
for _, srcWl := range srcWorkloads {
for _, dstWl := range dstWorkloads {
svc := mergeSrcAndDstSvcs(srcWl.service, dstWl.service)
conns = append(conns, &Connections{
Source: srcWl.resource,
Target: dstWl.resource,
Link: svc,
})
}
}
return conns, nil
}

func mergeSrcAndDstSvcs(srcSvc, dstSvc *Service) *Service {
retSvc := *dstSvc
retSvc.Resource.ExposeExternally = srcSvc.Resource.ExposeExternally || dstSvc.Resource.ExposeExternally
retSvc.Resource.ExposeToCluster = srcSvc.Resource.ExposeToCluster || dstSvc.Resource.ExposeToCluster
retSvc.Resource.exposeToNS = dstSvc.Resource.exposeToNS
retSvc.Resource.exposeToNS = append(retSvc.Resource.exposeToNS, srcSvc.Resource.exposeToNS...)
return &retSvc
}

func (ce *connectionExtractor) parseEndpoints(endpoint string, lineNum int) ([]workloadAndService, error) {
parts := strings.Split(endpoint, "/")
if len(parts) != endpointParts {
return nil, syntaxError("source and destination must be of the form namespace/kind/name", lineNum)
}
ns, kind, name := parts[0], parts[1], parts[2]
kind = strings.ToUpper(kind[:1]) + kind[1:] // Capitalize kind's first letter

if ns == strongWildcardToken || kind == strongWildcardToken || name == strongWildcardToken {
return ce.parseEndpointWithStrongWildcard(ns, kind, name)
}

var res []workloadAndService
switch kind {
case service:
res = ce.getMatchingServices(ns, name)
case wildcardToken:
res = append(ce.getMatchingServices(ns, name), ce.getMatchingWorkloads(ns, kind, name)...)
default:
res = ce.getMatchingWorkloads(ns, kind, name)
}
if len(res) == 0 {
return nil, fmt.Errorf("no matching endpoints for %s in the provided manifests", endpoint)
}
return res, nil
}

func (ce *connectionExtractor) parseEndpointWithStrongWildcard(ns, kind, name string) ([]workloadAndService, error) {
if kind != strongWildcardToken || name != strongWildcardToken {
return nil, fmt.Errorf("bad endpoint pattern %s/%s/%s. Patterns with '*' should either equal '*/*/*' "+
"or have the form '<namespace>/*/*'", ns, kind, name)
}

svc := Service{}
if ns != strongWildcardToken {
if len(validation.IsDNS1123Subdomain(ns)) != 0 {
return nil, fmt.Errorf("%s is not a proper namespace name", ns)
}
svc.Resource.exposeToNS = append(svc.Resource.exposeToNS, ns)
} else {
svc.Resource.ExposeToCluster = true
}
wlSvc := workloadAndService{resource: nil, service: &svc}

return []workloadAndService{wlSvc}, nil
}

func (ce *connectionExtractor) getMatchingServices(ns, name string) []workloadAndService {
services := []workloadAndService{}
for _, svc := range ce.links {
if strMatch(svc.Resource.Namespace, ns) && strMatch(svc.Resource.Name, name) {
services = append(services, ce.workloadsOfSvc(svc)...)
}
}
return services
}

func (ce *connectionExtractor) workloadsOfSvc(svc *Service) []workloadAndService {
svcWorkloads := []workloadAndService{}
for _, workload := range ce.resources {
if workload.Resource.Namespace == svc.Resource.Namespace &&
areSelectorsContained(workload.Resource.Labels, svc.Resource.Selectors) {
svcWorkloads = append(svcWorkloads, workloadAndService{workload, svc})
}
}
return svcWorkloads
}

func (ce *connectionExtractor) getMatchingWorkloads(ns, kind, name string) []workloadAndService {
workloads := []workloadAndService{}
for _, workload := range ce.resources {
if strMatch(workload.Resource.Namespace, ns) && strMatch(workload.Resource.Kind, kind) &&
strMatch(workload.Resource.Name, name) {
services := ce.findServices(workload)
if len(services) == 0 {
ce.logger.Infof("workload %s is not exposed by any service", workload.Resource.Name)
}
for _, svc := range services {
workloads = append(workloads, workloadAndService{workload, svc})
}
}
}
return workloads
}

func strMatch(str, pattern string) bool {
return pattern == wildcardToken || str == pattern
}

func syntaxError(errorStr string, lineNum int) error {
return fmt.Errorf("syntax error in line %d: %s", lineNum, errorStr)
}
28 changes: 23 additions & 5 deletions pkg/analyzer/policies_synthesizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ type WalkFunction func(root string, fn fs.WalkDirFunc) error
// It is possible to get either a slice with all the discovered connections or a slice with K8s NetworkPolicies
// that allow only the discovered connections and nothing more.
type PoliciesSynthesizer struct {
logger Logger
stopOnError bool
walkFn WalkFunction
dnsPort int
logger Logger
stopOnError bool
walkFn WalkFunction
dnsPort int
connectionsFile string

errors []FileProcessingError
}
Expand Down Expand Up @@ -74,6 +75,12 @@ func WithDNSPort(dnsPort int) PoliciesSynthesizerOption {
}
}

func WithConnectionsFile(filename string) PoliciesSynthesizerOption {
return func(p *PoliciesSynthesizer) {
p.connectionsFile = filename
}
}

// NewPoliciesSynthesizer creates a new instance of PoliciesSynthesizer, and applies the provided functional options.
func NewPoliciesSynthesizer(options ...PoliciesSynthesizerOption) *PoliciesSynthesizer {
// object with default behavior options
Expand Down Expand Up @@ -227,7 +234,18 @@ func (ps *PoliciesSynthesizer) extractConnections(resAcc *resourceAccumulator) (
resAcc.exposeServices()

// Discover all connections between resources
connections := discoverConnections(resAcc.workloads, resAcc.services, ps.logger)
ce := connectionExtractor{resAcc.workloads, resAcc.services, ps.logger}
connections := ce.discoverConnections()

// If user specified a file with extra connections, add them too
if ps.connectionsFile != "" {
fileConns, err := ce.connectionsFromFile(ps.connectionsFile)
if err != nil {
fpErr := failedReadingFile(ps.connectionsFile, err)
return nil, nil, appendAndLogNewError(fileErrors, fpErr, ps.logger)
}
connections = append(connections, fileConns...)
}
return resAcc.workloads, connections, fileErrors
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/analyzer/policies_synthesizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ func TestPoliciesSynthesizerAPIDnsPort(t *testing.T) {
}
}

func TestPoliciesSynthesizerConnectionsFile(t *testing.T) {
dirPath := filepath.Join(getTestsDir(), "sockshop", "manifests")
connFilePath := filepath.Join(getTestsDir(), "sockshop", "connections.txt")
synthesizer := NewPoliciesSynthesizer(WithConnectionsFile(connFilePath))
netpols, err := synthesizer.PoliciesFromFolderPaths([]string{dirPath})
require.Nil(t, err)
require.Len(t, netpols, 15)
}

func TestPoliciesSynthesizerAPIFatalError(t *testing.T) {
dirPath1 := filepath.Join(getTestsDir(), "k8s_wordpress_example")
dirPath2 := filepath.Join(getTestsDir(), "badPath")
Expand Down
Loading
Loading