Skip to content

Commit

Permalink
feat(host): container log (#20513)
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi authored Jun 13, 2024
1 parent 0b85e31 commit 41e3ff1
Show file tree
Hide file tree
Showing 16 changed files with 969 additions and 8 deletions.
29 changes: 29 additions & 0 deletions cmd/climc/shell/compute/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package compute

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
Expand Down Expand Up @@ -116,4 +118,31 @@ func init() {
man := modules.Containers
return man.Exec(s, opts.ID, opts.ToAPIInput())
})

R(new(options.ContainerLogOptions), "container-log", "Get container log", func(s *mcclient.ClientSession, opts *options.ContainerLogOptions) error {
man := modules.Containers
input, err := opts.ToAPIInput()
if err != nil {
return err
}
reader, err := man.Log(s, opts.ID, input)
if err != nil {
return errors.Wrap(err, "get container log")
}
defer reader.Close()

r := bufio.NewReader(reader)
for {
bytes, err := r.ReadBytes('\n')
if _, err := os.Stdout.Write(bytes); err != nil {
return errors.Wrap(err, "write container log to stdout")
}
if err != nil {
if err != io.EOF {
return errors.Wrap(err, "read container log")
}
return nil
}
}
})
}
70 changes: 70 additions & 0 deletions pkg/apis/compute/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

package compute

import (
"time"

"yunion.io/x/onecloud/pkg/httperrors"
)

const (
POD_STATUS_CREATING_CONTAINER = "creating_container"
POD_STATUS_CREATE_CONTAINER_FAILED = "create_container_failed"
Expand Down Expand Up @@ -91,3 +97,67 @@ type PodMetadataPortMapping struct {
type GuestSetPortMappingsInput struct {
PortMappings []*PodPortMapping `json:"port_mappings"`
}

type PodLogOptions struct {
// The container for which to stream logs. Defaults to only container if there is one container in the pod.
// +optional
Container string `json:"container,omitempty"`
// Follow the log stream of the pod. Defaults to false.
// +optional
Follow bool `json:"follow,omitempty"`
// Return previous terminated container logs. Defaults to false.
// +optional
Previous bool `json:"previous,omitempty"`
// A relative time in seconds before the current time from which to show logs. If this value
// precedes the time a pod was started, only logs since the pod start will be returned.
// If this value is in the future, no logs will be returned.
// Only one of sinceSeconds or sinceTime may be specified.
// +optional
SinceSeconds *int64 `json:"sinceSeconds,omitempty"`
// An RFC3339 timestamp from which to show logs. If this value
// precedes the time a pod was started, only logs since the pod start will be returned.
// If this value is in the future, no logs will be returned.
// Only one of sinceSeconds or sinceTime may be specified.
// +optional
SinceTime *time.Time `json:"sinceTime,omitempty"`
// If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line
// of log output. Defaults to false.
// +optional
Timestamps bool `json:"timestamps,omitempty"`
// If set, the number of lines from the end of the logs to show. If not specified,
// logs are shown from the creation of the container or sinceSeconds or sinceTime
// +optional
TailLines *int64 `json:"tailLines,omitempty"`
// If set, the number of bytes to read from the server before terminating the
// log output. This may not display a complete final line of logging, and may return
// slightly more or slightly less than the specified limit.
// +optional
LimitBytes *int64 `json:"limitBytes,omitempty"`

// insecureSkipTLSVerifyBackend indicates that the apiserver should not confirm the validity of the
// serving certificate of the backend it is connecting to. This will make the HTTPS connection between the apiserver
// and the backend insecure. This means the apiserver cannot verify the log data it is receiving came from the real
// kubelet. If the kubelet is configured to verify the apiserver's TLS credentials, it does not mean the
// connection to the real kubelet is vulnerable to a man in the middle attack (e.g. an attacker could not intercept
// the actual log data coming from the real kubelet).
// +optional
InsecureSkipTLSVerifyBackend bool `json:"insecureSkipTLSVerifyBackend,omitempty"`
}

func ValidatePodLogOptions(opts *PodLogOptions) error {
if opts.TailLines != nil && *opts.TailLines < 0 {
return httperrors.NewInputParameterError("negative tail lines")
}
if opts.LimitBytes != nil && *opts.LimitBytes < 1 {
return httperrors.NewInputParameterError("limit_bytes must be greater than zero")
}
if opts.SinceSeconds != nil && opts.SinceTime != nil {
return httperrors.NewInputParameterError("at most one of since_time or since_seconds must be specified")
}
if opts.SinceSeconds != nil {
if *opts.SinceSeconds < 1 {
return httperrors.NewInputParameterError("since_seconds must be greater than zero")
}
}
return nil
}
6 changes: 6 additions & 0 deletions pkg/appsrv/appsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ type loggingResponseWriter struct {
data []byte
}

func (lrw *loggingResponseWriter) Flush() {
if fw, ok := lrw.ResponseWriter.(http.Flusher); ok {
fw.Flush()
}
}

func (lrw *loggingResponseWriter) Write(data []byte) (int, error) {
lrw.data = data
return lrw.ResponseWriter.Write(data)
Expand Down
4 changes: 2 additions & 2 deletions pkg/appsrv/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWork
err = httperrors.NewTimeoutError("request process timeout")
stop = true
case bytes, more := <-w.bodyChan:
// log.Print("Recive body ", len(bytes), " more ", more)
// log.Infof("Recive body: %s, more: %v", len(bytes), more)
if more {
c, e := w.backend.Write(bytes)
w.bodyResp <- responseWriterResponse{count: c, err: e}
} else {
stop = true
}
case status, more := <-w.statusChan:
// log.Print("Recive status ", status, " more ", more)
// log.Infof("Recive status %d, more: %v", status, more)
if more {
w.backend.WriteHeader(status)
w.statusResp <- true
Expand Down
24 changes: 24 additions & 0 deletions pkg/hostman/guestman/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package guestman
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
Expand Down Expand Up @@ -54,6 +55,7 @@ import (
"yunion.io/x/onecloud/pkg/util/netutils2/getport"
"yunion.io/x/onecloud/pkg/util/pod"
"yunion.io/x/onecloud/pkg/util/pod/image"
"yunion.io/x/onecloud/pkg/util/pod/logs"
"yunion.io/x/onecloud/pkg/util/procutils"
)

Expand All @@ -69,6 +71,8 @@ type PodInstance interface {
SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error)
ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error)
ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error)

ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error
}

type sContainer struct {
Expand Down Expand Up @@ -1646,3 +1650,23 @@ func (s *sPodGuestInstance) ContainerExecSync(ctx context.Context, userCred mccl
ExitCode: resp.ExitCode,
}), nil
}

