Skip to content

Commit

Permalink
Merge pull request #52 from vladimirvivien/kubeget-logs-fix-take-2
Browse files Browse the repository at this point in the history
Re-applying KUBEGET changes
  • Loading branch information
vladimirvivien authored Mar 4, 2020
2 parents 66c109b + fe86bb0 commit c558caa
Show file tree
Hide file tree
Showing 9 changed files with 537 additions and 141 deletions.
1 change: 1 addition & 0 deletions .github/workflows/compile-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ jobs:
mkdir -p ~/.ssh
chmod 765 ~/.ssh
cp testing/keys/* ~/.ssh/
GO111MODULE=on go get sigs.k8s.io/[email protected]
GO111MODULE=on go test -timeout 600s -v ./...
6 changes: 5 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,12 @@ KUBEGET what:"objects"

Here is an example of `KUBEGET` that explicitly uses most of its parameters (assuming `KUBECONFIG` is declared properly):
```
KUBEGET objects groups:"core" kinds:"pods" namespaces:"kube-system default" containers:"kindnet-cni etcd"
KUBEGET objects groups:"core" kinds:"pods" namespaces:"kube-system default" containers:"nginx etcd"
```
The previous `KUBEGET` command will retrieve all pods from namespaces `kube-system` or `default` that have container names `nginx` or `etcd`.

Crash-Diagnostics stores all retrieved objects under root directory `kubeget` as JSON files. Inside that directory, the saved files are organized by namespaces (for namespaced resources) or
saved at the root directory.

### OUTPUT
This directive configures the location and file name of the generated archive file as shown in the following example:
Expand Down
18 changes: 5 additions & 13 deletions exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path/filepath"

"github.com/vmware-tanzu/crash-diagnostics/archiver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/sirupsen/logrus"
"github.com/vmware-tanzu/crash-diagnostics/script"
Expand Down Expand Up @@ -74,22 +73,15 @@ func (e *Executor) Execute() error {
switch cmd := action.(type) {
case *script.KubeGetCommand:
logrus.Infof("KUBEGET: getting API objects (this may take a while)")
objects, err := exeKubeGet(k8sClient, cmd)
results, err := exeKubeGet(k8sClient, cmd)
if err != nil {
logrus.Errorf("KUBEGET: %s", err)
continue
}
// print objects
for _, obj := range objects {
objList, ok := obj.(*unstructured.UnstructuredList)
if !ok {
logrus.Errorf("KUBEGET: unexpected object type for %T", obj)
continue
}
if err := writeObjectList(k8sClient, cmd.What(), objList, workdir.Path()); err != nil {
logrus.Errorf("KUBEGET: %s", err)
continue
}
// process search result
if err := writeSearchResults(k8sClient, cmd.What(), results, workdir.Path()); err != nil {
logrus.Errorf("KUBEGET: %s", err)
continue
}

default:
Expand Down
11 changes: 3 additions & 8 deletions exec/from_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/vmware-tanzu/crash-diagnostics/k8s"
"github.com/vmware-tanzu/crash-diagnostics/script"
coreV1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

Expand Down Expand Up @@ -86,7 +85,7 @@ func exeFrom(k8s *k8s.Client, src *script.Script) (*script.FromCommand, []*scrip
}

func getNodes(k8sc *k8s.Client, names, labels string) ([]*coreV1.Node, error) {
objs, err := k8sc.Search(
nodeResults, err := k8sc.Search(
"core", // group
"nodes", // kind
"", // namespaces
Expand All @@ -101,12 +100,8 @@ func getNodes(k8sc *k8s.Client, names, labels string) ([]*coreV1.Node, error) {

// collate
var nodes []*coreV1.Node
for _, obj := range objs {
unstructList, ok := obj.(*unstructured.UnstructuredList)
if !ok {
return nil, fmt.Errorf("unexpected type for NodeList: %T", obj)
}
for _, item := range unstructList.Items {
for _, result := range nodeResults {
for _, item := range result.List.Items {
node := new(coreV1.Node)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &node); err != nil {
return nil, err
Expand Down
185 changes: 132 additions & 53 deletions exec/kubeget_exe.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,112 +15,191 @@ import (
"k8s.io/client-go/kubernetes/scheme"
)

func exeKubeGet(k8sc *k8s.Client, cmd *script.KubeGetCommand) ([]runtime.Object, error) {
func exeKubeGet(k8sc *k8s.Client, cmd *script.KubeGetCommand) ([]k8s.SearchResult, error) {
if k8sc == nil {
return nil, fmt.Errorf("K8s client not initialized")
}
var objects []runtime.Object
var searchResults []k8s.SearchResult

switch cmd.What() {
case "objects":
logrus.Debug("KUBEGET what:objects")
objs, err := k8sc.Search(cmd.Groups(), cmd.Kinds(), cmd.Namespaces(), cmd.Versions(), cmd.Names(), cmd.Labels(), cmd.Containers())
results, err := k8sc.Search(cmd.Groups(), cmd.Kinds(), cmd.Namespaces(), cmd.Versions(), cmd.Names(), cmd.Labels(), cmd.Containers())
if err != nil {
return nil, err
}
objects = append(objects, objs...)
searchResults = append(searchResults, results...)
case "logs":
logrus.Debug("KUBEGET what:logs")
objs, err := k8sc.Search("core", "pods", cmd.Namespaces(), "", cmd.Names(), cmd.Labels(), cmd.Containers())
results, err := k8sc.Search("core", "pods", cmd.Namespaces(), "", cmd.Names(), cmd.Labels(), cmd.Containers())
if err != nil {
return nil, err
}
objects = append(objects, objs...)
searchResults = append(searchResults, results...)
case "all", "*":
logrus.Debug("KUBEGET what:all")
objs, err := k8sc.Search(cmd.Groups(), cmd.Kinds(), cmd.Namespaces(), cmd.Versions(), cmd.Names(), cmd.Labels(), cmd.Containers())
results, err := k8sc.Search(cmd.Groups(), cmd.Kinds(), cmd.Namespaces(), cmd.Versions(), cmd.Names(), cmd.Labels(), cmd.Containers())
if err != nil {
return nil, err
}
objects = append(objects, objs...)

searchResults = append(searchResults, results...)
default:
return nil, fmt.Errorf("don't know how to get: %s", cmd.What())
}

return objects, nil
return searchResults, nil
}

func writeObjectList(k8sc *k8s.Client, what string, objList *unstructured.UnstructuredList, workdir string) error {
if objList == nil {
return fmt.Errorf("cannot write nil object list")
func writeSearchResults(k8sc *k8s.Client, what string, searchResults []k8s.SearchResult, workdir string) error {
if searchResults == nil || len(searchResults) == 0 {
return fmt.Errorf("cannot write empty (or nil) search result")
}

writer := os.Stdout
if workdir != "stdout" {
kind := objList.GetKind()
path := filepath.Join(workdir, fmt.Sprintf("kubeget-%s.json", kind))
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
writer = file
logrus.Debugf("KUBEGET: writing objects to %s", path)
}

if err := k8sc.JsonPrinter.PrintObj(objList, writer); err != nil {
// earch result represents a list of searched item
// write each list in a namespaced location in working dir
rootDir := filepath.Join(workdir, "kubeget")
if err := os.MkdirAll(rootDir, 0744); err != nil && !os.IsExist(err) {
return err
}
for _, result := range searchResults {
resultDir := rootDir
if result.Namespaced {
resultDir = filepath.Join(rootDir, result.Namespace)
}
if err := os.MkdirAll(resultDir, 0744); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create search result dir: %s", err)
}

// if obj is PodList, write logs for items in list
if what == "logs" || what == "all" {
if objList.GetKind() != "PodList" {
return nil
if err := saveResultToFile(k8sc, result, resultDir); err != nil {
return fmt.Errorf("failed to save object: %s", err)
}
for _, podItem := range objList.Items {
if err := writePodLogs(k8sc, podItem, workdir); err != nil {
logrus.Errorf("Failed to write logs for pod %s: %s", podItem.GetName(), err)

// print logs
if (what == "logs" || what == "all") && result.ListKind == "PodList" {
if len(result.List.Items) == 0 {
continue
}
for _, podItem := range result.List.Items {
logDir := filepath.Join(resultDir, podItem.GetName())
if err := os.MkdirAll(logDir, 0744); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create pod log dir: %s", err)
}

if err := writePodLogs(k8sc, podItem, logDir); err != nil {
logrus.Errorf("failed to save logs: pod %s: %s", podItem.GetName(), err)
continue
}
}
}

}

return nil
}

func writePodLogs(k8sc *k8s.Client, podItem unstructured.Unstructured, workdir string) error {
ns := podItem.GetNamespace()
name := podItem.GetName()
func saveResultToFile(k8sc *k8s.Client, result k8s.SearchResult, resultDir string) error {
path := filepath.Join(resultDir, fmt.Sprintf("%s.json", result.ResourceName))
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()

logrus.Debugf("KUBEGET: saving %s search results to: %s", result.ResourceName, path)

writer := os.Stdout
if workdir != "stdout" {
path := filepath.Join(workdir, fmt.Sprintf("kubeget-podlog-%s-%s.txt", ns, name))
logFile, err := os.Create(path)
if err != nil {
if err := k8sc.JsonPrinter.PrintObj(result.List, file); err != nil {
if wErr := writeError(err, file); wErr != nil {
return fmt.Errorf("failed to write previous err [%s] to file: %s", err, wErr)
}
return err
}
return nil
}

func writePodLogs(k8sc *k8s.Client, podItem unstructured.Unstructured, logDir string) error {
logrus.Debugf("KUBEGET: writing logs for pod %s", podItem.GetName())
containers, err := getPodContainers(podItem)
if err != nil {
return fmt.Errorf("failed to retrieve pod containers: %s", err)
}
if len(containers) == 0 {
return nil
}

for _, container := range containers {
if err := writeContainerLogs(k8sc, podItem.GetNamespace(), podItem.GetName(), container, logDir); err != nil {
return err
}
defer logFile.Close()
writer = logFile
logrus.Debugf("KUBEGET: writing pod logs to %s", path)
}

req := k8sc.CoreRest.Get().Namespace(ns).Name(name).Resource("pods").SubResource("log").VersionedParams(&corev1.PodLogOptions{}, scheme.ParameterCodec)
return nil
}

func getPodContainers(podItem unstructured.Unstructured) ([]corev1.Container, error) {
var containers []corev1.Container

pod := new(corev1.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podItem.Object, &pod); err != nil {
return nil, fmt.Errorf("error converting container objects: %s", err)
}

for _, c := range pod.Spec.InitContainers {
containers = append(containers, c)
}

for _, c := range pod.Spec.Containers {
containers = append(containers, c)
}
containers = append(containers, getPodEphemeralContainers(pod)...)
return containers, nil
}

func getPodEphemeralContainers(pod *corev1.Pod) []corev1.Container {
var containers []corev1.Container
for _, ec := range pod.Spec.EphemeralContainers {
containers = append(containers, corev1.Container(ec.EphemeralContainerCommon))
}
return containers
}

func writeContainerLogs(k8sc *k8s.Client, namespace string, podName string, container corev1.Container, logDir string) error {
containerLogDir := filepath.Join(logDir, container.Name)
if err := os.MkdirAll(containerLogDir, 0744); err != nil && !os.IsExist(err) {
return fmt.Errorf("error creating container log dir: %s", err)
}

path := filepath.Join(containerLogDir, fmt.Sprintf("%s.log", container.Name))
logrus.Debugf("Writing pod container log %s", path)

file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()

opts := &corev1.PodLogOptions{Container: container.Name}
req := k8sc.CoreRest.Get().Namespace(namespace).Name(podName).Resource("pods").SubResource("log").VersionedParams(opts, scheme.ParameterCodec)
reader, err := req.Stream()
if err != nil {
streamErr := fmt.Errorf("failed to create container log stream:\n%s", err)
if wErr := writeError(streamErr, file); wErr != nil {
return fmt.Errorf("failed to write previous err [%s] to file: %s", err, wErr)
}
return err
}
defer reader.Close()

writeLogHeader(podItem, writer)
if _, err := io.Copy(writer, reader); err != nil {
if _, err := io.Copy(file, reader); err != nil {
cpErr := fmt.Errorf("failed to copy container log:\n%s", err)
if wErr := writeError(cpErr, file); wErr != nil {
return fmt.Errorf("failed to write previous err [%s] to file: %s", err, wErr)
}
return err
}

return nil
}

func writeLogHeader(podItem unstructured.Unstructured, w io.Writer) {
fmt.Fprintln(w, "----------------------------------------------------------------")
fmt.Fprintf(w, "Log pod %s/%s\n", podItem.GetNamespace(), podItem.GetName())
fmt.Fprintln(w, "----------------------------------------------------------------")
func writeError(errStr error, w io.Writer) error {
_, err := fmt.Fprintln(w, errStr.Error())
return err
}
Loading

0 comments on commit c558caa

Please sign in to comment.