Skip to content

Commit

Permalink
Add all and worker node type to kubectl ray log
Browse files Browse the repository at this point in the history
  • Loading branch information
chiayi committed Oct 14, 2024
1 parent bf21d2d commit 8c526e3
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 108 deletions.
85 changes: 50 additions & 35 deletions kubectl-plugin/pkg/cmd/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
const filePathInPod = "/tmp/ray/session_latest/logs/"

type ClusterLogOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
Executor RemoteExecutor
outputDir string
nodeType string
args []string
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
Executor RemoteExecutor
outputDir string
nodeType string
ResourceName string
}

var (
Expand All @@ -52,6 +52,12 @@ var (
# Download logs from a RayCluster, but only for the head node
kubectl ray log my-raycluster --node-type head
# Download logs from a RayCluster, but only for the worker nodes
kubectl ray log my-raycluster --node-type worker
# Download all (worker node and head node) the logs from a RayCluster
kubectl ray log my-raycluster --node-type all
`)
)

Expand All @@ -77,7 +83,7 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
SilenceUsage: true,
ValidArgsFunction: completion.RayClusterCompletionFunc(cmdFactory),
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
if err := options.Complete(cmd, args); err != nil {
return err
}
if err := options.Validate(); err != nil {
Expand All @@ -92,11 +98,15 @@ func NewClusterLogCommand(streams genericclioptions.IOStreams) *cobra.Command {
return cmd
}

func (options *ClusterLogOptions) Complete(args []string) error {
options.args = args
func (options *ClusterLogOptions) Complete(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmdutil.UsageErrorf(cmd, "%s", cmd.Use)
}

options.ResourceName = args[0]

if options.nodeType == "" {
options.nodeType = "head"
options.nodeType = "all"
}

return nil
Expand All @@ -112,12 +122,9 @@ func (options *ClusterLogOptions) Validate() error {
return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
}

// Command must have ray cluster name
if len(options.args) != 1 {
return fmt.Errorf("must have at only one argument")
} else if options.outputDir == "" {
if options.outputDir == "" {
fmt.Fprintln(options.ioStreams.Out, "No output directory specified, creating dir under current directory using cluster name.")
options.outputDir = options.args[0]
options.outputDir = options.ResourceName
err := os.MkdirAll(options.outputDir, 0o755)
if err != nil {
return fmt.Errorf("could not create directory with cluster name %s: %w", options.outputDir, err)
Expand All @@ -126,11 +133,11 @@ func (options *ClusterLogOptions) Validate() error {

switch options.nodeType {
case "all":
return fmt.Errorf("node type `all` is currently not supported")
fmt.Printf("Command set to retrieve all RayCluster logs")
case "head":
break
fmt.Printf("Command set to retrieve only head node logs")
case "worker":
return fmt.Errorf("node type `worker` is currently not supported")
fmt.Printf("Command set to retrieve only worker node logs")
default:
return fmt.Errorf("unknown node type `%s`", options.nodeType)
}
Expand All @@ -154,43 +161,51 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
}

var listopts v1.ListOptions
if options.nodeType == "head" {
if options.nodeType == "all" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "head" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/node-type=head, ray.io/cluster=%s", options.ResourceName),
}
} else if options.nodeType == "worker" {
listopts = v1.ListOptions{
LabelSelector: fmt.Sprintf("ray.io/group=headgroup, ray.io/cluster=%s", options.args[0]),
LabelSelector: fmt.Sprintf("ray.io/node-type=worker, ray.io/cluster=%s", options.ResourceName),
}
}

// Get list of nodes that are considered ray heads
rayHeads, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
// Get list of nodes that are considered the specified node type
rayNodes, err := kubeClientSet.CoreV1().Pods(*options.configFlags.Namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.args[0], err)
return fmt.Errorf("failed to retrieve head node for cluster %s: %w", options.ResourceName, err)
}

// Get a list of logs of the ray heads.
// Get a list of logs of the ray nodes.
var logList []*bytes.Buffer
for _, rayHead := range rayHeads.Items {
request := kubeClientSet.CoreV1().Pods(rayHead.Namespace).GetLogs(rayHead.Name, &corev1.PodLogOptions{})
for _, rayNode := range rayNodes.Items {
request := kubeClientSet.CoreV1().Pods(rayNode.Namespace).GetLogs(rayNode.Name, &corev1.PodLogOptions{})

podLogs, err := request.Stream(ctx)
if err != nil {
return fmt.Errorf("Error retrieving log for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Error retrieving log for kuberay node %s: %w", rayNode.Name, err)
}
defer podLogs.Close()

// Get current logs:
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return fmt.Errorf("Failed to get read current logs for kuberay-head %s: %w", rayHead.Name, err)
return fmt.Errorf("Failed to get read current logs for kuberay Node %s: %w", rayNode.Name, err)
}

logList = append(logList, buf)
}

// Pod file name format is name of the ray head
// Pod file name format is name of the ray node
for ind, logList := range logList {
curFilePath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name, "stdout.log")
dirPath := filepath.Join(options.outputDir, rayHeads.Items[ind].Name)
curFilePath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name, "stdout.log")
dirPath := filepath.Join(options.outputDir, rayNodes.Items[ind].Name)
err := os.MkdirAll(dirPath, 0o755)
if err != nil {
return fmt.Errorf("failed to create directory within path %s: %w", dirPath, err)
Expand All @@ -203,14 +218,14 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto

_, err = logList.WriteTo(file)
if err != nil {
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayHeads.Items[ind].Name, err)
return fmt.Errorf("failed to write to file for kuberay-head: %s: %w", rayNodes.Items[ind].Name, err)
}

req := kubeClientSet.CoreV1().RESTClient().
Get().
Namespace(rayHeads.Items[ind].Namespace).
Namespace(rayNodes.Items[ind].Namespace).
Resource("pods").
Name(rayHeads.Items[ind].Name).
Name(rayNodes.Items[ind].Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: []string{"tar", "--warning=no-file-changed", "-cf", "-", "-C", filePathInPod, "."},
Expand All @@ -230,7 +245,7 @@ func (options *ClusterLogOptions) Run(ctx context.Context, factory cmdutil.Facto
return fmt.Errorf("failed to create executor with error: %w", err)
}

err = options.downloadRayLogFiles(ctx, exec, rayHeads.Items[ind])
err = options.downloadRayLogFiles(ctx, exec, rayNodes.Items[ind])
if err != nil {
return fmt.Errorf("failed to download ray head log files with error: %w", err)
}
Expand Down
113 changes: 40 additions & 73 deletions kubectl-plugin/pkg/cmd/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -116,11 +117,13 @@ func TestRayClusterLogComplete(t *testing.T) {
fakeClusterLogOptions := NewClusterLogOptions(testStreams)
fakeArgs := []string{"Expectedoutput"}

err := fakeClusterLogOptions.Complete(fakeArgs)
cmd := &cobra.Command{Use: "log"}

assert.Equal(t, fakeClusterLogOptions.nodeType, "head")
err := fakeClusterLogOptions.Complete(cmd, fakeArgs)

assert.Equal(t, fakeClusterLogOptions.nodeType, "all")
assert.Nil(t, err)
assert.Equal(t, fakeClusterLogOptions.args, fakeArgs)
assert.Equal(t, fakeClusterLogOptions.ResourceName, fakeArgs[0])
}

func TestRayClusterLogValidate(t *testing.T) {
Expand Down Expand Up @@ -180,107 +183,71 @@ func TestRayClusterLogValidate(t *testing.T) {
{
name: "Test validation when no context is set",
opts: &ClusterLogOptions{
configFlags: genericclioptions.NewConfigFlags(false),
outputDir: fakeDir,
args: []string{"fake-cluster"},
nodeType: "head",
ioStreams: &testStreams,
configFlags: genericclioptions.NewConfigFlags(false),
outputDir: fakeDir,
ResourceName: "fake-cluster",
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "no context is currently set, use \"kubectl config use-context <context>\" to select a new one",
},
{
name: "Test validation when more than 1 arg",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeDir,
args: []string{"fake-cluster", "another-fake"},
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "must have at only one argument",
},
{
name: "Test validation when node type is `all`",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeDir,
args: []string{"fake-cluster"},
nodeType: "all",
ioStreams: &testStreams,
},
expectError: "node type `all` is currently not supported",
},
{
name: "Test validation when node type is `worker`",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeDir,
args: []string{"fake-cluster"},
nodeType: "worker",
ioStreams: &testStreams,
},
expectError: "node type `worker` is currently not supported",
},
{
name: "Test validation when node type is `random-string`",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeDir,
args: []string{"fake-cluster"},
nodeType: "random-string",
ioStreams: &testStreams,
configFlags: fakeConfigFlags,
outputDir: fakeDir,
ResourceName: "fake-cluster",
nodeType: "random-string",
ioStreams: &testStreams,
},
expectError: "unknown node type `random-string`",
},
{
name: "Successful validation call",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeDir,
args: []string{"random_arg"},
nodeType: "head",
ioStreams: &testStreams,
configFlags: fakeConfigFlags,
outputDir: fakeDir,
ResourceName: "fake-cluster",
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "",
},
{
name: "Validate output directory when no out-dir i set.",
name: "Validate output directory when no out-dir is set.",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: "",
args: []string{"cluster-name"},
nodeType: "head",
ioStreams: &testStreams,
configFlags: fakeConfigFlags,
outputDir: "",
ResourceName: "fake-cluster",
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "",
},
{
name: "Failed validation call with output directory not exist",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: "randomPath-here",
args: []string{"random_arg"},
nodeType: "head",
ioStreams: &testStreams,
configFlags: fakeConfigFlags,
outputDir: "randomPath-here",
ResourceName: "fake-cluster",
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "Directory does not exist. Failed with: stat randomPath-here: no such file or directory",
},
{
name: "Failed validation call with output directory is file",
opts: &ClusterLogOptions{
// Use fake config to bypass the config flag checks
configFlags: fakeConfigFlags,
outputDir: fakeFile,
args: []string{"random_arg"},
nodeType: "head",
ioStreams: &testStreams,
configFlags: fakeConfigFlags,
outputDir: fakeFile,
ResourceName: "fake-cluster",
nodeType: "head",
ioStreams: &testStreams,
},
expectError: "Path is Not a directory. Please input a directory and try again",
},
Expand All @@ -293,7 +260,7 @@ func TestRayClusterLogValidate(t *testing.T) {
assert.Equal(t, tc.expectError, err.Error())
} else {
if tc.opts.outputDir == "" {
assert.Equal(t, tc.opts.args[0], tc.opts.outputDir)
assert.Equal(t, tc.opts.ResourceName, tc.opts.outputDir)
}
assert.True(t, err == nil)
}
Expand All @@ -314,7 +281,7 @@ func TestRayClusterLogRun(t *testing.T) {
fakeClusterLogOptions := NewClusterLogOptions(testStreams)
// Uses the mocked executor
fakeClusterLogOptions.Executor = &FakeRemoteExecutor{}
fakeClusterLogOptions.args = []string{"test-cluster"}
fakeClusterLogOptions.ResourceName = "test-cluster"
fakeClusterLogOptions.outputDir = fakeDir

// Create list of fake ray heads
Expand Down Expand Up @@ -434,7 +401,7 @@ func TestDownloadRayLogFiles(t *testing.T) {
testStreams, _, _, _ := genericiooptions.NewTestIOStreams()

fakeClusterLogOptions := NewClusterLogOptions(testStreams)
fakeClusterLogOptions.args = []string{"test-cluster"}
fakeClusterLogOptions.ResourceName = "test-cluster"
fakeClusterLogOptions.outputDir = fakeDir

// create fake tar files to test
Expand Down

0 comments on commit 8c526e3

Please sign in to comment.