func (s *sPodGuestInstance) ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
ctrCriId, err := s.getContainerCRIId(ctrId)
if err != nil {
return errors.Wrapf(err, "get container cri id %s", ctrId)
}
resp, err := s.getCRI().ContainerStatus(ctx, ctrCriId)
if err != nil {
return errors.Wrapf(err, "get container status %s", ctrCriId)
}
logPath := resp.GetStatus().GetLogPath()
opts := logs.NewLogOptions(input, time.Now())
return logs.ReadLogs(ctx, logPath, ctrCriId, opts, s.getCRI().GetRuntimeClient(), stdout, stderr)
}
61 changes: 55 additions & 6 deletions pkg/hostman/guestman/podhandlers/podhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/proxy"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"

"yunion.io/x/onecloud/pkg/apis/compute"
Expand All @@ -34,6 +35,7 @@ import (
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/mcclient/auth"
"yunion.io/x/onecloud/pkg/util/flushwriter"
)

const (
Expand Down Expand Up @@ -115,9 +117,12 @@ func AddPodHandlers(prefix string, app *appsrv.Application) {
containerActionHandler(f))
}

execWorker := appsrv.NewWorkerManager("exec-worker", 16, appsrv.DEFAULT_BACKLOG, false)
app.AddHandler3(newExecContainerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync)))
app.AddHandler3(newExecContainerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer()))
execWorker := appsrv.NewWorkerManager("container-exec-worker", 16, appsrv.DEFAULT_BACKLOG, false)
app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync)))
app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer()))

logWorker := appsrv.NewWorkerManager("container-log-worker", 64, appsrv.DEFAULT_BACKLOG, false)
app.AddHandler3(newContainerWorkerHandler("GET", fmt.Sprintf("%s/pods/%s/containers/%s/log", prefix, POD_ID, CONTAINER_ID), logWorker, logContainer()))
}

func pullImage(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) {
Expand Down Expand Up @@ -164,7 +169,7 @@ func saveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredenti
return pod.SaveVolumeMountToImage(ctx, userCred, input, ctrId)
}

