diff --git a/Makefile b/Makefile index 758a35c..8e0fc31 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ swoll: bindata ## Build the swoll binary ## Build the BPF probe and generate embedded asset file bindata: $(GOBINDATA) bpf - $(GOBINDATA) -nometadata -nocompress -pkg assets -tags !nobindata -o internal/pkg/assets/bindata.go ./internal/bpf/probe*.o + $(GOBINDATA) -nometadata -nocompress -pkg assets -tags !nobindata -o pkg/kernel/assets/compiled.go ./internal/bpf/probe*.o all: bpf bindata generate cmd/ internal/bpf swoll ## Build the BPF probe and swoll binary diff --git a/README.md b/README.md index 2b87bc1..26f930c 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,7 @@ func main() { decoded.Ingest(msg) // fetch the arguments associated with the call - args := decoded.Argv.(call.Function).Arguments() + args := decoded.Argv.Arguments() fmt.Printf("comm:%-15s pid:%-8d %s(%s)\n", decoded.Comm, decoded.Pid, decoded.Syscall, args) } @@ -303,7 +303,7 @@ func main() { // attach to the running trace and print out stuff kHub.AttachTrace(trace, func(id string, ev *event.TraceEvent) { - args := ev.Argv.(call.Function).Arguments() + args := ev.Argv.Arguments() fmt.Printf("container=%s pod=%s namespace=%s comm:%-15s pid:%-8d %s(%s)\n", ev.Container.Name, ev.Container.Pod, ev.Container.Namespace, diff --git a/cmd/client.go b/cmd/client.go index 1a2574b..3a3cfd4 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -14,7 +14,6 @@ import ( "github.com/criticalstack/swoll/api/v1alpha1" "github.com/criticalstack/swoll/pkg/client" - "github.com/criticalstack/swoll/pkg/event/call" color "github.com/fatih/color" uuid "github.com/google/uuid" log "github.com/sirupsen/logrus" @@ -293,7 +292,7 @@ var cmdClientCreate = &cobra.Command{ case ev := <-outChan: switch out { case "cli": - fn := ev.Data.Argv.(call.Function) + fn := ev.Data.Argv args := fn.Arguments() green := color.New(color.FgGreen).SprintFunc() diff --git a/cmd/controller.go b/cmd/controller.go index 9923731..b9d3fd5 100644 --- a/cmd/controller.go +++ b/cmd/controller.go @@ -21,7 +21,7 @@ import ( ) const ( - defaultImageName = "cinderegg:5000/swoll:latest" + defaultImageName = "criticalstack/swoll:latest" ) var ( diff --git a/cmd/loader.go b/cmd/loader.go new file mode 100644 index 0000000..35afbd7 --- /dev/null +++ b/cmd/loader.go @@ -0,0 +1,47 @@ +package cmd + +import ( + "io/ioutil" + "os" + + "github.com/criticalstack/swoll/pkg/kernel/assets" + "github.com/spf13/cobra" +) + +// loadBPFargs will attempt to find the BPF object file via the commandline, +// If the argument is empty (default), we check the local environment, and if +// that fails, we attempt to load the go-bindata generated asset. +func loadBPFargs(cmd *cobra.Command, args []string) ([]byte, error) { + var ( + bpf []byte + err error + ) + + // first check to see if the bpf object was defined at the commandline + bpfFile, err = cmd.Flags().GetString("bpf") + if err != nil { + return nil, err + } + + if bpfFile == "" { + // not found on the command-line, now try environment + bpfFile = os.Getenv("SWOLL_BPFOBJECT") + } + + if bpfFile != "" { + // attempt to read the bpf object file if defined + bpf, err = ioutil.ReadFile(bpfFile) + if err != nil && !os.IsNotExist(err) { + // only error if the error is *NOT* of type "file not found" + return nil, err + } + } + + if len(bpf) == 0 { + // we've tried all sorts of ways to load this file, by default + // it attempts to use the go-bindata generated asset resource. 
+ bpf = assets.LoadBPF() + } + + return bpf, err +} diff --git a/cmd/offsetter.go b/cmd/offsetter.go index a8b08e6..cbde7c1 100644 --- a/cmd/offsetter.go +++ b/cmd/offsetter.go @@ -1,336 +1,72 @@ package cmd import ( - "bufio" - "bytes" - "fmt" - "io" - "os" - "os/exec" - "regexp" "strconv" "strings" "github.com/criticalstack/swoll/pkg/kernel" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) -type kernelSyms map[string]string - -// parseKernelSyms reads in a `/proc/kallsyms` formatted symtab and merges the -// info into a map. -func parseKernelSyms(r io.Reader) kernelSyms { - ret := make(map[string]string) - scanner := bufio.NewScanner(r) - - for scanner.Scan() { - tokens := strings.Fields(scanner.Text()) - switch tokens[1] { - case "t", "T": - break - default: - continue - } - - // symname = kernel address - ret[tokens[2]] = tokens[0] - } - - return ret -} - -func parseKallsyms() kernelSyms { - f, err := os.Open("/proc/kallsyms") - if err != nil { - log.Infof("Failed to open /proc/kallsyms: %s", err.Error()) - return nil - } - defer f.Close() - - return parseKernelSyms(f) -} - -func objdumpAddress(addr, corefile string) ([]byte, error) { - r, err := strconv.ParseUint(addr, 16, 64) - if err != nil { - return nil, err - } - - if _, err := os.Stat(corefile); os.IsNotExist(err) { - return nil, errors.Wrapf(err, "could not open %s, set offsets manually", corefile) - } - - if _, err := exec.LookPath("objdump"); err != nil { - return nil, errors.Wrapf(err, "could not find `objdump`: please install binutils") - } - - return exec.Command("objdump", corefile, - "--disassemble", - "--disassembler-options", "intel", - "--start-address", fmt.Sprintf("0x%08x", r), - "--stop-address", fmt.Sprintf("0x%08x", r+(8*32))).Output() -} - -const ( - stateRLock = iota - stateRMov -) - -var movre *regexp.Regexp = regexp.MustCompile(`mov\s+\w+,QWORD PTR \[\w+\+(0x\w+?)\]`) -var leare *regexp.Regexp = regexp.MustCompile(`lea\s+\w+,\[\w+[+-](0x\w+?)\]`) - -// The purpose of this little nugget is to find the offset of the currently -// running kernel's task_struct->nsproxy variable. We use this offset to -// find the current tasks's pid and mnt namespaces. This (currently) only works -// on x86_64 as we are looking for x86 specific operations. -// -// It works by scanning `/proc/kallsyms` for specific functions we are -// interested in, in our case: -// - `_raw_spin_lock` (more on this later) -// - `ipcns_get` -// - `utsns_get` -// - `mntns_get` -// - `cgroupns_get` -// -// With the exception of `_raw_spin_lock`, these kernel functions are small -// helper functions which can be found in all modern linux installs. We target -// these because they: -// a) are very small in size and pose little risk of changing -// b) are very similar in operation -// c) lock the task_struct at a very early stage -// d) assign a local variable the value of `task_struct->nsproxy` -// -// Take the following function as an example: -// static struct ns_common *mntns_get(struct task_struct *task) -// { -// struct ns_common *ns = NULL; -// struct nsproxy *nsproxy; -// task_lock(task); // push %dsi -// // call _raw_spin_lock -// nsproxy = task->nsproxy; // mov reg,dsi+0x... == offset of task->nsproxy -// if (nsproxy != NULL) { // test reg,reg -// -// The resulting assembly will have the following characteristics: -// push %dsi // or wherever arg[0] resides. 
-// call _raw_spin_lock // task_lock(task) -// mov %r8,QWORD PTR [%dsi+XXXX] // nsproxy = task->nsproxy (XXXX == offset) -// test %r8,%r8 // if (nsproxy != NULL) -// -// We fetch the addresses of these symbols from /proc/kallsyms, then read -// /proc/kcore and disassemble the first few instructions and attempt to find -// the above pattern. -// -// This is a hack, but it's a working hack. -func nsproxyOffsetSearch(objdump io.Reader) string { - lockaddr := parseKallsyms()["_raw_spin_lock"] - scanner := bufio.NewScanner(objdump) - state := stateRLock - - for scanner.Scan() { - if !strings.HasPrefix(scanner.Text(), "ffffffff") { - // skip lines that aren't specifically code segments. - continue - } - - switch state { - case stateRLock: - // read until we see a call to _raw_spin_lock - if strings.Contains(scanner.Text(), lockaddr) { - // found a call to _raw_spin_lock, set the next thing - // to search for. - state = stateRMov - } - case stateRMov: - // read instructions until we find a local mov from dsi+offset - if found := movre.FindStringSubmatch(scanner.Text()); len(found) > 0 { - // this is a potential match. - return found[1] - } - - } - } - - return "" -} - -// pidnscommonOffsetSearch is simple in comparison to nsproxyOffsetSearch. Here -// we are trying to get the offset to the `struct ns_common` member `ns` from -// `struct pid_namespace`. There are several functions in the kernel that do -// something like the following: `return ns ? &(struct ns_common *)ns->ns : -// NULL;`, or `if (ns != NULL) return &ns->ns; else return NULL;` -// -// Returning a pointer to a constant or stack will result in a "load effective -// address" (`lea`), and these functions are small enough where we can just -// count the number of `lea`'s to get a good idea of where this member sits. 
-func pidnscommonOffsetSearch(objdump io.Reader) []string { - scanner := bufio.NewScanner(objdump) - ret := make([]string, 0) - - for scanner.Scan() { - if found := leare.FindStringSubmatch(scanner.Text()); len(found) > 0 { - ret = append(ret, found[1]) - } - } - - return ret -} - -func maxCandidates(candidates map[string]int) (string, int) { - var mn int - var ms string - - for k, v := range candidates { - if v == mn { - log.Infof("warning: same size candidates (%v=%v == %v=%v)\n", k, v, ms, mn) - } - - if v > mn { - ms = k - mn = v - } - } - - return ms, mn -} - -func pidnsCommonLikelyOffset(symfile, corefile string, functions []string) string { - symbols := parseKallsyms() - candidates := make(map[string]int) - - for _, fn := range functions { - addr, ok := symbols[fn] - if !ok { - log.Infof("warning: couldnt find address for sym %s", fn) - } - - code, err := objdumpAddress(addr, corefile) - if err != nil { - log.Warnf("warning: %s", err.Error()) - continue - } - - if offs := pidnscommonOffsetSearch(bytes.NewReader(code)); len(offs) > 0 { - for _, off := range offs { - candidates[off]++ - } - } - } - - addr, count := maxCandidates(candidates) - log.Infof("info: pidns->ns_common likelyOffset addr=%v, count=%v\n", addr, count) - return addr - -} - -func nsproxyLikelyOffset(symfile, corefile string, functions []string) string { - symbols := parseKallsyms() - candidates := make(map[string]int) - - for _, fn := range functions { - addr, ok := symbols[fn] - if !ok { - log.Warnf("warning: couldn't find address for sym %s", fn) - continue - } - - code, err := objdumpAddress(addr, corefile) - if err != nil { - log.Warnf("%s", err.Error()) - continue - } - - if offs := nsproxyOffsetSearch(bytes.NewReader(code)); offs != "" { - candidates[offs]++ - } - } - - addr, count := maxCandidates(candidates) - log.Infof("info: likelyOffset addr=%v, count=%v\n", addr, count) - - return addr -} - func SetOffsetsFromArgs(probe *kernel.Probe, cmd *cobra.Command, args []string) error { nodetect, err := cmd.Flags().GetBool("no-detect-offsets") if err != nil { return err } - var nsproxyOffset string - var pnsCommOffset string - if !nodetect { - nsproxyOffset = nsproxyLikelyOffset("/proc/kallsyms", "/proc/kcore", - []string{ - "ipcns_get", - "mntns_get", - "cgroupns_get", - "netns_get", - "get_proc_task_net", - "switch_task_namespaces", - "mounts_open_common", - }) - - pnsCommOffset = pidnsCommonLikelyOffset("/proc/kallsyms", "/proc/kcore", - []string{ - "pidns_for_children_get", - "pidns_put", - "pidns_get", - }) - - } else { - offstr, err := cmd.Flags().GetString("nsproxy-offset") - if err != nil { + if err := probe.DetectAndSetOffsets(); err != nil { return err } - - nsproxyOffset = offstr - - offstr, err = cmd.Flags().GetString("pidns-offset") + } else { + offset, err := cmd.Flags().GetString("nsproxy-offset") if err != nil { return err } - pnsCommOffset = offstr - } - - if nsproxyOffset != "" { - nsproxyOffset = strings.TrimPrefix(nsproxyOffset, "0x") - offset, err := strconv.ParseInt(nsproxyOffset, 16, 64) - if err != nil { - return err - } + if offset != "" { + offset = strings.TrimPrefix(offset, "0x") + offset, err := strconv.ParseInt(offset, 16, 64) + if err != nil { + return err + } - setter, err := kernel.NewOffsetter(probe.Module()) - if err != nil { - log.Fatal(err) - } + setter, err := kernel.NewOffsetter(probe.Module()) + if err != nil { + log.Fatal(err) + } - log.Infof("Setting task_struct->nsproxy offset to: %x\n", offset) + log.Infof("Setting task_struct->nsproxy offset to: %x\n", offset) - if err 
:= setter.Set("nsproxy", kernel.OffsetValue(offset)); err != nil { - return err + if err := setter.Set("nsproxy", kernel.OffsetValue(offset)); err != nil { + return err + } } - } - if pnsCommOffset != "" { - pnsCommOffset = strings.TrimPrefix(pnsCommOffset, "0x") - offset, err := strconv.ParseInt(pnsCommOffset, 16, 64) + offset, err = cmd.Flags().GetString("pidns-offset") if err != nil { return err } - setter, err := kernel.NewOffsetter(probe.Module()) - if err != nil { - log.Fatal(err) - } + if offset != "" { + offset = strings.TrimPrefix(offset, "0x") + offset, err := strconv.ParseInt(offset, 16, 64) + if err != nil { + return err + } - log.Infof("Setting pid_namespace->ns offset to: %x\n", offset) + setter, err := kernel.NewOffsetter(probe.Module()) + if err != nil { + log.Fatal(err) + } + + log.Infof("Setting pid_namespace->ns offset to: %x\n", offset) + + if err := setter.Set("pid_ns_common", kernel.OffsetValue(offset)); err != nil { + return err + } - if err := setter.Set("pid_ns_common", kernel.OffsetValue(offset)); err != nil { - return err } } diff --git a/cmd/server.go b/cmd/server.go index ce305b8..26857a6 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -2,6 +2,7 @@ package cmd import ( + "bytes" "context" "encoding/json" "fmt" @@ -13,7 +14,6 @@ import ( "time" "github.com/criticalstack/swoll/api/v1alpha1" - "github.com/criticalstack/swoll/internal/pkg/hub" "github.com/criticalstack/swoll/pkg/event" "github.com/criticalstack/swoll/pkg/kernel/metrics" "github.com/criticalstack/swoll/pkg/syscalls" @@ -38,7 +38,7 @@ type errResponse struct { type traceJob struct { ID string `json:"id"` Trace *v1alpha1.Trace `json:"traceSpec"` - job *hub.Job + job *topology.Job } type liveJobs struct { @@ -201,7 +201,7 @@ func jobReader(c *websocket.Conn) { // jobWriter attaches the current websocket to the running job and deals with // the PING messages from the server. It also routes the ouput of a job back to // the websocket client -func jobWriter(job *traceJob, h *hub.Hub, c *websocket.Conn) { +func jobWriter(job *traceJob, h *topology.Hub, c *websocket.Conn) { tickr := time.NewTicker(pingPeriod) unsub := h.AttachTrace(job.Trace, @@ -236,7 +236,7 @@ func jobWriter(job *traceJob, h *hub.Hub, c *websocket.Conn) { // traceWriter acts like jobWriter, but for specific traces (as in not a job, // but a subset of a job). 
See `traceWatchHandler` -func traceWriter(paths []string, h *hub.Hub, c *websocket.Conn) { +func traceWriter(paths []string, h *topology.Hub, c *websocket.Conn) { tickr := time.NewTicker(pingPeriod) defer func() { tickr.Stop() @@ -270,7 +270,7 @@ func traceWriter(paths []string, h *hub.Hub, c *websocket.Conn) { } // traceWatchHandler processes subset queries and outputs it to the websocket -func traceWatchHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, *http.Request) { +func traceWatchHandler(ctx context.Context, hub *topology.Hub) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { args := mux.Vars(r) conn, err := upgrader.Upgrade(w, r, nil) @@ -288,7 +288,7 @@ func traceWatchHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWrit } // getJobsHandler gets information about a collection of jobs from this endpoint -func getJobsHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, *http.Request) { +func getJobsHandler(ctx context.Context, hub *topology.Hub) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { running.RLock() completed.RLock() @@ -305,7 +305,7 @@ func getJobsHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, // getJobHandler returns job information about a given job. If `id` is // specified, it will use the value as the jobID, otherwise the query arguments // are used. -func getJobHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, *http.Request) { +func getJobHandler(ctx context.Context, hub *topology.Hub) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { jobid := mux.Vars(r)["id"] job, err := running.get(jobid) @@ -326,7 +326,7 @@ func getJobHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, } // createJobHandler constructs a v1alpha1.Trace type and creates the resource -func createJobHandler(ctx context.Context, hb *hub.Hub) func(http.ResponseWriter, *http.Request) { +func createJobHandler(ctx context.Context, hb *topology.Hub) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { var jobid string @@ -369,7 +369,7 @@ func createJobHandler(ctx context.Context, hb *hub.Hub) func(http.ResponseWriter }, } - job := traceJob{jobid, trace, hub.NewJob(trace)} + job := traceJob{jobid, trace, topology.NewJob(trace)} // add this job to the running list of jobs if err := running.add(&job); err != nil { @@ -384,7 +384,7 @@ func createJobHandler(ctx context.Context, hb *hub.Hub) func(http.ResponseWriter } } -func deleteJob(job *traceJob, hub *hub.Hub) error { +func deleteJob(job *traceJob, hub *topology.Hub) error { // Delete the job from our probe-hub if err := hub.DeleteTrace(job.Trace); err != nil { return err @@ -405,7 +405,7 @@ func deleteJob(job *traceJob, hub *hub.Hub) error { } // deleteJobHandler is exeucted when a user attempts to delete a resource. 
-func deleteJobHandler(ctx context.Context, hub *hub.Hub) func(http.ResponseWriter, *http.Request) { +func deleteJobHandler(ctx context.Context, hub *topology.Hub) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { jobid := mux.Vars(r)["id"] job, err := running.get(jobid) @@ -793,11 +793,7 @@ func runServer(cmd *cobra.Command, args []string) { log.Fatal(err) } - hb, err := hub.NewHub(&hub.Config{ - AltRoot: altroot, - BPFObject: bpf, - CRIEndpoint: crisock, - K8SEndpoint: kconfig}, topo) + hb, err := topology.NewHub(bytes.NewReader(bpf), topo) if err != nil { log.Fatal(err) } diff --git a/cmd/trace.go b/cmd/trace.go index f02e377..c75e027 100644 --- a/cmd/trace.go +++ b/cmd/trace.go @@ -5,15 +5,11 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "os" "strings" "github.com/criticalstack/swoll/api/v1alpha1" - "github.com/criticalstack/swoll/internal/pkg/assets" - "github.com/criticalstack/swoll/internal/pkg/hub" "github.com/criticalstack/swoll/pkg/event" - "github.com/criticalstack/swoll/pkg/event/call" "github.com/criticalstack/swoll/pkg/event/reader" "github.com/criticalstack/swoll/pkg/kernel" "github.com/criticalstack/swoll/pkg/kernel/filter" @@ -26,47 +22,6 @@ import ( "k8s.io/apimachinery/pkg/labels" ) -// loadBPFargs will attempt to find the BPF object file via the commandline, -// If the argument is empty (default), we check the local environment, and if -// that fails, we attempt to load the go-bindata generated asset. -func loadBPFargs(cmd *cobra.Command, args []string) ([]byte, error) { - var ( - bpf []byte - err error - ) - - // first check to see if the bpf object was defined at the commandline - bpfFile, err = cmd.Flags().GetString("bpf") - if err != nil { - return nil, err - } - - if bpfFile == "" { - // not found on the command-line, now try environment - bpfFile = os.Getenv("SWOLL_BPFOBJECT") - } - - if bpfFile != "" { - // attempt to read the bpf object file if defined - bpf, err = ioutil.ReadFile(bpfFile) - if err != nil && !os.IsNotExist(err) { - // only error if the error is *NOT* of type "file not found" - return nil, err - } - } - - if len(bpf) == 0 { - // we've tried all sorts of ways to load this file, by default - // it attempts to use the go-bindata generated asset resource. 
- bpf, err = assets.Asset("internal/bpf/probe.o") - if err != nil { - return nil, err - } - } - - return bpf, err -} - var cmdTrace = &cobra.Command{ Use: "trace", Short: "Kubernetes-Aware strace(1)", @@ -155,7 +110,7 @@ var cmdTrace = &cobra.Command{ bgblack := color.New(color.BgBlack).SprintFunc() white := color.New(color.FgWhite).SprintFunc() - fn := ev.Argv.(call.Function) + fn := ev.Argv args := fn.Arguments() var errno string @@ -203,12 +158,7 @@ var cmdTrace = &cobra.Command{ log.Fatal(err) } - hub, err := hub.NewHub(&hub.Config{ - AltRoot: altroot, - BPFObject: bpf, - CRIEndpoint: crisock, - K8SEndpoint: kconfig, - K8SNamespace: namespace}, topo) + hub, err := topology.NewHub(bytes.NewReader(bpf), topo) if err != nil { log.Fatal(err) } diff --git a/examples/basic-trace/main.go b/examples/basic-trace/main.go new file mode 100644 index 0000000..0c5174e --- /dev/null +++ b/examples/basic-trace/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/criticalstack/swoll/pkg/event" + "github.com/criticalstack/swoll/pkg/kernel" + "github.com/criticalstack/swoll/pkg/kernel/assets" +) + +func dumpTextEvent(ev *event.TraceEvent) { + fn := ev.Argv + + fmt.Printf("[%s/%v] (%s) %s(", ev.Comm, ev.Pid, ev.Error, fn.CallName()) + for _, arg := range fn.Arguments() { + fmt.Printf("(%s)%s=%v ", arg.Type, arg.Name, arg.Value) + } + fmt.Println(")") +} + +func main() { + probe, err := kernel.NewProbe(assets.LoadBPFReader(), nil) + if err != nil { + log.Fatalf("Unable to load static BPF asset: %v", err) + } + + if err := probe.InitProbe(kernel.WithOffsetDetection(), kernel.WithDefaultFilter()); err != nil { + log.Fatalf("Unable to initialize probe: %v", err) + } + + event := new(event.TraceEvent) + + probe.Run(context.Background(), func(msg []byte, lost uint64) error { + parsed, err := event.Ingest(msg) + if err != nil { + return nil + } + + dumpTextEvent(parsed) + + return nil + }) +} diff --git a/examples/kubernetes-basic/Dockerfile b/examples/kubernetes-basic/Dockerfile new file mode 100644 index 0000000..d9be0b4 --- /dev/null +++ b/examples/kubernetes-basic/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine:latest +WORKDIR /root/ +RUN apk add --no-cache binutils +COPY kube-trace ./ +CMD ["./kube-trace"] diff --git a/examples/kubernetes-basic/Makefile b/examples/kubernetes-basic/Makefile new file mode 100644 index 0000000..b338699 --- /dev/null +++ b/examples/kubernetes-basic/Makefile @@ -0,0 +1,25 @@ +REGISTRY=docker.io +USERNAME=errzey +APP_NAME=swoll-kube-test + +binary: + go build -ldflags="-extldflags=-static" -o kube-trace + +build: binary + docker build -t ${USERNAME}/${APP_NAME} . + +push: + docker push ${USERNAME}/${APP_NAME} + @echo "run: make deploy" + +all: build binary + @echo "run: make push" + +deploy: + kubectl apply -f deploy.yaml + +uninstall: + kubectl delete -f deploy.yaml + +clean: + rm ./kube-trace diff --git a/examples/kubernetes-basic/README.md b/examples/kubernetes-basic/README.md new file mode 100644 index 0000000..6bb97df --- /dev/null +++ b/examples/kubernetes-basic/README.md @@ -0,0 +1,21 @@ +# What + +A simple application which uses the Swoll API to trace system calls on a +kubernetes cluster. If a kernel event was associated with a host inside +kuberentes, that information will be displayed. + +If an event did not come from kubernetes, (e.g., local operations) it will be +marked with "-.-.-". + +# building + +1. Modify `Makefile` and change the docker repo info. +2. Modify `deploy.yaml`'s `image` to the repository you created +3. 
Run: + +``` +make all +make push +make deploy +``` + diff --git a/examples/kubernetes-basic/deploy.yaml b/examples/kubernetes-basic/deploy.yaml new file mode 100644 index 0000000..d9804bf --- /dev/null +++ b/examples/kubernetes-basic/deploy.yaml @@ -0,0 +1,62 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: swoll-test +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: swoll-kube-test + namespace: swoll-test +spec: + replicas: 1 + selector: + matchLabels: + app: swoll-kube-test + template: + metadata: + labels: + app: swoll-kube-test + spec: + hostPID: true + volumes: + - name: sys + hostPath: + path: /sys + - name: containerd + hostPath: + path: /run/containerd/containerd.sock + containers: + - name: swoll-kube-test + image: errzey/swoll-kube-test:latest + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - mountPath: /run/containerd/containerd.sock + name: containerd + - mountPath: /sys + name: sys +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: swoll-kube-test-reader +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "watch", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: swoll-kube-test-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: swoll-kube-test-reader +subjects: +- kind: ServiceAccount + name: default + namespace: swoll-test diff --git a/examples/kubernetes-basic/main.go b/examples/kubernetes-basic/main.go new file mode 100644 index 0000000..2fbaf9b --- /dev/null +++ b/examples/kubernetes-basic/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/criticalstack/swoll/pkg/event" + "github.com/criticalstack/swoll/pkg/kernel" + "github.com/criticalstack/swoll/pkg/kernel/assets" + "github.com/criticalstack/swoll/pkg/kernel/filter" + "github.com/criticalstack/swoll/pkg/topology" + "github.com/criticalstack/swoll/pkg/types" +) + +func dumpTextEvent(ev *event.TraceEvent) { + fmt.Printf("%s: [%s/%v] (%s) %s(", ev.Container.FQDN(), ev.Comm, ev.Pid, ev.Error, ev.Argv.CallName()) + for _, arg := range ev.Argv.Arguments() { + fmt.Printf("(%s)%s=%v ", arg.Type, arg.Name, arg.Value) + } + fmt.Println(")") +} + +func main() { + probe, err := kernel.NewProbe(assets.LoadBPFReader(), nil) + if err != nil { + log.Fatalf("Unable to load static BPF asset: %v", err) + } + + if err := probe.InitProbe(kernel.WithOffsetDetection()); err != nil { + log.Fatalf("Unable to initialize probe: %v", err) + } + + f, err := filter.NewFilter(probe.Module()) + if err != nil { + log.Fatalf("Unable to create filter: %v", err) + } + + f.FilterSelf() + f.AddSyscall("execve", -1) + f.AddSyscall("openat", -1) + f.AddSyscall("accept4", -1) + f.AddSyscall("connect", -1) + + observer, err := topology.NewKubernetes(topology.WithKubernetesCRI("/run/containerd/containerd.sock")) + if err != nil { + log.Fatalf("Unable to create topology context: %v", err) + } + + ctx := context.Background() + topo := topology.NewTopology(observer) + event := event.NewTraceEvent().WithTopology(topo) + + go topo.Run(ctx, func(tp topology.EventType, c *types.Container) { + fmt.Printf("eventType=%v, container=%v\n", tp, c.FQDN()) + }) + + probe.Run(ctx, func(msg []byte, lost uint64) error { + parsed, err := event.Ingest(msg) + if err != nil { + return nil + } + + dumpTextEvent(parsed) + + return nil + }) +} diff --git a/examples/kubernetes-hub/Dockerfile b/examples/kubernetes-hub/Dockerfile new file mode 100644 index 
0000000..336bee1 --- /dev/null +++ b/examples/kubernetes-hub/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine:latest +WORKDIR /root/ +RUN apk add --no-cache binutils +COPY kubernetes-hub ./ +CMD ["./kubernetes-hub"] diff --git a/examples/kubernetes-hub/Makefile b/examples/kubernetes-hub/Makefile new file mode 100644 index 0000000..6e88a75 --- /dev/null +++ b/examples/kubernetes-hub/Makefile @@ -0,0 +1,25 @@ +REGISTRY=docker.io +USERNAME=errzey +APP_NAME=swoll-hub-test + +binary: + go build -ldflags="-extldflags=-static" -o kubernetes-hub + +build: binary + docker build -t ${USERNAME}/${APP_NAME} . + +push: + docker push ${USERNAME}/${APP_NAME} + @echo "run: make deploy" + +all: build binary + @echo "run: make push" + +deploy: + kubectl apply -f deploy.yaml + +uninstall: + kubectl delete -f deploy.yaml + +clean: + rm ./kubernetes-hub diff --git a/examples/kubernetes-hub/README.md b/examples/kubernetes-hub/README.md new file mode 100644 index 0000000..1af502c --- /dev/null +++ b/examples/kubernetes-hub/README.md @@ -0,0 +1,249 @@ +While there are several ways to utilize the Swoll API to create and manage traces, the Topology method is preferred, especially on container orchestration systems such as Kubernetes. + +To adequately understand what this package does, it is best to start with a little primer on how Swoll interacts with the running BPF to create, filter, and emit events from the kernel back to userland. + +The Swoll BPF has a simple userspace-configurable filtering mechanism that allows us to permit or deny a syscall from being monitored. Optionally, each call we monitor can be associated with a kernel namespace. For example, a user can request to see only the syscall "open" in the Linux PID Namespace `31337`. The kernel will silently drop any events that do not match this particular rule. + +```go +package main + +import ( + "github.com/criticalstack/swoll/pkg/kernel" + "github.com/criticalstack/swoll/pkg/kernel/assets" + "github.com/criticalstack/swoll/pkg/kernel/filter" +) + +func main() { + probe, _ := kernel.NewProbe(assets.LoadBPFReader(), nil) + filter, _ := filter.NewFilter(probe.Module()) + + filter.AddSyscall("open", 31337) +} +```` + +Manually maintaining this filter can become tedious when we start speaking of traces in non-specifics and abstracts. For example, this TraceSpec YAML configuration does not indicate **what** hosts it will monitor, just that it will begin watching any host that matches these meta-attributes. + +```yaml +# Monitor the calls "openat", and "execve" from any host in +# the namespace "default" with the label +# app.kubernetes.io/name set to the value "nginx" +apiVersion: tools.swoll.criticalstack.com/v1alpha1 +kind: Trace +metadata: + name: monitor-nginx + namespace: default +spec: + syscalls: + - openat + - execve + labelSelector: + matchLabels: + app.kubernetes.io/name: "nginx" +``` + +And it is with this concept of being abstract where the Topology package starts to shine; it is the glue that binds the symbolic to the real and the logic that governs the flow of information through the system. + +Two primary components make up a Topology: an `Observer` and a `Hub`; the Observer has the simple task of monitoring container states and reporting any changes to the Hub. The Hub then uses these update notifications to make changes to the kernel filtering in real-time. 
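Before walking through each piece in detail, here is a condensed sketch of that Observer-to-Hub wiring, assembled from the same calls the sections below explain one at a time (error handling and option details elided; `trace` stands in for a `*v1alpha1.Trace` built as shown further down):

```go
// Observe container lifecycle events via the CRI socket.
observer, _ := topology.NewKubernetes(
    topology.WithKubernetesCRI("/run/containerd/containerd.sock"),
)

// The Hub owns the BPF probe and translates Observer updates into kernel filters.
hub, _ := topology.NewHub(assets.LoadBPFReader(), observer)
_ = hub.Probe().DetectAndSetOffsets()

go hub.MustRun(ctx)
go hub.RunTrace(ctx, trace)

// Receive every event matching the trace.
hub.AttachTrace(trace, func(id string, ev *event.TraceEvent) {
    fmt.Println(id, ev.Container.FQDN(), ev.Argv.CallName())
})
```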
+ +The Hub also acts as a runtime optimizer, de-duplicating rules, pruning state, maintaining metrics, and routing events to the proper endpoints with a straightforward goal: "One BPF To Mon Them All" without sacrificing performance for flexibility or vice-versa. + +In the following example, we will define and run two "trace-jobs" using the Topology package, resulting in some events matching both outputs for the express purpose of explaining the BPF rule optimizer. + +----- + +### Load our BPF + +```go +func main() { + ctx := context.Background() + bpf := assets.LoadBPFReader() +``` + +*And with `main`, we do begin...* + +The Swoll API ships with a pre-compiled BPF object for `x86_64` which should work on any Linux kernel greater than v4.1. To use this, we load the code from the assets package. The return of `LoadBPFReader()` is a `bytes.Reader` object. + +### Create our Observer + +```go + observer, err := topology.NewKubernetes( + topology.WithKubernetesConfig(""), + topology.WithKubernetesCRI("/run/containerd/containerd.sock"), + topology.WithKubernetesNamespace(""), + topology.WithKubernetesProcRoot("/"), + topology.WithKubernetesLabelSelector("noSwoll!=true"), + topology.WithKubernetesFieldSelector("status.phase=Running"), + ) + if err != nil { + log.Fatalf("Could not create the kubernetes observer: %v", err) + } +``` + +Here we have created the first part of a `Topology`, the `Observer`. In thise case, the `Observer` will interact with Kubernetes and the CRI (Container Runtime Interface) to map numerical kernel-namespaces to a specific (k8s)Namespace/POD/container. There are a few `With` configuration directives set here, so let's go over each one individually: + +```go +topology.WithKubernetesConfig("") +``` + +If this code was being run outside of the cluster (e.g., not an `in-cluster` deployment), the value of this option would be the fully-qualified path to your current kubernetes configuration file. + +```go +topology.WithKubernetesCRI("/run/containerd/containerd.sock") +``` + +This is the path to the CRI socket file as seen by the host running this code. This socket must be readable via a shared mount. + +```go +topology.WithKubernetesNamespace("") +``` + +You can tell our observer to limit the container search to a single kubernetes namespace. For the sake of brevity, we leave this empty. The result of which is monitoring container states in *all* namespaces. + +```go +topology.WithKubernetesProcRoot("/") +``` + +The Observer uses ProcFS to derive what numerical kernel-namespaces a process belongs to. If that is mounted in a different directory outside of `/`, you would set it here. + +```go +topology.WithKubernetesLabelSelector("noSwoll!=true") +``` + +This acts like a pre-filter for container events, essentially stating that any Pod/Container that has the label `noSwoll=true` should never be seen. + +```go +topology.WithKubernetesFieldSelector("status.phase=Running") +``` + +Inform the Observer that we are only interested in containers that Kube has deemed as "running". + +### Prime & Pump the Hub + +```go + hub, err := topology.NewHub(bpf, observer) + if err != nil { + log.Fatalf("Could not create the hub: %v", err) + } +``` + +Creates a brand new Hub context, using the Kubernetes Observer object we just created. + +```go + if err := hub.Probe().DetectAndSetOffsets(); err != nil { + log.Fatalf("Could not detect offsets for running kernel: %v", err) + } +``` + +This part is pretty important. 
The BPF needs to access various members of the +kernel's `struct task_struct` structure which can differ on every system. This +helper function `DetectAndSetOffsets` will poke at the memory of the running +kernel in order to determine and set these offsets. + +### Run the Hub + +```go + go hub.MustRun(ctx) +``` + +This runs the Hub's event loop as a background task, silently maintaining the filters running general book-keeping operations. + +### Define the Traces + +```go + trace1 := &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "swoll-hub-test", + }, + Spec: v1alpha1.TraceSpec{ + LabelSelector: metav1.LabelSelector{ + MatchLabels: convertLabels("app=nginx"), + }, + FieldSelector: metav1.LabelSelector{ + MatchLabels: convertLabels("status.phase=Running"), + }, + Syscalls: []string{"execve", "openat", "connect", "accept4"}, + }, + Status: v1alpha1.TraceStatus{ + JobID: "trace1-monitor-nginx", + }, + } +``` + +This trace will monitor the syscalls `execve`, `openat`, `connect`, and `accept4` on any container living in the `swoll-hub-test` Kubernetes namespace with the label `app=nginx`. + +```go + trace2 := &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "", + }, + Spec: v1alpha1.TraceSpec{ + FieldSelector: metav1.LabelSelector{ + MatchLabels: convertLabels("status.phase=Running"), + }, + Syscalls: []string{"execve"}, + }, + Status: v1alpha1.TraceStatus{ + JobID: "trace2-monitor-execve", + }, + } +``` + +While this trace will monitor the syscall `execve` on any container in any Kubernetes namespace. + +### Run & Read + +```go + go hub.RunTrace(ctx, trace1) + go hub.RunTrace(ctx, trace2) +``` + +Submit these two traces to be run on the Hub as a background task. + +```go + dumpEvent := func(traceName string, ev *event.TraceEvent) { + fmt.Printf("job-id:%s - %s: [%s/%v] (%s) %s(", traceName, + ev.Container.FQDN(), ev.Comm, ev.Pid, ev.Error, + ev.Argv.CallName(), + ) + for _, arg := range ev.Argv.Arguments() { + fmt.Printf("(%s)%s=%v ", arg.Type, arg.Name, arg.Value) + } + fmt.Println(")") + } + + hub.AttachTrace(trace1, dumpEvent) + hub.AttachTrace(trace2, dumpEvent) + <-ctx.Done() +``` + +This attaches to the running traces, and for each matched event, execute the callback `dumpEvent` until the program terminates. + +### Deploy the probe into Kubernetes + +#### Build the Project: + +1. Modify the `Makefile` and change the `REGISTRY`, and `USERNAME` for storing your container image. +2. Modify `deploy.yaml`'s `image` configuration directive. +3. Build it. +``` +$ make all +$ make push +$ make deploy +``` + +### Understanding the Output +If all goes according to plan, you should start seeing traffic on the deployment. 
+
+```
+job-id:trace1-monitor-nginx - indexwriter.nginx-reader-writer.swoll-hub-test: [sh/2007494] (OK) execve((const char *)filename=/bin/date, (char * const)argv[]="")
+job-id:trace2-monitor-execve - indexwriter.nginx-reader-writer.swoll-hub-test: [sh/2007494] (OK) execve((const char *)filename=/bin/date, (char * const)argv[]="")
+job-id:trace1-monitor-nginx - indexwriter.nginx-reader-writer.swoll-hub-test: [date/2007494] (OK) openat((int)dirfd=AT_FDCWD (const char *)pathname=/etc/ld.so.cache, (int)flags=O_CLOEXEC)
+job-id:trace1-monitor-nginx - indexwriter.nginx-reader-writer.swoll-hub-test: [date/2007494] (OK) openat((int)dirfd=AT_FDCWD (const char *)pathname=/lib/x86_64-linux-gnu/libc.so.6, (int)flags=O_CLOEXEC)
+job-id:trace1-monitor-nginx - indexwriter.nginx-reader-writer.swoll-hub-test: [date/2007494] (OK) openat((int)dirfd=AT_FDCWD (const char *)pathname=/etc/localtime, (int)flags=O_CLOEXEC)
+job-id:trace1-monitor-nginx - indexwriter.nginx-reader-writer.swoll-hub-test: [sh/2007495] (OK) execve((const char *)filename=/bin/sleep, (char * const)argv[]=5 KUBERNETES_SERVICE_PORT=443)
+job-id:trace2-monitor-execve - indexwriter.nginx-reader-writer.swoll-hub-test: [sh/2007495] (OK) execve((const char *)filename=/bin/sleep, (char * const)argv[]=5 KUBERNETES_SERVICE_PORT=443)
+```
+
+Some things to note: `execve` calls are duplicated across both rules, `trace1-monitor-nginx` and `trace2-monitor-execve`. This is a feature! Rules are never duplicated on the kernel side: if an event matches two rules, only one kernel filter is created, and the events are routed internally to the different outputs.
+
+If you delete the job `trace2-monitor-execve`, only its output queue is removed; the kernel filter for `execve` stays in place until `trace1-monitor-nginx` is also deleted.
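To see that deletion behavior from the API side, here is a minimal sketch, assuming the `hub`, `trace1`, and `trace2` objects from the walkthrough above are still in scope: removing `trace2` via the Hub tears down only its output routing, while the shared `execve` kernel filter remains installed for `trace1`.

```go
// Delete trace2; trace1 still references the execve filter, so only the
// second job's output queue goes away and the kernel filter stays loaded.
if err := hub.DeleteTrace(trace2); err != nil {
    log.Fatalf("Could not delete trace: %v", err)
}

// Deleting trace1 as well drops the last reference, and the execve filter
// is finally removed from the kernel.
if err := hub.DeleteTrace(trace1); err != nil {
    log.Fatalf("Could not delete trace: %v", err)
}
```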
diff --git a/examples/kubernetes-hub/deploy.yaml b/examples/kubernetes-hub/deploy.yaml new file mode 100644 index 0000000..7276c61 --- /dev/null +++ b/examples/kubernetes-hub/deploy.yaml @@ -0,0 +1,97 @@ +--- +# this is the temporary test namespace we use for this example +apiVersion: v1 +kind: Namespace +metadata: + name: swoll-hub-test +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: swoll-hub + # make sure this deployment sits in our test namespace + namespace: swoll-hub-test +spec: + replicas: 1 + selector: + matchLabels: + app: swoll-hub + template: + metadata: + labels: + app: swoll-hub + # tell swoll not to monitor ourselves + noSwoll: 'true' + spec: + hostPID: true + volumes: + - name: sys + hostPath: + path: /sys + - name: containerd + hostPath: + path: /run/containerd/containerd.sock + containers: + - name: swoll-hub-test + # change this to your own personal repo if you modify the code + image: errzey/swoll-hub-test:latest + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - mountPath: /run/containerd/containerd.sock + name: containerd + - mountPath: /sys + name: sys +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: swoll-hub-test-reader +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "watch", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: swoll-hub-test-rolebinding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: swoll-hub-test-reader +subjects: +- kind: ServiceAccount + name: default + namespace: swoll-hub-test +--- +# Spin up some test containers which this example will monitor +apiVersion: v1 +kind: Pod +metadata: + name: nginx-reader-writer + namespace: swoll-hub-test + labels: + app: nginx +spec: + volumes: + - name: html + emptyDir: {} + containers: + - name: webserver + image: nginx + volumeMounts: + - name: html + mountPath: /usr/share/nginx/html + - name: indexwriter + image: debian + volumeMounts: + - name: html + mountPath: /html + command: ["/bin/sh", "-c"] + args: + - while true; do + date >> /html/index․html; + sleep 5; + done diff --git a/examples/kubernetes-hub/main.go b/examples/kubernetes-hub/main.go new file mode 100644 index 0000000..be97647 --- /dev/null +++ b/examples/kubernetes-hub/main.go @@ -0,0 +1,224 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/criticalstack/swoll/api/v1alpha1" + "github.com/criticalstack/swoll/pkg/event" + "github.com/criticalstack/swoll/pkg/kernel/assets" + "github.com/criticalstack/swoll/pkg/topology" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +func dumpTextEvent(name string, ev *event.TraceEvent) { + fmt.Printf("job-id:%s - %s: [%s/%v] (%s) %s(", name, ev.Container.FQDN(), ev.Comm, ev.Pid, ev.Error, ev.Argv.CallName()) + for _, arg := range ev.Argv.Arguments() { + fmt.Printf("(%s)%s=%v ", arg.Type, arg.Name, arg.Value) + } + fmt.Println(")") +} + +func main() { + log.SetOutput(os.Stderr) + log.SetLevel(log.DebugLevel) + + // The first step, as always, is to load the BPF object. 
+ // + // If you do not wish to ship the compiled BPF around with your code, swoll + // contains a pre-compiled version of the BPF which can be loaded via + // the assets API + bpf := assets.LoadBPFReader() + ctx := context.Background() + + // Since the point of this example is to show how to "properly" use the + // swoll API to monitor Kubernetes, we must start with the concept of a + // the `topology`. + // + // The `topology` is a combination of an `Observer` and a `Hub`. The + // `Observer` is an API which reads start and stop events from a container + // runtime, and a `Hub` maintains the kernel-level filters for one or more trace + // definitions using information obtained from the `Observer`. + // + // Using this combination of APIs has some unique properties which assist in + // creating a performant analysis tool: + // - All the hard work of maintaining the filters being run in the kernel is + // done for you. + // - A single instance of the probe can run multiple traces at one time, + // only targetting the very specific pieces of information (e.g., filters + // for specific container/syscalls) which was requested. + // + // This reduces the overhead of BPF code running in the kernel, + // as we only need a single tracepoint attached and running our filters. + // + // - Rule de-duplication. If you specify more than one rule, and the rule + // from the second has containers and system-calls that also matched the + // first, the kernel still only uses a single filter. The data is + // dynamically copied to the output of the second rule from the first. + // + // It should be noted that the actual kernel filters are never removed + // until ALL rules which reference that filter are removed. + // + // Secondary note: if the user has marked a rule as "sampled", and there + // is another rule which matches pods/containers from this rule, the + // lowest sample-rate is used in the kernel, and higher rates are + // sub-sampled in userland. + // - A more kubernetes-like experience: everything can be done in Yaml if + // needed. Just like Kubernetes! + // + // First, we create the `Observer`, in our case, the Kubernetes observer. Under the + // hood, this will use the k8s-api-server in combination with the CRI to + // maintain the topology of all containers running in PODs. + kubeObserver, err := topology.NewKubernetes( + // Here you can tell the observer to use a specific Kubernetes + // configuration file. If either this option is not passed, or the + // argument to this option is empty, we assume that this is running + // "in-cluster" or inside an already-running kubernetes cluster. I left + // it in here as an example, but normally would be omitted when running + // inside kube. + topology.WithKubernetesConfig(""), + // Specify the CRI socket to read container events from. This must be + // the CRI socket as seen on the host it is running on. Make sure that + // this file is available to this program. + topology.WithKubernetesCRI("/run/containerd/containerd.sock"), + // By default, the topology reader will see all containers in all + // namespaces, set this to whatever namespace if you wish to limit the + // search to a single namespace. It's usually a good idea to leave this + // empty for the Observer. + topology.WithKubernetesNamespace(""), + // The observer does some checks for information contained in the ProcFS + // directory (e.g., /proc), but if you have mounted this into a + // different directory, you can specify that here. I'm leaving it here + // as an example. 
This will lookup information from "/proc". + topology.WithKubernetesProcRoot("/"), + // You can optionally specify a set of labels to filter on using + // `key=value` strings. Here we apply a filter which matches on any + // pods/containers that do NOT have the label `noSwoll` set to + // `true`. In other words, if a POD/container is created with the label + // `noSwoll=true`, it will not be seen by this observer. + topology.WithKubernetesLabelSelector("noSwoll!=true"), + // Much like the label-selector, one can also add a Field Selection to + // match on runtime information like the running status of the pod or + // container. This is runtime specific, and in this case, we match only + // hosts that Kubernetes has deemed as "Running". + topology.WithKubernetesFieldSelector("status.phase=Running"), + ) + if err != nil { + log.Fatalf("Could not create the kubernetes observer: %v", err) + } + + // Next we create our `Hub` which inherits our kubernetes observer. This + // will initialize all the underlying BPF and apply initial kernel filters. + // This API is the component which manipulates and maintains all of the + // moving parts of the topology. It acts as a kernel event multiplexer. + hub, err := topology.NewHub(bpf, kubeObserver) + if err != nil { + log.Fatalf("Could not create the topology hub: %v", err) + } + + // Since we are using the pre-compiled BPF object, we must inform the kernel + // BPF where to look inside our task_struct for various members that are not + // known at runtime. Here we use a builtin helper function which determines, + // and sets those offsets for us. + if err := hub.Probe().DetectAndSetOffsets(); err != nil { + log.Fatalf("Could not detect offsets for running kernel: %v", err) + } + + // Start our Hub as a background task. This maintains a running list of all + // containers and submitted jobs running on the system along with kernel filters, + // garbage-collection, and other various house-keeping operations. + go hub.MustRun(ctx) + + // Now in order to run one or more traces on the Hub, we must construct a properly + // formatted TraceSpec for each trace we wish to install. + // + // This is a helper function to convert a string to a kubernetes LabelSet + convertLabels := func(lstr string) labels.Set { + ret, err := labels.ConvertSelectorToLabelsMap(lstr) + if err != nil { + log.Fatalf("Could not convert labels %v to labels map: %v", err) + } + return ret + } + + // This first trace specification will trace the system-calls "openat", + // "connect", "execve", and "accept4" for any *running* containers in the `swoll` + // namespace that have the Kubernetes label `app=nginx` set. + trace1 := &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + // The Kubernetes namespace we want to monitor inside of. For this + // example it is assumed that the deploy.yaml file in this directory + // has been applied. + Namespace: "swoll-hub-test", + }, + Spec: v1alpha1.TraceSpec{ + LabelSelector: metav1.LabelSelector{ + // Monitor any hosts that have the `app=nginx` label + // This should match some containers defined in the + // `deploy.yaml` file found in this directory. + MatchLabels: convertLabels("app=nginx"), + }, + FieldSelector: metav1.LabelSelector{ + // Only monitor hosts that are currently up and running. + MatchLabels: convertLabels("status.phase=Running"), + }, + Syscalls: []string{"execve", "openat", "connect", "accept4"}, + }, + Status: v1alpha1.TraceStatus{ + // Set the name of this trace, can be anything; if left empty, a + // name will be generated. 
+ JobID: "trace1-monitor-nginx", + }, + } + + // This second trace specification will monitor `execve` calls for any + // running container in any kubernetes namespace. + trace2 := &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + // An empty namespace means to monitor ALL namespaces + Namespace: "", + }, + Spec: v1alpha1.TraceSpec{ + FieldSelector: metav1.LabelSelector{ + // Only monitor hosts that are currently up and running. + MatchLabels: convertLabels("status.phase=Running"), + }, + Syscalls: []string{"execve"}, + }, + Status: v1alpha1.TraceStatus{ + // Set the name of this trace, can be anything; if left empty, a + // name will be generated. + JobID: "trace2-monitor-execve", + }, + } + + // Next we submit and run these two trace specifications in our Hub as a + // background task + go hub.RunTrace(ctx, trace1) + go hub.RunTrace(ctx, trace2) + + // And now for the final step: attaching to the two running traces and + // printing out the output of each. The second argument of these calls it + // the callback function to execute for every event that matched. + hub.AttachTrace(trace1, dumpTextEvent) + hub.AttachTrace(trace2, dumpTextEvent) + + // If you have used the deploy.yaml found within this directory, you will + // see two `job-id`'s firing: + // + // `trace1-monitor-nginx` + // `trace2-monitor-execve` + // + // Since `trace1` monitors execve for only a subset of hosts (`app=nginx`), and `trace2` + // monitors ALL execve calls across all hosts, both rules will fire for + // TraceEvent's sourced from hosts with the label `app=nginx`. + // + // This is an example of how the Hub + // does de-duplication. + + // Run until we are told to stop. + <-ctx.Done() +} diff --git a/go.mod b/go.mod index 1982e98..00695da 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,6 @@ module github.com/criticalstack/swoll go 1.13 require ( - github.com/Microsoft/go-winio v0.4.14 // indirect - github.com/docker/distribution v2.7.1+incompatible // indirect - github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0 - github.com/docker/go-connections v0.4.0 // indirect github.com/fatih/color v1.9.0 github.com/go-bindata/go-bindata v3.1.2+incompatible github.com/go-echarts/go-echarts v1.0.0 @@ -18,9 +14,6 @@ require ( github.com/gorilla/mux v1.7.4 github.com/gorilla/websocket v1.4.2 github.com/iovisor/gobpf v0.0.0-20191219090757-e72091e3c5e6 - github.com/morikuni/aec v1.0.0 // indirect - github.com/opencontainers/go-digest v1.0.0 // indirect - github.com/opencontainers/image-spec v1.0.1 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/alertmanager v0.21.0 github.com/prometheus/client_golang v1.6.0 diff --git a/go.sum b/go.sum index c441c14..5cc98d0 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,6 @@ github.com/Djarvur/go-err113 v0.0.0-20200511133814-5174e21577d5/go.mod h1:4UJr5H github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= -github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= -github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= 
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= @@ -120,12 +118,8 @@ github.com/denis-tingajkin/go-header v0.3.1 h1:ymEpSiFjeItCy1FOP+x0M2KdCELdEAHUs github.com/denis-tingajkin/go-header v0.3.1/go.mod h1:sq/2IxMhaZX+RRcgHfCRx/m0M5na0fBt4/CRe7Lrji0= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= -github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= -github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0 h1:w3NnFcKR5241cfmQU5ZZAsf0xcpId6mWOupTvJlUX2U= github.com/docker/docker v0.7.3-0.20190327010347-be7ac8be2ae0/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= -github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -579,8 +573,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/moricho/tparallel v0.2.1 h1:95FytivzT6rYzdJLdtfn6m1bfFJylOJK41+lgv/EHf4= github.com/moricho/tparallel v0.2.1/go.mod h1:fXEIZxG2vdfl0ZF8b42f5a78EhjjD5mX8qUplsoSU4k= -github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= -github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mozilla/tls-observatory v0.0.0-20200317151703-4fa42e1c2dee/go.mod h1:SrKMQvPiws7F7iqYp8/TX+IhxCYhzr6N/1yb8cwHsGk= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= @@ -626,10 +618,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= -github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= -github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= -github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= -github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -822,6 +810,7 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod 
h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/uudashr/gocognit v1.0.1 h1:MoG2fZ0b/Eo7NXoIwCVFLG5JED3qgQz5/NEE+rOsjPs= github.com/uudashr/gocognit v1.0.1/go.mod h1:j44Ayx2KW4+oB6SWMv8KsmHzZrOInQav7D3cQMJ5JUM= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.16.0/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= github.com/valyala/quicktemplate v1.6.3/go.mod h1:fwPzK2fHuYEODzJ9pkw0ipCPNHZ2tD5KW4lOuSdPKzY= diff --git a/hack/tools/pretty_logs.go b/hack/tools/pretty_logs.go index 173fb0b..5882672 100644 --- a/hack/tools/pretty_logs.go +++ b/hack/tools/pretty_logs.go @@ -10,7 +10,6 @@ import ( "os" "github.com/criticalstack/swoll/pkg/client" - "github.com/criticalstack/swoll/pkg/event/call" "github.com/fatih/color" ) @@ -31,7 +30,7 @@ func main() { continue } - fn := event.Data.Argv.(call.Function) + fn := event.Data.Argv args := fn.Arguments() var errno string diff --git a/internal/pkg/hub/config.go b/internal/pkg/hub/config.go deleted file mode 100644 index 745319f..0000000 --- a/internal/pkg/hub/config.go +++ /dev/null @@ -1,30 +0,0 @@ -package hub - -const ( - // The prefix for where job-specific events are sent - swJobStream = "job" - // The prefix for where non-job-specific events (pathed) are sent - swNsStream = "ns" -) - -type Config struct { - // if we have an alternate root setting, and the endpoints start with - // "$root", use the AltRoot as the CWD for any further lookups, whether - // that be for /proc, or for configurations. - // - // mainly here for development reasons, if you're able to see your k8s - // node via /proc//root, you can set the AltRoot to this, and - // namespace lookups will look at /proc//root/proc/... - // cri socket will look at /proc//root/path/to/cri.sock - // etc... - AltRoot string - // The kube CRI socket needed for resolving kernel namespaces to containers - CRIEndpoint string - // If running out-of-cluster, this is the local k8s configuration - K8SEndpoint string - // The namespace in which to monitor for pods, if empty we watch all - // namespaces. 
- K8SNamespace string - // The raw BPF probe object loaded via assets (go-bindata) or via file - BPFObject []byte -} diff --git a/internal/pkg/hub/example_test.go b/internal/pkg/hub/example_test.go deleted file mode 100644 index 7dc6394..0000000 --- a/internal/pkg/hub/example_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package hub_test - -import ( - "context" - "fmt" - "log" - "os" - "time" - - "github.com/criticalstack/swoll/api/v1alpha1" - "github.com/criticalstack/swoll/internal/pkg/hub" - "github.com/criticalstack/swoll/pkg/event" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func ExampleTrace() { - t1 := &v1alpha1.Trace{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "swoll", - }, - Spec: v1alpha1.TraceSpec{ - LabelSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "nginx-with-writer"}, - }, - Syscalls: []string{"execve"}, - }, - Status: v1alpha1.TraceStatus{ - JobID: "test-tracer-a", - }, - } - - t2 := &v1alpha1.Trace{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "swoll", - }, - Spec: v1alpha1.TraceSpec{ - LabelSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "nginx-with-writer"}, - }, - Syscalls: []string{"openat", "execve"}, - }, - Status: v1alpha1.TraceStatus{ - JobID: "test-tracer-b", - }, - } - - t3 := &v1alpha1.Trace{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "swoll", - }, - Spec: v1alpha1.TraceSpec{ - LabelSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"component": "grafana"}, - }, - Syscalls: []string{"stat", "access", "accept", "accept4", "listen", "socket"}, - }, - Status: v1alpha1.TraceStatus{ - JobID: "test-tracer-c", - }, - } - - printEvent := func(name string, ev *event.TraceEvent) { - fmt.Printf("<%s> \033[1m%s:\033[0m [\033[4m%s\033[0m] %s err=%s ses=%v\n", name, - ev.Container.FQDN(), ev.Comm, ev.Argv, ev.Error.ColorString(), ev.Sid) - } - - hub, err := hub.NewHub(&hub.Config{ - AltRoot: os.Getenv("ALT_ROOT"), - BPFObject: []byte(os.Getenv("BPF_OBJECT")), - CRIEndpoint: os.Getenv("CRI_ENDPOINT"), - K8SEndpoint: os.Getenv("K8S_ENDPOINT"), - }, nil) - if err != nil { - log.Fatal(err) - } - - ctx := context.Background() - - //nolint:errcheck - go hub.Run(ctx) - //nolint:errcheck - go hub.RunTrace(ctx, t1) - //nolint:errcheck - go hub.RunTrace(ctx, t2) - //nolint:errcheck - go hub.RunTrace(ctx, t3) - - hub.AttachTrace(t1, printEvent) - hub.AttachTrace(t2, printEvent) - hub.AttachTrace(t3, printEvent) - - i := 0 - for { - time.Sleep(time.Second * 1) - if i++; i > 3 { - break - } - } - - //nolint:errcheck - hub.DeleteTrace(t1) - //nolint:errcheck - hub.DeleteTrace(t2) - //nolint:errcheck - hub.DeleteTrace(t3) -} diff --git a/pkg/event/call/syscall.go b/pkg/event/call/syscall.go index b525b07..bcdf9bc 100644 --- a/pkg/event/call/syscall.go +++ b/pkg/event/call/syscall.go @@ -10,7 +10,7 @@ const ( SYS_SETNS = 308 ) -func DecodeSyscall(nr int, arguments []*byte, arglen int) (interface{}, error) { +func DecodeSyscall(nr int, arguments []*byte, arglen int) (Function, error) { switch nr { case syscall.SYS_ACCEPT: ret := new(Accept) @@ -194,140 +194,3 @@ func DecodeSyscall(nr int, arguments []*byte, arglen int) (interface{}, error) { } } - -/* -func DecodeSyscall(nr int, arguments []*byte, arglen int) (*FunctionHandle, error) { - var ret *FunctionHandle - switch nr { - case syscall.SYS_ACCEPT: - ret = &FunctionHandle{new(Accept)} - case syscall.SYS_ACCEPT4: - ret = &FunctionHandle{new(Accept4)} - case syscall.SYS_ALARM: - ret = &FunctionHandle{new(Alarm)} - case syscall.SYS_ACCT: - ret = 
&FunctionHandle{new(Acct)} - case syscall.SYS_BRK: - ret = &FunctionHandle{new(Brk)} - //case syscall.SYS_IOCTL: - // XXX[lz]: not ready - // ret = &FunctionHandle{new(Ioctl)} - case syscall.SYS_CONNECT: - ret = &FunctionHandle{new(Connect)} - case syscall.SYS_CLONE: - ret = &FunctionHandle{new(Clone)} - case syscall.SYS_CLOSE: - ret = &FunctionHandle{new(Close)} - case syscall.SYS_CREAT: - ret = &FunctionHandle{new(Creat)} - case syscall.SYS_EXIT: - ret = &FunctionHandle{new(Exit)} - case syscall.SYS_FACCESSAT: - ret = &FunctionHandle{new(Faccessat)} - case syscall.SYS_FSTAT: - ret = &FunctionHandle{new(Fstat)} - case syscall.SYS_FTRUNCATE: - ret = &FunctionHandle{new(Ftruncate)} - case syscall.SYS_FUTEX: - ret = &FunctionHandle{new(Futex)} - case syscall.SYS_GETCWD: - ret = &FunctionHandle{new(Getcwd)} - case syscall.SYS_GETPEERNAME: - ret = &FunctionHandle{new(Getpeername)} - case syscall.SYS_GETSOCKNAME: - ret = &FunctionHandle{new(Getsockname)} - case syscall.SYS_GETSOCKOPT: - ret = &FunctionHandle{new(Getsockopt)} - case syscall.SYS_INIT_MODULE: - ret = &FunctionHandle{new(InitModule)} - case syscall.SYS_INOTIFY_ADD_WATCH: - ret = &FunctionHandle{new(INotifyAddWatch)} - case syscall.SYS_KILL: - ret = &FunctionHandle{new(Kill)} - case syscall.SYS_LINK: - ret = &FunctionHandle{new(Link)} - case syscall.SYS_LISTEN: - ret = &FunctionHandle{new(Listen)} - case syscall.SYS_MINCORE: - ret = &FunctionHandle{new(Mincore)} - case syscall.SYS_MKDIR: - ret = &FunctionHandle{new(Mkdir)} - case syscall.SYS_MOUNT: - ret = &FunctionHandle{new(Mount)} - case syscall.SYS_MPROTECT: - ret = &FunctionHandle{new(Mprotect)} - case syscall.SYS_NANOSLEEP: - ret = &FunctionHandle{new(Nanosleep)} - case syscall.SYS_PIVOT_ROOT: - ret = &FunctionHandle{new(PivotRoot)} - case syscall.SYS_PRLIMIT64: - ret = &FunctionHandle{new(Prlimit64)} - case syscall.SYS_PTRACE: - ret = &FunctionHandle{new(Ptrace)} - case syscall.SYS_READ: - ret = &FunctionHandle{new(Read)} - case syscall.SYS_READLINK: - ret = &FunctionHandle{new(Readlink)} - case syscall.SYS_READLINKAT: - ret = &FunctionHandle{new(Readlinkat)} - case syscall.SYS_RECVFROM: - ret = &FunctionHandle{new(Recvfrom)} - case syscall.SYS_RENAME: - ret = &FunctionHandle{new(Rename)} - case syscall.SYS_RMDIR: - ret = &FunctionHandle{new(Rmdir)} - case SYS_SECCOMP: - ret = &FunctionHandle{new(Seccomp)} - case syscall.SYS_SENDTO: - ret = &FunctionHandle{new(Sendto)} - case SYS_SETNS: - ret = &FunctionHandle{new(Setns)} - case syscall.SYS_SETSOCKOPT: - ret = &FunctionHandle{new(Setsockopt)} - case syscall.SYS_BIND: - ret = &FunctionHandle{new(Bind)} - case syscall.SYS_EXECVE: - ret = &FunctionHandle{new(Execve)} - case syscall.SYS_OPEN: - ret = &FunctionHandle{new(Open)} - case syscall.SYS_OPENAT: - ret = &FunctionHandle{new(Openat)} - case syscall.SYS_CHDIR: - ret = &FunctionHandle{new(Chdir)} - case syscall.SYS_CHROOT: - ret = &FunctionHandle{new(Chroot)} - case syscall.SYS_ACCESS: - ret = &FunctionHandle{new(Access)} - case syscall.SYS_WRITE: - ret = &FunctionHandle{new(Write)} - case syscall.SYS_UNLINK: - ret = &FunctionHandle{new(Unlink)} - case syscall.SYS_UMOUNT2: - ret = &FunctionHandle{new(Umount2)} - case syscall.SYS_TIMERFD_SETTIME: - ret = &FunctionHandle{new(TimerFDSettime)} - case syscall.SYS_TIMERFD_CREATE: - ret = &FunctionHandle{new(TimerFDCreate)} - case syscall.SYS_SYSLOG: - ret = &FunctionHandle{new(Syslog)} - case syscall.SYS_SYMLINK: - ret = &FunctionHandle{new(Symlink)} - case syscall.SYS_STATFS: - ret = &FunctionHandle{new(Statfs)} - case 
syscall.SYS_STAT: - ret = &FunctionHandle{new(Stat)} - case syscall.SYS_SOCKET: - ret = &FunctionHandle{new(Socket)} - case syscall.SYS_SETUID: - ret = &FunctionHandle{new(Setuid)} - default: - return nil, fmt.Errorf("unhandled syscall: %v", nr) - } - - if err := ret.DecodeArguments(arguments, arglen); err != nil { - return nil, err - } - - return ret, nil -} -*/ diff --git a/pkg/event/reader/reader.go b/pkg/event/reader/reader.go index 700f3e4..332075a 100644 --- a/pkg/event/reader/reader.go +++ b/pkg/event/reader/reader.go @@ -4,8 +4,6 @@ import ( "context" "github.com/criticalstack/swoll/pkg/kernel" - "github.com/criticalstack/swoll/pkg/topology" - "github.com/go-redis/redis" ) type EventReader interface { @@ -24,10 +22,6 @@ func NewEventReader(src interface{}) EventReader { switch src := src.(type) { case *kernel.Probe: return NewKernelReader(src) - case *redis.PubSub: - return NewRedisReader(src) - case *topology.Topology: - return NewTopologyReader(src) } return &EmptyReader{} diff --git a/pkg/event/reader/redis.go b/pkg/event/reader/redis.go deleted file mode 100644 index 691db01..0000000 --- a/pkg/event/reader/redis.go +++ /dev/null @@ -1,40 +0,0 @@ -package reader - -import ( - "context" - - "github.com/criticalstack/swoll/pkg/event" - "github.com/go-redis/redis" -) - -// RedisReader reads events from a redis client PubSub, -// And outputs as a event.Trace record into its backlog. -type RedisReader struct { - pubsub *redis.PubSub - backlog chan interface{} -} - -func NewRedisReader(pubsub *redis.PubSub) EventReader { - return &RedisReader{ - pubsub: pubsub, - backlog: make(chan interface{}), - } -} - -func (r *RedisReader) Run(ctx context.Context) error { - //ch := client.ps.Channel() - - for { - select { - case msg := <-r.pubsub.Channel(): - r.backlog <- event.RedisEvent(msg) - case <-ctx.Done(): - // TODO[mark] log cancelation error here. 
- return ctx.Err() - } - } -} - -func (r RedisReader) Read() chan interface{} { - return r.backlog -} diff --git a/pkg/event/reader/topology.go b/pkg/event/reader/topology.go deleted file mode 100644 index c103e7a..0000000 --- a/pkg/event/reader/topology.go +++ /dev/null @@ -1,36 +0,0 @@ -package reader - -import ( - "context" - - "github.com/criticalstack/swoll/pkg/event" - "github.com/criticalstack/swoll/pkg/topology" - "github.com/criticalstack/swoll/pkg/types" -) - -type TopologyReader struct { - topo *topology.Topology - backlog chan interface{} -} - -func NewTopologyReader(t *topology.Topology) EventReader { - return &TopologyReader{t, make(chan interface{})} -} - -func (t *TopologyReader) Read() chan interface{} { - return t.backlog -} - -func (t *TopologyReader) handler(tp topology.EventType, c *types.Container) { - switch tp { - case topology.EventTypeStart: - t.backlog <- event.ContainerAddEvent{Container: c} - case topology.EventTypeStop: - t.backlog <- event.ContainerDelEvent{Container: c} - } -} - -func (t *TopologyReader) Run(ctx context.Context) error { - t.topo.Run(ctx, t.handler) - return nil -} diff --git a/pkg/event/trace.go b/pkg/event/trace.go index a7fe160..e2199f8 100644 --- a/pkg/event/trace.go +++ b/pkg/event/trace.go @@ -1,15 +1,14 @@ package event import ( - "context" "encoding/json" "fmt" "syscall" "github.com/criticalstack/swoll/pkg/event/call" "github.com/criticalstack/swoll/pkg/syscalls" - "github.com/criticalstack/swoll/pkg/topology" "github.com/criticalstack/swoll/pkg/types" + log "github.com/sirupsen/logrus" ) // TraceEvent is a more concrete version of the `RawEvent` structure, it @@ -32,15 +31,22 @@ type TraceEvent struct { MntNamespace int `json:"mount_ns"` Start int64 `json:"start"` Finish int64 `json:"finish"` - Argv interface{} `json:"args"` + Argv call.Function `json:"args"` // when the topology context is not nil, it is used // to resove container information. - topo *topology.Topology + //topo *topology.Topology + lookupContainer ContainerLookupCb // Right now the only use for this is to copy the raw arguments // into the JSON version of this structure. raw *RawEvent } +type ContainerLookupCb func(namespace int) (*types.Container, error) + +func NewTraceEvent() *TraceEvent { + return new(TraceEvent) +} + // ColorString is just a helper to display a stupid terminal-colored // representation of a single event. func (ev *TraceEvent) ColorString() string { @@ -65,7 +71,7 @@ func (ev *TraceEvent) UnmarshalJSON(data []byte) error { return err } - var args interface{} + var args call.Function switch ev.Syscall.Nr { case syscall.SYS_ACCEPT: @@ -197,12 +203,21 @@ func (ev *TraceEvent) UnmarshalJSON(data []byte) error { return nil } +/* // WithTopology sets the internal topology context to `topo` for "resolving" // kernel-namespaces to containers. func (ev *TraceEvent) WithTopology(topo *topology.Topology) *TraceEvent { ev.topo = topo return ev } +*/ + +// WithContainerLookup sets the callback to execute to resolve kernel namespaces +// to the container it is associated with. +func (ev *TraceEvent) WithContainerLookup(cb ContainerLookupCb) *TraceEvent { + ev.lookupContainer = cb + return ev +} // Ingest reads an abstract input and outputs it as a fully-parsed TraceEvent. 
// If a topology context has been set, it will also attempt to resolve the @@ -228,13 +243,14 @@ func (ev *TraceEvent) Ingest(data interface{}) (*TraceEvent, error) { var container *types.Container - if ev.topo != nil { - container, _ = ev.topo.LookupContainer(context.TODO(), rawEvent.PidNamespace()) + if ev.lookupContainer != nil { + container, _ = ev.lookupContainer(rawEvent.PidNamespace()) } callData, err := call.DecodeSyscall(int(rawEvent.Syscall), rawEvent.Args(), rawEvent.ArgLen()) if err != nil { - return nil, err + log.Warnf("Failed to decode syscall %v: %v", rawEvent.Syscall, err) + callData = data.(call.Function) } ev.PidNamespace = int(rawEvent.PidNS) diff --git a/internal/pkg/assets/bindata.go b/pkg/kernel/assets/compiled.go similarity index 99% rename from internal/pkg/assets/bindata.go rename to pkg/kernel/assets/compiled.go index 0c8c2ab..8e60ee8 100644 --- a/internal/pkg/assets/bindata.go +++ b/pkg/kernel/assets/compiled.go @@ -1,4 +1,4 @@ -// Code generated by go-bindata. DO NOT EDIT. +// Code generated for package assets by go-bindata DO NOT EDIT. (@generated) // sources: // internal/bpf/probe.o // +build !nobindata @@ -25,21 +25,32 @@ type bindataFileInfo struct { modTime time.Time } +// Name return file name func (fi bindataFileInfo) Name() string { return fi.name } + +// Size return file size func (fi bindataFileInfo) Size() int64 { return fi.size } + +// Mode return file mode func (fi bindataFileInfo) Mode() os.FileMode { return fi.mode } + +// Mode return file modify time func (fi bindataFileInfo) ModTime() time.Time { return fi.modTime } + +// IsDir return file whether a directory func (fi bindataFileInfo) IsDir() bool { - return false + return fi.mode&os.ModeDir != 0 } + +// Sys return file is sys mode func (fi bindataFileInfo) Sys() interface{} { return nil } @@ -155,6 +166,7 @@ type bintree struct { Func func() (*asset, error) Children map[string]*bintree } + var _bintree = &bintree{nil, map[string]*bintree{ "internal": &bintree{nil, map[string]*bintree{ "bpf": &bintree{nil, map[string]*bintree{ @@ -209,4 +221,3 @@ func _filePath(dir, name string) string { cannonicalName := strings.Replace(name, "\\", "/", -1) return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) 
} - diff --git a/pkg/kernel/assets/loader.go b/pkg/kernel/assets/loader.go new file mode 100644 index 0000000..a552391 --- /dev/null +++ b/pkg/kernel/assets/loader.go @@ -0,0 +1,20 @@ +// +build !nobindata + +package assets + +import ( + "bytes" +) + +const ( + defaultBPFObject = "internal/bpf/probe.o" +) + +func LoadBPFReader() *bytes.Reader { + return bytes.NewReader(LoadBPF()) +} + +func LoadBPF() []byte { + bpf, _ := Asset(defaultBPFObject) + return bpf +} diff --git a/pkg/kernel/filter/defaults.go b/pkg/kernel/filter/defaults.go index c065d9d..4516b93 100644 --- a/pkg/kernel/filter/defaults.go +++ b/pkg/kernel/filter/defaults.go @@ -15,7 +15,7 @@ func (f *Filter) ApplyDefaults() error { return err } - if err := f.ApplySyscallDefaults(0); err != nil { + if err := f.ApplySyscallDefaults(-1); err != nil { return err } @@ -37,7 +37,7 @@ var defaultSyscalls = []string{ "sys_mkdirat", "sys_statfs", "sys_access", - "sys_prlimit", + "sys_prlimit64", "sys_mount", "sys_unlink", "sys_unlinkat", @@ -72,13 +72,7 @@ var defaultSyscalls = []string{ "sys_inotify_add_watch", "sys_ftruncate", "sys_sethostname", - "sys_ioctl", "sys_clone", - "sys_nanosleep", - "sys_close", - "sys_sendto", - "sys_recvfrom", - "sys_rt_sigaction", "sys_mprotect", // "sys_read" } diff --git a/pkg/kernel/metrics/handler.go b/pkg/kernel/metrics/handler.go index e0ad49e..cf7e8fb 100644 --- a/pkg/kernel/metrics/handler.go +++ b/pkg/kernel/metrics/handler.go @@ -1,6 +1,7 @@ package metrics import ( + "sync" "unsafe" "github.com/iovisor/gobpf/elf" @@ -9,6 +10,7 @@ import ( type Handler struct { module *elf.Module table *elf.Map + sync.Mutex } func NewHandler(mod *elf.Module) *Handler { @@ -24,6 +26,9 @@ func (h *Handler) PruneNamespace(ns int) (int, error) { n := &key{} toDelete := make([]*key, 0) + h.Lock() + defer h.Unlock() + for { more, _ := h.module.LookupNextElement(h.table, unsafe.Pointer(k), @@ -54,6 +59,9 @@ func (h *Handler) QueryAll() Metrics { kval := &val{} next := &key{} + h.Lock() + defer h.Unlock() + for { more, _ := h.module.LookupNextElement(h.table, unsafe.Pointer(kkey), diff --git a/pkg/kernel/offset_helper.go b/pkg/kernel/offset_helper.go new file mode 100644 index 0000000..5c420b9 --- /dev/null +++ b/pkg/kernel/offset_helper.go @@ -0,0 +1,324 @@ +package kernel + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +var ( + nsProxyProbeSymbols = []string{ + "ipcns_get", + "mntns_get", + "cgroupns_get", + "netns_get", + "get_proc_task_net", + "switch_task_namespaces", + "mounts_open_common", + } + pidNsProbeSymbols = []string{ + "pidns_for_children_get", + "pidns_put", + "pidns_get", + } +) + +const ( + ksymsFile = "/proc/kallsyms" + kcoreFile = "/proc/kcore" + + OffsetFlagNsProxy = (1 << 0) + OffsetFlagPidNs = (1 << 1) + OffsetFlagAll = (OffsetFlagNsProxy | OffsetFlagPidNs) +) + +type kernelSyms map[string]string + +// parseKernelSyms reads in a `/proc/kallsyms` formatted symtab and merges the +// info into a map. 
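// Each /proc/kallsyms line has the form "<address> <type> <symbol>", e.g. an
// (illustrative) entry such as:
//   ffffffff811d3c10 T mntns_get
// parseKernelSyms keeps only the text symbols ("t"/"T") and maps the symbol
// name to its address string.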
+func parseKernelSyms(r io.Reader) kernelSyms { + ret := make(map[string]string) + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + tokens := strings.Fields(scanner.Text()) + switch tokens[1] { + case "t", "T": + break + default: + continue + } + + // symname = kernel address + ret[tokens[2]] = tokens[0] + } + + return ret +} + +func parseKallsyms(symfile string) (kernelSyms, error) { + if symfile == "" { + symfile = ksymsFile + } + + f, err := os.Open(ksymsFile) + if err != nil { + return nil, err + } + defer f.Close() + + return parseKernelSyms(f), nil +} + +func objdumpAddress(addr, corefile string) ([]byte, error) { + r, err := strconv.ParseUint(addr, 16, 64) + if err != nil { + return nil, err + } + + if _, err := os.Stat(corefile); os.IsNotExist(err) { + return nil, errors.Wrapf(err, "could not open %s, set offsets manually", corefile) + } + + if _, err := exec.LookPath("objdump"); err != nil { + return nil, errors.Wrapf(err, "could not find `objdump`: please install binutils") + } + + return exec.Command("objdump", corefile, + "--disassemble", + "--disassembler-options", "intel", + "--start-address", fmt.Sprintf("0x%08x", r), + "--stop-address", fmt.Sprintf("0x%08x", r+(8*32))).Output() +} + +const ( + stateRLock = iota + stateRMov +) + +var movre *regexp.Regexp = regexp.MustCompile(`mov\s+\w+,QWORD PTR \[\w+\+(0x\w+?)\]`) +var leare *regexp.Regexp = regexp.MustCompile(`lea\s+\w+,\[\w+[+-](0x\w+?)\]`) + +// The purpose of this little nugget is to find the offset of the currently +// running kernel's task_struct->nsproxy variable. We use this offset to +// find the current tasks's pid and mnt namespaces. This (currently) only works +// on x86_64 as we are looking for x86 specific operations. +// +// It works by scanning `/proc/kallsyms` for specific functions we are +// interested in, in our case: +// - `_raw_spin_lock` (more on this later) +// - `ipcns_get` +// - `utsns_get` +// - `mntns_get` +// - `cgroupns_get` +// +// With the exception of `_raw_spin_lock`, these kernel functions are small +// helper functions which can be found in all modern linux installs. We target +// these because they: +// a) are very small in size and pose little risk of changing +// b) are very similar in operation +// c) lock the task_struct at a very early stage +// d) assign a local variable the value of `task_struct->nsproxy` +// +// Take the following function as an example: +// static struct ns_common *mntns_get(struct task_struct *task) +// { +// struct ns_common *ns = NULL; +// struct nsproxy *nsproxy; +// task_lock(task); // push %dsi +// // call _raw_spin_lock +// nsproxy = task->nsproxy; // mov reg,dsi+0x... == offset of task->nsproxy +// if (nsproxy != NULL) { // test reg,reg +// +// The resulting assembly will have the following characteristics: +// push %dsi // or wherever arg[0] resides. +// call _raw_spin_lock // task_lock(task) +// mov %r8,QWORD PTR [%dsi+XXXX] // nsproxy = task->nsproxy (XXXX == offset) +// test %r8,%r8 // if (nsproxy != NULL) +// +// We fetch the addresses of these symbols from /proc/kallsyms, then read +// /proc/kcore and disassemble the first few instructions and attempt to find +// the above pattern. +// +// This is a hack, but it's a working hack. +func nsproxyOffsetSearch(objdump io.Reader, syms kernelSyms) string { + lockaddr := syms["_raw_spin_lock"] + scanner := bufio.NewScanner(objdump) + state := stateRLock + + for scanner.Scan() { + if !strings.HasPrefix(scanner.Text(), "ffffffff") { + // skip lines that aren't specifically code segments. 
+ continue + } + + switch state { + case stateRLock: + // read until we see a call to _raw_spin_lock + if strings.Contains(scanner.Text(), lockaddr) { + // found a call to _raw_spin_lock, set the next thing + // to search for. + state = stateRMov + } + case stateRMov: + // read instructions until we find a local mov from dsi+offset + if found := movre.FindStringSubmatch(scanner.Text()); len(found) > 0 { + // this is a potential match. + return found[1] + } + + } + } + + return "" +} + +// pidnscommonOffsetSearch is simple in comparison to nsproxyOffsetSearch. Here +// we are trying to get the offset to the `struct ns_common` member `ns` from +// `struct pid_namespace`. There are several functions in the kernel that do +// something like the following: `return ns ? &(struct ns_common *)ns->ns : +// NULL;`, or `if (ns != NULL) return &ns->ns; else return NULL;` +// +// Returning a pointer to a constant or stack will result in a "load effective +// address" (`lea`), and these functions are small enough where we can just +// count the number of `lea`'s to get a good idea of where this member sits. +func pidnscommonOffsetSearch(objdump io.Reader) []string { + scanner := bufio.NewScanner(objdump) + ret := make([]string, 0) + + for scanner.Scan() { + if found := leare.FindStringSubmatch(scanner.Text()); len(found) > 0 { + ret = append(ret, found[1]) + } + } + + return ret +} + +func maxCandidates(candidates map[string]int) (string, int) { + var mn int + var ms string + + for k, v := range candidates { + if v == mn { + log.Warnf("warning: same size candidates (%v=%v == %v=%v)\n", k, v, ms, mn) + } + + if v > mn { + ms = k + mn = v + } + } + + return ms, mn +} + +func pidnsCommonLikelyOffset(symbols kernelSyms, corefile string, functions []string) (string, error) { + candidates := make(map[string]int) + + for _, fn := range functions { + addr, ok := symbols[fn] + if !ok { + return "", fmt.Errorf("symbol '%s' not found in kernel-symbols", fn) + } + + code, err := objdumpAddress(addr, corefile) + if err != nil { + return "", errors.Wrapf(err, "unable to dump address of symbol '%s': %v", fn, err) + } + + if offs := pidnscommonOffsetSearch(bytes.NewReader(code)); len(offs) > 0 { + for _, off := range offs { + candidates[off]++ + } + } + } + + addr, count := maxCandidates(candidates) + log.Debugf("pidns->ns_common likelyOffset addr=%v, count=%v\n", addr, count) + return addr, nil + +} + +func nsproxyLikelyOffset(symbols kernelSyms, corefile string, functions []string) (string, error) { + candidates := make(map[string]int) + + for _, fn := range functions { + addr, ok := symbols[fn] + if !ok { + return "", fmt.Errorf("symbol '%s' not found in kernel-symbols", fn) + } + + code, err := objdumpAddress(addr, corefile) + if err != nil { + return "", err + } + + if offs := nsproxyOffsetSearch(bytes.NewReader(code), symbols); offs != "" { + candidates[offs]++ + } + } + + addr, count := maxCandidates(candidates) + log.Debugf("nsproxy offset likely at offset=%v (%d hits)\n", addr, count) + + return addr, nil +} + +// DetectAndSetOffsets is a wrapper around the kernel Offseter. For now it +// requires `objdump` to be installed, and will attempt to find offsets within +// the `struct task_struct` structure that are required to run the probe with. 
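// A rough usage sketch from a caller's point of view (error handling
// abbreviated; assumes the probe was built from the embedded BPF asset and
// that /proc/kallsyms and /proc/kcore are readable on this host):
//   probe, _ := kernel.NewProbe(assets.LoadBPFReader(), nil)
//   _ = probe.InitProbe()
//   if err := probe.DetectAndSetOffsets(); err != nil {
//       log.Fatal(err)
//   }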
+func (p *Probe) DetectAndSetOffsets() error { + symbols, err := parseKallsyms(ksymsFile) + if err != nil { + return errors.Wrapf(err, "Unable to parse symbol-file=%s: %v", ksymsFile, err) + } + + nsproxy, err := nsproxyLikelyOffset(symbols, kcoreFile, nsProxyProbeSymbols) + if err != nil || nsproxy == "" { + return errors.Wrapf(err, "unable to detect nsproxy offset: %v", err) + } + + pidns, err := pidnsCommonLikelyOffset(symbols, kcoreFile, pidNsProbeSymbols) + if err != nil || pidns == "" { + return errors.Wrapf(err, "unable to detect pidns_common offset: %v", err) + } + + // trim and parse up the returned raw offsets + nsproxy = strings.TrimPrefix(nsproxy, "0x") + pidns = strings.TrimPrefix(pidns, "0x") + + nsproxyOffset, err := strconv.ParseInt(nsproxy, 16, 64) + if err != nil { + return errors.Wrapf(err, "Unable to parse nsproxy raw offset-string '%s': %v", nsproxy, err) + } + + pidnsOffset, err := strconv.ParseInt(pidns, 16, 64) + if err != nil { + return errors.Wrapf(err, "Unable to parse pidns raw offset-string '%s': %v", pidns, err) + } + + offsetter, err := NewOffsetter(p.Module()) + if err != nil { + return errors.Wrapf(err, "Unable to create kernel-offset configuration: %v", err) + } + + if err := offsetter.Set("nsproxy", OffsetValue(nsproxyOffset)); err != nil { + return errors.Wrapf(err, "Unable to set offset for nsproxy: %v", err) + } + + if err := offsetter.Set("pid_ns_common", OffsetValue(pidnsOffset)); err != nil { + return errors.Wrapf(err, "Unable to set offset for pid_ns_common: %v", err) + } + + return nil +} diff --git a/pkg/kernel/probe.go b/pkg/kernel/probe.go index 20b493e..d10a6d4 100644 --- a/pkg/kernel/probe.go +++ b/pkg/kernel/probe.go @@ -3,10 +3,11 @@ package kernel import ( "bytes" "context" - "errors" "fmt" + "github.com/criticalstack/swoll/pkg/kernel/filter" "github.com/iovisor/gobpf/elf" + "github.com/pkg/errors" ) // Config is uhh, configuration stuff. @@ -75,9 +76,11 @@ func NewProbe(bpf *bytes.Reader, cfg *Config) (*Probe, error) { // or if the lost channel had been signaled. type DataCallback func(msg []byte, lost uint64) error +type ProbeInitOption func(*Probe) error + // InitProbe loads all the underlying bpf maps, allocates the perfevent buffer, // sets the tracepoints, all of which are operations which require CAP_ADMIN -func (p *Probe) InitProbe() error { +func (p *Probe) InitProbe(opts ...ProbeInitOption) error { if p == nil || p.module == nil { return fmt.Errorf("nil probe") } @@ -93,11 +96,38 @@ func (p *Probe) InitProbe() error { return err } + for _, opt := range opts { + if err := opt(p); err != nil { + return err + } + } + p.initialized = true return nil } +func WithOffsetDetection() ProbeInitOption { + return func(p *Probe) error { + return p.DetectAndSetOffsets() + } +} + +func WithDefaultFilter() ProbeInitOption { + return func(p *Probe) error { + f, err := filter.NewFilter(p.Module()) + if err != nil { + return errors.Wrapf(err, "unable to create new filter context") + } + + if err := f.ApplyDefaults(); err != nil { + return errors.Wrapf(err, "unable to create filter for this proccess") + } + + return nil + } +} + // InitTracePoints will set our tracepoints as on. func (p *Probe) InitTracepoints() error { // initialize our tracepoints that we have setup to be configured. 
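// A sketch of how the new functional options added above might be used by a
// caller (both options are optional and can be combined):
//   if err := probe.InitProbe(kernel.WithOffsetDetection(), kernel.WithDefaultFilter()); err != nil {
//       log.Fatal(err)
//   }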
@@ -136,11 +166,7 @@ func (p *Probe) Run(ctx context.Context, cb DataCallback) error { } if !p.initialized { - // don't rely on end-user to call init (as it could be sourced from - // eventreader) - if err := p.InitProbe(); err != nil { - return err - } + return errors.New("probe not initialized") } if !p.tpInitialized { diff --git a/pkg/topology/consts.go b/pkg/topology/consts.go new file mode 100644 index 0000000..5e6b129 --- /dev/null +++ b/pkg/topology/consts.go @@ -0,0 +1,8 @@ +package topology + +const ( + // The prefix for where job-specific events are sent + swJobStream = "job" + // The prefix for where non-job-specific events (pathed) are sent + swNsStream = "ns" +) diff --git a/pkg/topology/doc.go b/pkg/topology/doc.go new file mode 100644 index 0000000..c0c1550 --- /dev/null +++ b/pkg/topology/doc.go @@ -0,0 +1,43 @@ +// Package topology is the preferred method for creating and supervising system +// traces when using the Swoll API on modern container management and +// orchestration systems such as Kubernetes. +// +// To better understand what this package does, it is best to start with +// learning a little bit about how Swoll creates, captures, filters, and +// emits data from the kernel back into our code. +// +// The Swoll BPF has a very simple userspace-configurable filtering mechanism +// which allows us to either white-list or black-list what syscalls we want to +// monitor. Optionally, each call we want to monitor can also be associated with +// a specific kernel namespace. So, for example, a user can request to only see +// events which made the sytem call "open" in the kernel PID-Namespace `31337`. +// Any events that do not match this specific rule will be silently dropped by +// the kernel. +// +// Furthermore, each filter can optionally maintain a basic sample-rate +// configuration, giving the developer the option to gain insight into high-load +// system-calls such as `sys_read` without impacting performance too much. +// +// Since each container within a `Pod` gets its own unique (or derived if +// shared) namespace, swoll exploits the above ns+syscall filter feature by +// maintaining the relations between Kubernetes and the container-runtime by +// dynamically updating and tuning the filters in real-time. +// +// In short (using Kubernetes as an example), when we request Swoll to monitor +// syscall events for the Pod "foobar", we connect to the kube-api-server, watch +// for Pod events that match "foobar", and when matched, utilizes the Container +// Runtime Interface to find process details for that Pod. Once we have obtained +// the init PID from the CRI, we can render the PID namespace we need to use to +// set the filter in the kernel. +// +// In theory this sounds simple, but in practice things are not as easy. Swoll +// strives to run as lean-and-mean as possible, and in doing so, the goal +// of which is "One BPF Context To Mon Them All", and still without sacrificing +// performance for flexibility or vice-versa. +// +// And the Topology API is exactly that. It "observes" events from Kubernetes +// and CRI (see: topology.Observer), runs one or more v1alpha1.Trace specifications as a +// topology.Job, which in-turn dynamically updates, de-duplicates, +// and prunes the kernel filter inside a single BPF context, better known as the +// topology.Hub. 
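// A condensed sketch of that wiring, assuming ctx is a context.Context and
// trace is a *v1alpha1.Trace (error handling omitted; the Example functions
// in hub_test.go show fuller versions):
//   obs, _ := topology.NewKubernetes()
//   hub, _ := topology.NewHub(assets.LoadBPFReader(), obs)
//   go hub.MustRun(ctx)
//   go hub.RunTrace(ctx, trace)
//   hub.AttachTrace(trace, func(name string, ev *event.TraceEvent) {
//       fmt.Println(name, ev.Syscall, ev.Argv)
//   })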
+package topology diff --git a/pkg/topology/docker.go b/pkg/topology/docker.go deleted file mode 100644 index 6d8c90e..0000000 --- a/pkg/topology/docker.go +++ /dev/null @@ -1,11 +0,0 @@ -package topology - -import ( - _ "github.com/docker/docker/client" -) - -type DockerOption func(*Docker) error - -type Docker struct { - //client *dockercli.Client -} diff --git a/internal/pkg/hub/hash.go b/pkg/topology/hash.go similarity index 95% rename from internal/pkg/hub/hash.go rename to pkg/topology/hash.go index 2947f94..fed7a08 100644 --- a/internal/pkg/hub/hash.go +++ b/pkg/topology/hash.go @@ -1,4 +1,4 @@ -package hub +package topology import "hash/crc64" diff --git a/internal/pkg/hub/hub.go b/pkg/topology/hub.go similarity index 63% rename from internal/pkg/hub/hub.go rename to pkg/topology/hub.go index efe3389..3db3867 100644 --- a/internal/pkg/hub/hub.go +++ b/pkg/topology/hub.go @@ -1,4 +1,4 @@ -package hub +package topology import ( "bytes" @@ -11,12 +11,11 @@ import ( "github.com/criticalstack/swoll/api/v1alpha1" "github.com/criticalstack/swoll/internal/pkg/pubsub" "github.com/criticalstack/swoll/pkg/event" - "github.com/criticalstack/swoll/pkg/event/reader" "github.com/criticalstack/swoll/pkg/kernel" "github.com/criticalstack/swoll/pkg/kernel/filter" "github.com/criticalstack/swoll/pkg/kernel/metrics" "github.com/criticalstack/swoll/pkg/syscalls" - "github.com/criticalstack/swoll/pkg/topology" + "github.com/criticalstack/swoll/pkg/types" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -31,10 +30,10 @@ const ( stubSyscallFilter = "sys_set_tid_address" // a stub syscall we use to "prime" the kernel syscall-filter ) -// Hub maintains the global kernel probe and all the underlying -// filters and event message routing. +// Hub maintains all of the underlying kernel-filters, job request and output +// routing, metric-rules, de-duplication, garbage-collection using information +// it has obtained via the underlying Observer. type Hub struct { - config *Config // map pid-namespace+syscall_nr to a list of JobContext's nsmap map[int]map[int]*JobList // map job-ids to lists of JobContext's @@ -47,32 +46,79 @@ type Hub struct { filter *filter.Filter // the topology context for this hub which is used for resolving kernel // namespaces to pods/containers - topo *topology.Topology + topo *Topology statsInterval time.Duration sync.Mutex } -// RunJob runs a job on the Hub +// NewHub creates and initializes a Hub context and the underlying BPF, primes +// the kernel filter, and sets up the in-kernel metrics. +// hub := topology.NewHub(assets.LoadBPFReader(), topology.NewKubernetes()) +func NewHub(bpf *bytes.Reader, observer Observer) (*Hub, error) { + if bpf.Len() == 0 { + return nil, errors.New("BPF object missing") + } + + probe, err := kernel.NewProbe(bpf, nil) + if err != nil { + return nil, err + } + + if err := probe.InitProbe(); err != nil { + return nil, err + } + + filter, err := filter.NewFilter(probe.Module()) + if err != nil { + return nil, err + } + + // blacklist our running pid so we don't see events from our self. + if err := filter.FilterSelf(); err != nil { + return nil, err + } + + // we need to have at least one syscall in our filter (which will never + // actually match anything) when we start with a clean slate. + if err := filter.AddSyscall(stubSyscallFilter, 0); err != nil { + return nil, err + } + + // add a stub/dummy metrics filter so we don't dump everything. 
+ if err := filter.AddMetrics(stubMetricsFilterNS); err != nil { + return nil, err + } + + return &Hub{ + nsmap: make(map[int]map[int]*JobList), + idmap: make(map[string]*JobList), + ps: pubsub.New(), + probe: probe, + filter: filter, + topo: NewTopology(observer), + }, nil +} + +// RunTrace will create and schedule a trace-job to be run inside the Hub. +func (h *Hub) RunTrace(ctx context.Context, t *v1alpha1.Trace) error { + return h.RunJob(ctx, NewJob(t)) +} + +// RunJob will schedule an already-allocated trace-job to be run inside the Hub. func (h *Hub) RunJob(ctx context.Context, job *Job) error { return job.Run(ctx, h) } -// MustRunJob calls hub.RunJob but exits on any errors +// MustRunJob is a fail-wrapper around RunJob func (h *Hub) MustRunJob(ctx context.Context, job *Job) { if err := h.RunJob(ctx, job); err != nil { log.Fatal(err) } } -// RunTrace runs a TraceJob on the hub -func (h *Hub) RunTrace(ctx context.Context, t *v1alpha1.Trace) error { - return h.RunJob(ctx, NewJob(t)) -} - // DeleteTrace will stop all the running jobs that are associated with this -// Trace specification. using the job-id, we iterate over each context and -// if there are no other jobs trying to use the syscall and pod associated -// with this, the kernel filters are removed. +// specification. All kernel-filters that were added to create this job are +// removed if no other jobs share the same rules func (h *Hub) DeleteTrace(t *v1alpha1.Trace) error { h.Lock() defer h.Unlock() @@ -139,82 +185,47 @@ func (h *Hub) DeleteTrace(t *v1alpha1.Trace) error { return nil } -// WriteEvent writes a single TraceEvent to all subscribers +// WriteEvent writes a single TraceEvent to all subscribers using the +// path-subscriptions func (h *Hub) WriteEvent(ev *event.TraceEvent) { h.ps.Publish(ev, pubsub.LinearTreeTraverser( hashPath(swNsStream, ev.Container.Namespace, ev.Container.Pod, ev.Container.Name, ev.Syscall.Name))) } -// Run starts up and runs the global kernel probe and maintains various -// state. For each event that is recv'd via the bpf, we decode it (`Ingest`), -// find all jobs associated with this message, and publish the event to -// the streams tied to the jobs. +// Run runs the main Hub event-loop. It maintains the filters and metric +// rules that run in the BPF, resolves and routes system-call events to all the +// job output queues, accepts Trace specs to run, and keeps the bpf running +// light. func (h *Hub) Run(ctx context.Context) error { - // initialize our kernel reader used to read messages from the kernel. - proberdr := reader.NewEventReader(h.probe) - //nolint:errcheck - go proberdr.Run(ctx) - - // initialize our topology reader which is used for resolving - // kernel-namespaces back to the container/pod it was sourced from. - topordr := reader.NewEventReader(h.topo) - // nolint:errcheck - go topordr.Run(ctx) - - mhandler := metrics.NewHandler(h.probe.Module()) - sinterval := h.statsInterval - if sinterval == 0 { - // default the stats interval to 10 seconds. 
- sinterval = 10 * time.Second - } - - stattick := time.NewTicker(sinterval) - - msg := new(event.TraceEvent).WithTopology(h.topo) - - for { - select { - case <-stattick.C: - log.Debugf("allocated-metric-nodes: %v", len(mhandler.QueryAll())) - case ev := <-topordr.Read(): - // we keep an active podmon reader available for kernel-namespace to - // container resolution - switch ev := ev.(type) { - case event.ContainerAddEvent: - log.Tracef("adding container to metrics watcher: %s", ev.Container.FQDN()) - - if err := h.filter.AddMetrics(ev.PidNamespace); err != nil { - log.Warnf("failed to add kernel-metric filter for %s: %v", ev.Container.FQDN(), err) - } - - case event.ContainerDelEvent: - log.Tracef("removing/pruning container from metrics watcher: %s", ev.Container.FQDN()) - - if err := h.filter.RemoveMetrics(ev.PidNamespace); err != nil { - log.Warnf("failed to remove kernel-metric filter for %s: %v", ev.Container.FQDN(), err) - } - - pruned, err := mhandler.PruneNamespace(ev.PidNamespace) - if err != nil { - log.Warnf("failed to prune metrics for %s: %v", ev.Container.FQDN(), err) - } - - log.Tracef("pruned %d metrics for %s", pruned, ev.Container.FQDN()) - } - - case ev := <-proberdr.Read(): - // read a single event from the kernel, allcoate empty TraceEvent, - // initialize the underlying with the topology resolver + // create a new buffer to store our parsed kernel messages into, also set + // the kernel-namespace -> container resolver callback to read from our + // topology API. + // + // Note: we create this as global to the scope of this function, everything + // is run as a callback so there should not be a need for a mutex. + msg := event.NewTraceEvent().WithContainerLookup( + func(pidNamespace int) (*types.Container, error) { + // when this callback is executed, the return container value will + // be set within our current `msg` context + return h.topo.LookupContainer(ctx, pidNamespace) + }) + + // Run our kernel probe handler in a goroutine + go func() { + err := h.probe.Run(ctx, func(ev []byte, lost uint64) error { + // read a single raw event from the kernel, attempt to decode the raw + // data into a TraceEvent via Ingest. if _, err := msg.Ingest(ev); err != nil { - continue + log.Debugf("Failed to decode raw event: %v", err) + return nil } // We were unable to obtain any container information about this // message, this could be for several reasons, but we just ignore // for now. if msg.Container == nil { - continue + return nil } h.Lock() @@ -235,11 +246,69 @@ func (h *Hub) Run(ctx context.Context) error { } h.Unlock() + return nil + }) + if err != nil { + log.Error(err) + } + + }() + + // Using events from the observer (container start/stop), we keep a running + // list of metrics to gather. This also handles kernel-filter garbage + // collection. + mhandler := metrics.NewHandler(h.probe.Module()) + + go h.topo.Run(ctx, func(etype EventType, c *types.Container) { + switch etype { + case EventTypeStart: + // a new container was started, simply assure that we are collecting + // metrics. + log.Tracef("adding container to metrics watcher: %s", c.FQDN()) + + if err := h.filter.AddMetrics(c.PidNamespace); err != nil { + log.Warnf("failed to add kernel-metric filter for %s: %v", c.FQDN(), err) + } + case EventTypeStop: + // a container has been marked as either stopped, or some other + // indicator stating it is no longer available. 
+ log.Tracef("removing/pruning container from metrics watcher: %s", c.FQDN()) + + if err := h.filter.RemoveMetrics(c.PidNamespace); err != nil { + log.Warnf("failed to remove kernel-metric filter for %s: %v", c.FQDN(), err) + } + + pruned, err := mhandler.PruneNamespace(c.PidNamespace) + if err != nil { + log.Warnf("failed to prune metrics for %s: %v", c.FQDN(), err) + } + + log.Tracef("pruned %d metrics for %s", pruned, c.FQDN()) } + }) + + sinterval := h.statsInterval + if sinterval == 0 { + // default the stats interval to 10 seconds. + sinterval = 10 * time.Second } + stattick := time.NewTicker(sinterval) + + for { + select { + case <-stattick.C: + log.Debugf("allocated-metric-nodes: %v", len(mhandler.QueryAll())) + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + } +// MustRun is a fail-wrapper around Run func (h *Hub) MustRun(ctx context.Context) { if err := h.Run(ctx); err != nil { log.Fatal(err) @@ -268,12 +337,16 @@ func (h *Hub) findJobListByID(id string) *JobList { } // PushJob insert a namespace+nr specific job as a value of a list in two -// buckets; the first being the `nsmap`, a mapping of pidNamespace + syscall_NR -// to lists of jobs, and an `idmap`, a mapping of jobID's to jobs. +// buckets; +// "nsmap": a mapping of pid-namespace+syscall -> list of jobs, +// "idmap": a mapping of a job-ID to individual job contexts. // -// We keep these lists like this so that if two jobs that have overlapping rules -// (e.g., rule-A=syscall_A,syscall_B, rule-B=syscall_A,syscall_C) we don't -// accidentally delete a running check for `syscall_A` if `rule-B` is removed. +// This is done to solve potential job duplication issues with overlapping +// rules. For example if we have two rules: +// rule-A = syscall_A, syscall_B +// rule-B = syscall_A, syscall_C +// And if "rule-B" is deleted, we don't want the kernel filter "syscall_A" +// removed due to the fact it is still needed for "rule-A". func (h *Hub) PushJob(job *Job, ns, nr int) { h.Lock() defer h.Unlock() @@ -322,55 +395,8 @@ func (h *Hub) findLowestSampleJob(ns, nr int) *JobContext { return min } -// NewHub creates and initializes a Hub context for reading and writing data to -// the kernel probe and routing them to the clients that care. -func NewHub(config *Config, observer topology.Observer) (*Hub, error) { - if len(config.BPFObject) == 0 { - return nil, errors.New("BPF object missing") - } - - probe, err := kernel.NewProbe(bytes.NewReader(config.BPFObject), nil) - if err != nil { - return nil, err - } - - if err := probe.InitProbe(); err != nil { - return nil, err - } - - filter, err := filter.NewFilter(probe.Module()) - if err != nil { - return nil, err - } - - if err := filter.FilterSelf(); err != nil { - return nil, err - } - - // we need to have at least one syscall in our filter (which will never - // actually match anything) when we start with a clean slate. - if err := filter.AddSyscall(stubSyscallFilter, 0); err != nil { - return nil, err - } - - // add a stub/dummy metrics filter so we don't dump everything. 
- if err := filter.AddMetrics(stubMetricsFilterNS); err != nil { - return nil, err - } - - return &Hub{ - config: config, - nsmap: make(map[int]map[int]*JobList), - idmap: make(map[string]*JobList), - ps: pubsub.New(), - probe: probe, - filter: filter, - topo: topology.NewTopology(observer), - }, nil -} - // Topology returns this Hub's current underlying topology context -func (h *Hub) Topology() *topology.Topology { +func (h *Hub) Topology() *Topology { if h != nil { return h.topo } @@ -387,17 +413,15 @@ func (h *Hub) Probe() *kernel.Probe { return nil } -// AttachTrace will subscribe the caller to a stream which has the output of a -// specific job. -func (h *Hub) AttachTrace(t *v1alpha1.Trace, cb func(n string, ev *event.TraceEvent)) pubsub.Unsubscriber { - return h.ps.Subscribe( - func(data interface{}) { - cb(t.Status.JobID, data.(*event.TraceEvent)) - }, pubsub.WithPath(hashPath(swJobStream, t.Status.JobID))) -} - -// AttachPath will subscribe the caller to a stream which is a subset of data -// sent to a specific job. +// AttachPath taps the caller into a subset of the data being sent to a running Job. +// Whenever an event is sent to a job, the Hub will also broadcast a copy of +// this event to a prefix-hash like so: +// hash("kube-namespace/", "kube-pod/", "kube-container/", "syscall-name/") +// +// Monitor ns/pod/container/syscall +// hub.AttachPath("", []string{"", "", "", "syscall"}, cb) +// Monitor all syscalls and containers in pod: +// hub.AttachPath("", []string{"", ""}, cb) func (h *Hub) AttachPath(name string, paths []string, cb func(string, *event.TraceEvent)) pubsub.Unsubscriber { tpaths := []string{swNsStream} tpaths = append(tpaths, paths...) @@ -407,3 +431,11 @@ func (h *Hub) AttachPath(name string, paths []string, cb func(string, *event.Tra cb(name, data.(*event.TraceEvent)) }, pubsub.WithPath(hashPath(tpaths...))) } + +// AttachTrace taps the caller into the events for a running Trace +func (h *Hub) AttachTrace(t *v1alpha1.Trace, cb func(n string, ev *event.TraceEvent)) pubsub.Unsubscriber { + return h.ps.Subscribe( + func(data interface{}) { + cb(t.Status.JobID, data.(*event.TraceEvent)) + }, pubsub.WithPath(hashPath(swJobStream, t.Status.JobID))) +} diff --git a/pkg/topology/hub_test.go b/pkg/topology/hub_test.go new file mode 100644 index 0000000..4f3be52 --- /dev/null +++ b/pkg/topology/hub_test.go @@ -0,0 +1,98 @@ +package topology_test + +import ( + "bytes" + "context" + + "github.com/criticalstack/swoll/api/v1alpha1" + "github.com/criticalstack/swoll/pkg/event" + "github.com/criticalstack/swoll/pkg/topology" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Running the Hub +func ExampleHub_Run() { + obs, err := topology.NewKubernetes() + if err != nil { + panic(err) + } + + var bpf *bytes.Reader + + hub, err := topology.NewHub(bpf, obs) + if err != nil { + panic(err) + } + + ctx := context.Background() + go hub.Run(ctx) + <-ctx.Done() +} + +// A short example showing how to use the RunTrace call +func ExampleHub_RunTrace() { + var ( + bpf *bytes.Reader + observer topology.Observer + ) + + hub, err := topology.NewHub(bpf, observer) + if err != nil { + panic(err) + } + + ctx := context.Background() + + // Run the Hub. + go hub.MustRun(ctx) + + // Monitor execve and openat in the kubernetes-namespace 'kube-system' and + // name the job "foo-bar". 
+ go hub.RunTrace(ctx, &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + }, + Spec: v1alpha1.TraceSpec{ + Syscalls: []string{"execve", "openat"}, + }, + Status: v1alpha1.TraceStatus{ + JobID: "foo-bar", + }, + }) + + // trace is now running inside the Hub, you must attach to it to recv events + <-ctx.Done() + +} + +// Simple example to show how to use the AttachTrace method, this assumes the +// topology.Hub is already running with an Observer. +func ExampleHub_AttachTrace() { + var hub *topology.Hub + + trace := &v1alpha1.Trace{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + }, + Spec: v1alpha1.TraceSpec{ + Syscalls: []string{"execve", "openat"}, + }, + Status: v1alpha1.TraceStatus{ + JobID: "foo-bar", + }, + } + + go hub.RunTrace(context.TODO(), trace) + hub.AttachTrace(trace, func(name string, ev *event.TraceEvent) {}) +} + +// In this example we use AttachPath to "subscribe" to a subset of events being +// sent to a running Job output. +func ExampleHub_AttachPath() { + var hub *topology.Hub + // Assumes there is a job that has matches namespace=kube-system, + // pod=foo-pod, and a container named "boo" + unsub := hub.AttachPath("example", []string{"kube-system", "foo-pod", "boo"}, + func(name string, ev *event.TraceEvent) {}) + defer unsub() +} diff --git a/internal/pkg/hub/job.go b/pkg/topology/job.go similarity index 53% rename from internal/pkg/hub/job.go rename to pkg/topology/job.go index c503f93..ec6c4d8 100644 --- a/internal/pkg/hub/job.go +++ b/pkg/topology/job.go @@ -1,4 +1,4 @@ -package hub +package topology import ( "container/list" @@ -8,38 +8,37 @@ import ( "github.com/criticalstack/swoll/api/v1alpha1" "github.com/criticalstack/swoll/internal/pkg/pubsub" "github.com/criticalstack/swoll/pkg/event" - "github.com/criticalstack/swoll/pkg/event/reader" "github.com/criticalstack/swoll/pkg/syscalls" - "github.com/criticalstack/swoll/pkg/topology" + "github.com/criticalstack/swoll/pkg/types" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" ) -// Job maintains the general rules for which a trace runs under. +// Job stores the trace specification and a running list of hosts which have +// matched this job. type Job struct { *v1alpha1.Trace sampled int monitoredHosts map[string]bool } -// JobContext is a structure that is used to store all the kernel filtering -// information per pid-namespace. +// JobContext contains information about the filters that were created in order +// to run a Job. Since multiple jobs can have shared resources (like +// kernel-filters), all possible rules are created and set. // -// When we reference a "POD" in this code, we use the PID namespace of the -// underlying container to key off of. Since multiple jobs can have some of -// the same syscalls and hosts being monitored, we create a list of all possible -// rules, but only filter in the kernel what is needed. +// For example, say we have two jobs: "job-A", and "job-B". // -// For example: say we have two "jobs". "Job-A", and "Job-B". -// Job-A monitors "app=nginx" with the syscalls "open"/"close" -// Job-B monitors "type=webserver" with the syscalls "open" -// app=nginx matches 'host-A' and 'host-B' -// type=webserver matches 'host-A' and 'host-Z' -// If we were to blindly delete Job-B, (meaning removing the filters for -// 'Host-A' and syscall "open"), we would also delete the filter that is -// being used for 'Job-A'. 
So we just make sure we don't delete from the -// actual kernel filter until our lists are empty. +// job-A monitors pods that match the label: app=nginx for the syscalls: "open", and "close" +// job-B monitors pods that match the label: type=webserver for just the syscall "open" +// +// If a pod was created with both the labels above (app=nginx,type=webserver), +// and we were to blindly delete "job-B", any filters that were added that +// matched both rules would be removed. +// +// Thus every filter is accounted for, treated much like a reference counter, +// only removing from the kernel-filter when no rules require it. type JobContext struct { *Job // the element that was used for insertion into the `nsmap` in `Hub` @@ -85,23 +84,19 @@ func (j *Job) Run(ctx context.Context, h *Hub) error { now := metav1.NewTime(time.Now()) j.Status.StartTime = &now + // grab the trace specification from this job definition. spec := j.TraceSpec() - kubetop, err := topology.NewKubernetes( - topology.WithKubernetesCRI(h.config.CRIEndpoint), - topology.WithKubernetesConfig(h.config.K8SEndpoint), - topology.WithKubernetesNamespace(j.Namespace), - topology.WithKubernetesProcRoot(h.config.AltRoot), - topology.WithKubernetesLabelSelector(labels.Set(spec.LabelSelector.MatchLabels).String()), - topology.WithKubernetesFieldSelector(labels.Set(spec.FieldSelector.MatchLabels).String())) + + // derive an Observer using the parent observer (most likely the Kubernetes observer), but + // set the namespace and various selectors to match this job. + observer, err := h.topo.observer.Copy( + WithKubernetesNamespace(j.Namespace), + WithKubernetesLabelSelector(labels.Set(spec.LabelSelector.MatchLabels).String()), + WithKubernetesFieldSelector(labels.Set(spec.FieldSelector.MatchLabels).String())) if err != nil { - log.Fatal(err) + return errors.Wrapf(err, "could not make a copy of the observer") } - topo := topology.NewTopology(kubetop) - rdr := reader.NewEventReader(topo) - //nolint:errcheck - go rdr.Run(ctx) - // these are the list of syscalls which will be used as rules for each // matched container from the topology api. calls := make(syscalls.SyscallList, 0) @@ -109,103 +104,96 @@ func (j *Job) Run(ctx context.Context, h *Hub) error { calls = append(calls, syscalls.Lookup(sc)) } - // It should be noted that we are looking at topology events, meaning these - // are messages informing us about containers entering and leaving the - // cluster. So when you see kernel filters being added and removed, even if - // associated with another job, these are containers that have left or - // joined the cluster so they need to be removed from the filter. - for { - select { - case ev := <-rdr.Read(): - switch ev := ev.(type) { - case event.ContainerAddEvent: - // new container found inside cluster that matched our labels - name := ev.Name - - if len(spec.HostSelector) > 0 { - // if we have a host-selector array in our spec, attempt to - // match this container's name with the entries in this - // variable. Only if they match will the filter be added. - matched := false - for _, h := range spec.HostSelector { - if h == name { - matched = true - break - } - } - - if !matched { - // just break out of this case if we didn't match - // anything + // Create and run the topology using the new Observer for this specific job. + go NewTopology(observer).Run(ctx, func(etype EventType, c *types.Container) { + switch etype { + case EventTypeStop: + // container that matched our labels is being removed from the + // cluster. 
Knowing that this container no longer exists, we + // can remove the associated global kernel filters. + pns := c.PidNamespace + j.RemoveContainer(c.Pod, c.Name) + + for _, sc := range calls { + log.Tracef("[%s/%d] removing syscall '%s' to kernel-filter\n", j.JobID(), pns, sc.Name) + + if err := h.filter.RemoveSyscall(sc.Nr, pns); err != nil { + log.Warnf("[%s/%d] failed to remove syscall '%s' from kernel-filter\n", j.JobID(), pns, sc.Name) + + // XXX[lz]: just continue on for now - but we should + // really think about what to do in cases like this as + // it might be dire. + } + } + case EventTypeStart: + // new container found inside cluster that matched our labels + name := c.Name + + if len(spec.HostSelector) > 0 { + // if we have a host-selector array in our spec, attempt to + // match this container's name with the entries in this + // variable. Only if they match will the filter be added. + matched := false + for _, h := range spec.HostSelector { + if h == name { + matched = true break } + } + if !matched { + // just break out of this case if we didn't match + // anything + break } - pns := ev.PidNamespace - j.AddContainer(ev.Pod, name) - - // got a new container that matched, for each syscall, push a - // job up that associates pidns+syscallNR with this job - for _, sc := range calls { - log.Tracef("[%s/%d] Adding syscall '%s' to kernel-filter\n", j.JobID(), pns, sc.Name) - - // This will create a sub-filter off of the pid-namespace - // which matches this subset of syscalls.. - h.PushJob(j, pns, sc.Nr) - - sampleRate := spec.SampleRate - - if sampleRate > 0 { - // find the job with the lowest current sampleRate, and - // if it is lower than this job's sampleRate, replace it - // with this one. The other job's will emulate via - // sub-sampling the true samplerate. - if lowestJob := h.findLowestSampleJob(sc.Nr, pns); lowestJob != nil && lowestJob.Spec.SampleRate < sampleRate { - log.Tracef("[%s/%d] swapping sample-rate for currently running rule to %d\n", j.JobID(), pns, sampleRate) - if err := h.filter.RemoveSyscall(sc.Nr, pns); err != nil { - log.Warnf("Couldn't remove syscall %v\n", err) - } - } + } - if err := h.filter.AddSampledSyscall(sc.Nr, pns, uint64(sampleRate)); err != nil { - log.Warnf("[%s/%d] Error adding syscall kernel-filter for '%s'\n", j.JobID(), pns, sc.Name) - return err + pns := c.PidNamespace + j.AddContainer(c.Pod, name) + + // got a new container that matched, for each syscall, push a + // job up that associates pidns+syscallNR with this job + for _, sc := range calls { + log.Tracef("[%s/%d] Adding syscall '%s' to kernel-filter\n", j.JobID(), pns, sc.Name) + + // This will create a sub-filter off of the pid-namespace + // which matches this subset of syscalls.. + h.PushJob(j, pns, sc.Nr) + + sampleRate := spec.SampleRate + + if sampleRate > 0 { + // find the job with the lowest current sampleRate, and + // if it is lower than this job's sampleRate, replace it + // with this one. The other job's will emulate via + // sub-sampling the true samplerate. + if lowestJob := h.findLowestSampleJob(sc.Nr, pns); lowestJob != nil && lowestJob.Spec.SampleRate < sampleRate { + log.Tracef("[%s/%d] swapping sample-rate for currently running rule to %d\n", j.JobID(), pns, sampleRate) + if err := h.filter.RemoveSyscall(sc.Nr, pns); err != nil { + log.Warnf("Couldn't remove syscall %v\n", err) } } - // Tell the kernel that we wish to monitor this syscall for - // this given pid-namespace. - // Note: if the filter already exists, this acts as a NOP. 
- if err := h.filter.AddSyscall(sc.Nr, pns); err != nil { + if err := h.filter.AddSampledSyscall(sc.Nr, pns, uint64(sampleRate)); err != nil { log.Warnf("[%s/%d] Error adding syscall kernel-filter for '%s'\n", j.JobID(), pns, sc.Name) - return err + return } } - case event.ContainerDelEvent: - // container that matched our labels is being removed from the - // cluster. Knowing that this container no longer exists, we - // can remove the associated global kernel filters. - pns := ev.PidNamespace - j.RemoveContainer(ev.Pod, ev.Name) - - for _, sc := range calls { - log.Tracef("[%s/%d] removing syscall '%s' to kernel-filter\n", j.JobID(), pns, sc.Name) - - if err := h.filter.RemoveSyscall(sc.Nr, pns); err != nil { - log.Warnf("[%s/%d] failed to remove syscall '%s' from kernel-filter\n", j.JobID(), pns, sc.Name) - - // XXX[lz]: just continue on for now - but we should - // really think about what to do in cases like this as - // it might be dire. - } + // Tell the kernel that we wish to monitor this syscall for + // this given pid-namespace. + // Note: if the filter already exists, this acts as a NOP. + if err := h.filter.AddSyscall(sc.Nr, pns); err != nil { + log.Warnf("[%s/%d] Error adding syscall kernel-filter for '%s'\n", j.JobID(), pns, sc.Name) + return } } - case <-ctx.Done(): - return nil } - } + }) + + <-ctx.Done() + return nil } // WriteEvent writes event `ev` to all listeners of this `Job` diff --git a/pkg/topology/kubernetes.go b/pkg/topology/kubernetes.go index ab644cd..384419d 100644 --- a/pkg/topology/kubernetes.go +++ b/pkg/topology/kubernetes.go @@ -1,11 +1,3 @@ -// In order to properly satisfy the topology interface, the kubernetes wrapper -// will monitor POD events (either starting, updating, or stopping) and match -// them with information from the underlying CRI (Container Runtime Interface) -// which is managed by the kubelet. -// -// We utilize the CRI endpoints to fetch the current PID, and PID namespace -// associated with every container in a POD. When any POD event is seen, this -// code will automatically scan the CRI for containers that match these PODS. package topology import ( @@ -29,10 +21,9 @@ import ( "k8s.io/client-go/kubernetes" ) -type KubernetesOption func(*Kubernetes) error - -// Kubernetes contains all the working parts to facilitate a Observer -// for kubernetes operation. +// Kubernetes satisfies the Observer interface for the topology. +// Using a combination of the kubelet api-server and the container-runtime +// interface, this will emit container start and stop messages to the caller type Kubernetes struct { criSocket string // fully-qualified path to a CRI socket endpoint kubeConfig string // if running out-of-cluster, the kubeconfig file @@ -45,7 +36,10 @@ type Kubernetes struct { kubeWatcher *kcache.ListWatch // the listwatch client for monitoring pods } -// WithKubernetesNamespace sets the namespace configuration option +type KubernetesOption func(*Kubernetes) error + +// WithKubernetesNamespace will limit the observation to a specific kubernetes +// namespace func WithKubernetesNamespace(namespace string) KubernetesOption { return func(k *Kubernetes) error { k.namespace = namespace @@ -53,7 +47,9 @@ func WithKubernetesNamespace(namespace string) KubernetesOption { } } -// WithKubernetesProcRoot sets the root-directory to the "/proc" directory +// WithKubernetesProcRoot will look for the ProcFS mount inside the path. Useful +// if the containers you are monitoring are mounted to a different path. 
+// Defaults to "/" func WithKubernetesProcRoot(path string) KubernetesOption { return func(k *Kubernetes) error { k.procRoot = path @@ -61,7 +57,8 @@ func WithKubernetesProcRoot(path string) KubernetesOption { } } -// WithKubernetesCRI sets the cri file configuration option +// WithKubernetesCRI is the fully-qualified path to the container-runtime +// interface UNIX socket. This file must exist on the host that runs this code. func WithKubernetesCRI(criSocket string) KubernetesOption { return func(k *Kubernetes) error { sinfo, err := os.Stat(criSocket) @@ -78,7 +75,8 @@ func WithKubernetesCRI(criSocket string) KubernetesOption { } } -// WithKubernetesConfig sets the path to the kube configuration file +// WithKubernetesConfig will use the kubernetes configuration file. By default, +// this will attempt to use the in-cluster Kubernetes configuration settings. func WithKubernetesConfig(kubeConfig string) KubernetesOption { return func(k *Kubernetes) error { k.kubeConfig = kubeConfig @@ -86,7 +84,7 @@ func WithKubernetesConfig(kubeConfig string) KubernetesOption { } } -// WithKubernetesLabelSelector sets the labelselector match configuration option +// WithKubernetesLabelSelector will only match hosts that match this label. func WithKubernetesLabelSelector(l string) KubernetesOption { return func(k *Kubernetes) error { k.labelSelector = l @@ -94,7 +92,8 @@ func WithKubernetesLabelSelector(l string) KubernetesOption { } } -// WithKubernetesFieldSelector sets the fieldselector match configuration option +// WithKubernetesFieldSelector will only match hosts what matched this +// field-selector labelset. func WithKubernetesFieldSelector(f string) KubernetesOption { return func(k *Kubernetes) error { k.fieldSelector = f @@ -102,7 +101,7 @@ func WithKubernetesFieldSelector(f string) KubernetesOption { } } -// NewKubernetes creates a Observer object for watching kubernetes changes +// NewKubernetes creates an Observer for Kubernetes func NewKubernetes(opts ...KubernetesOption) (*Kubernetes, error) { ret := &Kubernetes{namespace: kapi.NamespaceAll} @@ -115,7 +114,30 @@ func NewKubernetes(opts ...KubernetesOption) (*Kubernetes, error) { return ret, nil } +// Copy will copy all underlying data minus the client communication sockets. +func (k *Kubernetes) Copy(opts ...interface{}) (Observer, error) { + kcopy := &Kubernetes{ + criSocket: k.criSocket, + kubeConfig: k.kubeConfig, + namespace: k.namespace, + labelSelector: k.labelSelector, + fieldSelector: k.fieldSelector, + procRoot: k.procRoot, + } + + for _, opt := range opts { + optfn := opt.(KubernetesOption) + if err := optfn(kcopy); err != nil { + return nil, err + } + } + + return kcopy, nil +} + func (k *Kubernetes) connectCRI(ctx context.Context) error { + log.Tracef("Connecting to CRI %s...", k.criSocket) + conn, err := grpc.Dial(k.criSocket, grpc.WithInsecure(), grpc.WithContextDialer( func(ctx context.Context, addr string) (net.Conn, error) { return net.Dial("unix", k.criSocket) @@ -170,8 +192,8 @@ func (k *Kubernetes) connectKube(ctx context.Context) error { } -// Connect will do all the things to create client connects to both the -// kubernetes api, and the CRI grpc endpoint. +// Connect establishes the connections between the kube-apiserver and the +// container-runtime-interface. 
func (k *Kubernetes) Connect(ctx context.Context) error { if err := k.connectCRI(ctx); err != nil { return errors.Wrapf(err, "failed to connect to CRI endpoint '%s'", k.criSocket) @@ -241,6 +263,9 @@ func (k *Kubernetes) criContainers(ctx context.Context, match ...*matchPod) ([]* } containers := res.GetContainers() + + log.Tracef("CRI found %d containers...", len(containers)) + ret := make([]*types.Container, 0) for _, container := range containers { @@ -301,7 +326,7 @@ func (k *Kubernetes) criContainers(ctx context.Context, match ...*matchPod) ([]* return ret, nil } -// Containers returns an array of running containers inside kubernetes. +// Containers returns a list of all currently running containers func (k *Kubernetes) Containers(ctx context.Context) ([]*types.Container, error) { return k.criContainers(ctx) } @@ -331,9 +356,8 @@ func (k *Kubernetes) containersForPod(ctx context.Context, pod *kapi.Pod) []*typ return criContainers } -// Run connects to kube and watches for POD changes. When changes are seen, -// attempt to match the changes with the underlying CRI containers (to find the -// running PID of the container, and the underlying PID namespace). +// Run watches and maintains a cache of all running containers for kubernetes, +// sending events as an Observer to the topology. func (k *Kubernetes) Run(ctx context.Context, out chan<- *ObservationEvent) { if k.kubeWatcher == nil { if err := k.connectKube(ctx); err != nil { diff --git a/pkg/topology/kubernetes_test.go b/pkg/topology/kubernetes_test.go new file mode 100644 index 0000000..bd26f15 --- /dev/null +++ b/pkg/topology/kubernetes_test.go @@ -0,0 +1,21 @@ +package topology_test + +import ( + "fmt" + + "github.com/criticalstack/swoll/pkg/topology" +) + +func ExampleNewKubernetes() { + observer, err := topology.NewKubernetes( + topology.WithKubernetesConfig("/root/.kube/config"), + topology.WithKubernetesNamespace("kube-system"), + topology.WithKubernetesCRI("/run/containerd/containerd.sock"), + topology.WithKubernetesLabelSelector("app=nginx"), + topology.WithKubernetesFieldSelector("status.phase=Running")) + if err != nil { + panic(err) + } + + fmt.Println(observer) +} diff --git a/pkg/topology/topology.go b/pkg/topology/topology.go index 8291e73..c1d1783 100644 --- a/pkg/topology/topology.go +++ b/pkg/topology/topology.go @@ -9,21 +9,34 @@ import ( log "github.com/sirupsen/logrus" ) -type EventType int type OnEventCallback func(t EventType, container *types.Container) +// These are the two states in which an observer event can be in. 
+type EventType int + const ( - EventTypeStart EventType = iota - EventTypeStop + EventTypeStart EventType = iota // container started + EventTypeStop // container stopped ) -var ( - ErrNilEvent = errors.New("nil event") - ErrNilContainer = errors.New("nil container") - ErrUnknownType = errors.New("unknown event-type") - ErrBadNamespace = errors.New("invalid kernel pid-namespace") - ErrContainerNotFound = errors.New("container not found") -) +// ErrNilEvent is the error returned to indicate the observer sent an empty +// message +var ErrNilEvent = errors.New("nil event") + +// ErrNilContainer is the error returned to indicate the observer sent an empty +// container message +var ErrNilContainer = errors.New("nil container") + +// ErrUnknownType is the error returned to indicate a malformed observer event +var ErrUnknownType = errors.New("unknown event-type") + +// ErrBadNamespace is the error returned to indicate the observer was unable to +// resolve the PID-Namespace of the container +var ErrBadNamespace = errors.New("invalid kernel pid-namespace") + +// ErrContainerNotFound is the error returned to indicate the container was +// unable to be resolved +var ErrContainerNotFound = errors.New("container not found") type ObservationEvent struct { Type EventType @@ -34,6 +47,7 @@ type Observer interface { Connect(ctx context.Context) error Containers(ctx context.Context) ([]*types.Container, error) Run(ctx context.Context, out chan<- *ObservationEvent) + Copy(opts ...interface{}) (Observer, error) Close() error }
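For reference, the following is a minimal, self-contained sketch of how the Observer/Topology API touched by this change-set might be driven by a caller: a parent Kubernetes observer is created, a narrowed per-job copy is derived via Copy (as Job.Run now does with h.topo.observer.Copy), and a Topology is run with a callback handling EventTypeStart/EventTypeStop. It is illustrative only; the socket path, kubeconfig path, selector values, and the pkg/types import path are assumptions and not part of this diff.

package main

import (
	"context"
	"fmt"

	"github.com/criticalstack/swoll/pkg/topology"
	"github.com/criticalstack/swoll/pkg/types"
)

func main() {
	ctx := context.Background()

	// Parent observer, roughly as the Hub would construct it
	// (paths here are examples, not defaults).
	parent, err := topology.NewKubernetes(
		topology.WithKubernetesCRI("/run/containerd/containerd.sock"),
		topology.WithKubernetesConfig("/root/.kube/config"))
	if err != nil {
		panic(err)
	}

	// Derive a per-job copy, narrowed to a namespace and label-selector,
	// mirroring what Job.Run does with the parent observer.
	observer, err := parent.Copy(
		topology.WithKubernetesNamespace("default"),
		topology.WithKubernetesLabelSelector("app=nginx"))
	if err != nil {
		panic(err)
	}

	// React to containers entering and leaving the cluster; this is the
	// point where a job would add or remove its reference-counted
	// kernel filters.
	topology.NewTopology(observer).Run(ctx, func(t topology.EventType, c *types.Container) {
		switch t {
		case topology.EventTypeStart:
			fmt.Printf("start: pod=%s container=%s pid-ns=%v\n", c.Pod, c.Name, c.PidNamespace)
		case topology.EventTypeStop:
			fmt.Printf("stop:  pod=%s container=%s\n", c.Pod, c.Name)
		}
	})
}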