func newExecContainerHandler(method, urlPath string, worker *appsrv.SWorkerManager, hander appsrv.FilterHandler) *appsrv.SHandlerInfo {
func newContainerWorkerHandler(method, urlPath string, worker *appsrv.SWorkerManager, hander appsrv.FilterHandler) *appsrv.SHandlerInfo {
hi := &appsrv.SHandlerInfo{}
hi.SetMethod(method)
hi.SetPath(urlPath)
Expand All @@ -174,9 +179,11 @@ func newExecContainerHandler(method, urlPath string, worker *appsrv.SWorkerManag
return hi
}

func execContainer() appsrv.FilterHandler {
type containerWorkerActionHander func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter)

func containerWorkerAction(handler containerWorkerActionHander) appsrv.FilterHandler {
return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
params, query, _ := appsrv.FetchEnv(ctx, w, r)
params, query, body := appsrv.FetchEnv(ctx, w, r)
podId := params[POD_ID]
ctrId := params[CONTAINER_ID]
userCred := auth.FetchUserCredential(ctx, nil)
Expand All @@ -190,6 +197,48 @@ func execContainer() appsrv.FilterHandler {
hostutils.Response(ctx, w, httperrors.NewBadRequestError("runtime instance is %#v", podObj))
return
}
handler(ctx, userCred, pod, ctrId, query, body, r, w)
})
}

func logContainer() appsrv.FilterHandler {
return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) {
input := new(compute.PodLogOptions)
if err := query.Unmarshal(input); err != nil {
hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to PodLogOptions"))
return
}
if err := compute.ValidatePodLogOptions(input); err != nil {
hostutils.Response(ctx, w, err)
return
}
if _, ok := w.(http.Flusher); !ok {
hostutils.Response(ctx, w, errors.Errorf("unable to convert to http.Flusher"))
return
}
w.Header().Set("Transfer-Encoding", "chunked")
fw := flushwriter.Wrap(w)
ctx, cancel := context.WithCancel(ctx)
go func() {
for {
// check whether client request is closed
select {
case <-r.Context().Done():
log.Infof("client request is closed, end session")
cancel()
return
}
}
}()
if err := pod.ReadLogs(ctx, userCred, ctrId, input, fw, fw); err != nil {
hostutils.Response(ctx, w, errors.Wrap(err, "Read logs"))
return
}
})
}

func execContainer() appsrv.FilterHandler {
return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) {
input := new(compute.ContainerExecInput)
if err := query.Unmarshal(input); err != nil {
hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to ContainerExecInput"))
Expand Down
26 changes: 26 additions & 0 deletions pkg/mcclient/modules/compute/mod_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@
package compute

import (
"context"
"fmt"
"io"
"net/url"
"os"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/httputils"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/mcclient/modulebase"
"yunion.io/x/onecloud/pkg/mcclient/modules"
"yunion.io/x/onecloud/pkg/util/pod/remotecommand"
"yunion.io/x/onecloud/pkg/util/pod/stream"
"yunion.io/x/onecloud/pkg/util/pod/term"
)

Expand Down Expand Up @@ -92,6 +96,28 @@ func (man ContainerManager) Exec(s *mcclient.ClientSession, id string, opt *api.
return t.Safe(fn)
}

func (man ContainerManager) Log(s *mcclient.ClientSession, id string, opt *api.PodLogOptions) (io.ReadCloser, error) {
info, err := man.GetSpecific(s, id, "exec-info", nil)
if err != nil {
return nil, errors.Wrap(err, "get exec info")
}
infoOut := new(api.ContainerExecInfoOutput)
if err := info.Unmarshal(infoOut); err != nil {
return nil, errors.Wrap(err, "unmarshal exec info")
}

qs := jsonutils.Marshal(opt).QueryString()
urlLoc := fmt.Sprintf("%s/pods/%s/containers/%s/log?%s", infoOut.HostUri, infoOut.PodId, infoOut.ContainerId, qs)

headers := mcclient.GetTokenHeaders(s.GetToken())
req := stream.NewRequest(httputils.GetTimeoutClient(1*time.Hour), nil, headers)
reader, err := req.Stream(context.Background(), "GET", urlLoc)
if err != nil {
return nil, errors.Wrap(err, "stream request")
}
return reader, nil
}

var (
Containers ContainerManager
)
Expand Down
Loading

0 comments on commit 41e3ff1

Please sign in to comment.