diff --git a/cmd/climc/shell/compute/containers.go b/cmd/climc/shell/compute/containers.go index af225574538..9775f705f3c 100644 --- a/cmd/climc/shell/compute/containers.go +++ b/cmd/climc/shell/compute/containers.go @@ -15,7 +15,9 @@ package compute import ( + "bufio" "fmt" + "io" "io/ioutil" "os" "os/exec" @@ -116,4 +118,31 @@ func init() { man := modules.Containers return man.Exec(s, opts.ID, opts.ToAPIInput()) }) + + R(new(options.ContainerLogOptions), "container-log", "Get container log", func(s *mcclient.ClientSession, opts *options.ContainerLogOptions) error { + man := modules.Containers + input, err := opts.ToAPIInput() + if err != nil { + return err + } + reader, err := man.Log(s, opts.ID, input) + if err != nil { + return errors.Wrap(err, "get container log") + } + defer reader.Close() + + r := bufio.NewReader(reader) + for { + bytes, err := r.ReadBytes('\n') + if _, err := os.Stdout.Write(bytes); err != nil { + return errors.Wrap(err, "write container log to stdout") + } + if err != nil { + if err != io.EOF { + return errors.Wrap(err, "read container log") + } + return nil + } + } + }) } diff --git a/pkg/apis/compute/pod.go b/pkg/apis/compute/pod.go index 97a643b5514..338fa13a471 100644 --- a/pkg/apis/compute/pod.go +++ b/pkg/apis/compute/pod.go @@ -14,6 +14,12 @@ package compute +import ( + "time" + + "yunion.io/x/onecloud/pkg/httperrors" +) + const ( POD_STATUS_CREATING_CONTAINER = "creating_container" POD_STATUS_CREATE_CONTAINER_FAILED = "create_container_failed" @@ -91,3 +97,67 @@ type PodMetadataPortMapping struct { type GuestSetPortMappingsInput struct { PortMappings []*PodPortMapping `json:"port_mappings"` } + +type PodLogOptions struct { + // The container for which to stream logs. Defaults to only container if there is one container in the pod. + // +optional + Container string `json:"container,omitempty"` + // Follow the log stream of the pod. Defaults to false. + // +optional + Follow bool `json:"follow,omitempty"` + // Return previous terminated container logs. Defaults to false. + // +optional + Previous bool `json:"previous,omitempty"` + // A relative time in seconds before the current time from which to show logs. If this value + // precedes the time a pod was started, only logs since the pod start will be returned. + // If this value is in the future, no logs will be returned. + // Only one of sinceSeconds or sinceTime may be specified. + // +optional + SinceSeconds *int64 `json:"sinceSeconds,omitempty"` + // An RFC3339 timestamp from which to show logs. If this value + // precedes the time a pod was started, only logs since the pod start will be returned. + // If this value is in the future, no logs will be returned. + // Only one of sinceSeconds or sinceTime may be specified. + // +optional + SinceTime *time.Time `json:"sinceTime,omitempty"` + // If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line + // of log output. Defaults to false. + // +optional + Timestamps bool `json:"timestamps,omitempty"` + // If set, the number of lines from the end of the logs to show. If not specified, + // logs are shown from the creation of the container or sinceSeconds or sinceTime + // +optional + TailLines *int64 `json:"tailLines,omitempty"` + // If set, the number of bytes to read from the server before terminating the + // log output. This may not display a complete final line of logging, and may return + // slightly more or slightly less than the specified limit. + // +optional + LimitBytes *int64 `json:"limitBytes,omitempty"` + + // insecureSkipTLSVerifyBackend indicates that the apiserver should not confirm the validity of the + // serving certificate of the backend it is connecting to. This will make the HTTPS connection between the apiserver + // and the backend insecure. This means the apiserver cannot verify the log data it is receiving came from the real + // kubelet. If the kubelet is configured to verify the apiserver's TLS credentials, it does not mean the + // connection to the real kubelet is vulnerable to a man in the middle attack (e.g. an attacker could not intercept + // the actual log data coming from the real kubelet). + // +optional + InsecureSkipTLSVerifyBackend bool `json:"insecureSkipTLSVerifyBackend,omitempty"` +} + +func ValidatePodLogOptions(opts *PodLogOptions) error { + if opts.TailLines != nil && *opts.TailLines < 0 { + return httperrors.NewInputParameterError("negative tail lines") + } + if opts.LimitBytes != nil && *opts.LimitBytes < 1 { + return httperrors.NewInputParameterError("limit_bytes must be greater than zero") + } + if opts.SinceSeconds != nil && opts.SinceTime != nil { + return httperrors.NewInputParameterError("at most one of since_time or since_seconds must be specified") + } + if opts.SinceSeconds != nil { + if *opts.SinceSeconds < 1 { + return httperrors.NewInputParameterError("since_seconds must be greater than zero") + } + } + return nil +} diff --git a/pkg/appsrv/appsrv.go b/pkg/appsrv/appsrv.go index f591bb69767..30c005fad6d 100644 --- a/pkg/appsrv/appsrv.go +++ b/pkg/appsrv/appsrv.go @@ -218,6 +218,12 @@ type loggingResponseWriter struct { data []byte } +func (lrw *loggingResponseWriter) Flush() { + if fw, ok := lrw.ResponseWriter.(http.Flusher); ok { + fw.Flush() + } +} + func (lrw *loggingResponseWriter) Write(data []byte) (int, error) { lrw.data = data return lrw.ResponseWriter.Write(data) diff --git a/pkg/appsrv/response.go b/pkg/appsrv/response.go index 9ead0f5a67e..fa44d0a9458 100644 --- a/pkg/appsrv/response.go +++ b/pkg/appsrv/response.go @@ -118,7 +118,7 @@ func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWork err = httperrors.NewTimeoutError("request process timeout") stop = true case bytes, more := <-w.bodyChan: - // log.Print("Recive body ", len(bytes), " more ", more) + // log.Infof("Recive body: %s, more: %v", len(bytes), more) if more { c, e := w.backend.Write(bytes) w.bodyResp <- responseWriterResponse{count: c, err: e} @@ -126,7 +126,7 @@ func (w *responseWriterChannel) wait(ctx context.Context, workerChan chan *SWork stop = true } case status, more := <-w.statusChan: - // log.Print("Recive status ", status, " more ", more) + // log.Infof("Recive status %d, more: %v", status, more) if more { w.backend.WriteHeader(status) w.statusResp <- true diff --git a/pkg/hostman/guestman/pod.go b/pkg/hostman/guestman/pod.go index 5bf520c86a3..1a9bbd4d14e 100644 --- a/pkg/hostman/guestman/pod.go +++ b/pkg/hostman/guestman/pod.go @@ -17,6 +17,7 @@ package guestman import ( "context" "fmt" + "io" "io/ioutil" "net/url" "os" @@ -54,6 +55,7 @@ import ( "yunion.io/x/onecloud/pkg/util/netutils2/getport" "yunion.io/x/onecloud/pkg/util/pod" "yunion.io/x/onecloud/pkg/util/pod/image" + "yunion.io/x/onecloud/pkg/util/pod/logs" "yunion.io/x/onecloud/pkg/util/procutils" ) @@ -69,6 +71,8 @@ type PodInstance interface { SaveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredential, input *hostapi.ContainerSaveVolumeMountToImageInput, ctrId string) (jsonutils.JSONObject, error) ExecContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecInput) (*url.URL, error) ContainerExecSync(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.ContainerExecSyncInput) (jsonutils.JSONObject, error) + + ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error } type sContainer struct { @@ -1646,3 +1650,23 @@ func (s *sPodGuestInstance) ContainerExecSync(ctx context.Context, userCred mccl ExitCode: resp.ExitCode, }), nil } + +func (s *sPodGuestInstance) ReadLogs(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *computeapi.PodLogOptions, stdout, stderr io.Writer) error { + // Do a zero-byte write to stdout before handing off to the container runtime. + // This ensures at least one Write call is made to the writer when copying starts, + // even if we then block waiting for log output from the container. + if _, err := stdout.Write([]byte{}); err != nil { + return err + } + ctrCriId, err := s.getContainerCRIId(ctrId) + if err != nil { + return errors.Wrapf(err, "get container cri id %s", ctrId) + } + resp, err := s.getCRI().ContainerStatus(ctx, ctrCriId) + if err != nil { + return errors.Wrapf(err, "get container status %s", ctrCriId) + } + logPath := resp.GetStatus().GetLogPath() + opts := logs.NewLogOptions(input, time.Now()) + return logs.ReadLogs(ctx, logPath, ctrCriId, opts, s.getCRI().GetRuntimeClient(), stdout, stderr) +} diff --git a/pkg/hostman/guestman/podhandlers/podhandlers.go b/pkg/hostman/guestman/podhandlers/podhandlers.go index 6d1166cf28f..74d6195e25c 100644 --- a/pkg/hostman/guestman/podhandlers/podhandlers.go +++ b/pkg/hostman/guestman/podhandlers/podhandlers.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/proxy" "yunion.io/x/jsonutils" + "yunion.io/x/log" "yunion.io/x/pkg/errors" "yunion.io/x/onecloud/pkg/apis/compute" @@ -34,6 +35,7 @@ import ( "yunion.io/x/onecloud/pkg/httperrors" "yunion.io/x/onecloud/pkg/mcclient" "yunion.io/x/onecloud/pkg/mcclient/auth" + "yunion.io/x/onecloud/pkg/util/flushwriter" ) const ( @@ -115,9 +117,12 @@ func AddPodHandlers(prefix string, app *appsrv.Application) { containerActionHandler(f)) } - execWorker := appsrv.NewWorkerManager("exec-worker", 16, appsrv.DEFAULT_BACKLOG, false) - app.AddHandler3(newExecContainerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync))) - app.AddHandler3(newExecContainerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer())) + execWorker := appsrv.NewWorkerManager("container-exec-worker", 16, appsrv.DEFAULT_BACKLOG, false) + app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync))) + app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer())) + + logWorker := appsrv.NewWorkerManager("container-log-worker", 64, appsrv.DEFAULT_BACKLOG, false) + app.AddHandler3(newContainerWorkerHandler("GET", fmt.Sprintf("%s/pods/%s/containers/%s/log", prefix, POD_ID, CONTAINER_ID), logWorker, logContainer())) } func pullImage(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) { @@ -164,7 +169,7 @@ func saveVolumeMountToImage(ctx context.Context, userCred mcclient.TokenCredenti return pod.SaveVolumeMountToImage(ctx, userCred, input, ctrId) } -func newExecContainerHandler(method, urlPath string, worker *appsrv.SWorkerManager, hander appsrv.FilterHandler) *appsrv.SHandlerInfo { +func newContainerWorkerHandler(method, urlPath string, worker *appsrv.SWorkerManager, hander appsrv.FilterHandler) *appsrv.SHandlerInfo { hi := &appsrv.SHandlerInfo{} hi.SetMethod(method) hi.SetPath(urlPath) @@ -174,9 +179,11 @@ func newExecContainerHandler(method, urlPath string, worker *appsrv.SWorkerManag return hi } -func execContainer() appsrv.FilterHandler { +type containerWorkerActionHander func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) + +func containerWorkerAction(handler containerWorkerActionHander) appsrv.FilterHandler { return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { - params, query, _ := appsrv.FetchEnv(ctx, w, r) + params, query, body := appsrv.FetchEnv(ctx, w, r) podId := params[POD_ID] ctrId := params[CONTAINER_ID] userCred := auth.FetchUserCredential(ctx, nil) @@ -190,6 +197,48 @@ func execContainer() appsrv.FilterHandler { hostutils.Response(ctx, w, httperrors.NewBadRequestError("runtime instance is %#v", podObj)) return } + handler(ctx, userCred, pod, ctrId, query, body, r, w) + }) +} + +func logContainer() appsrv.FilterHandler { + return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) { + input := new(compute.PodLogOptions) + if err := query.Unmarshal(input); err != nil { + hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to PodLogOptions")) + return + } + if err := compute.ValidatePodLogOptions(input); err != nil { + hostutils.Response(ctx, w, err) + return + } + if _, ok := w.(http.Flusher); !ok { + hostutils.Response(ctx, w, errors.Errorf("unable to convert to http.Flusher")) + return + } + w.Header().Set("Transfer-Encoding", "chunked") + fw := flushwriter.Wrap(w) + ctx, cancel := context.WithCancel(ctx) + go func() { + for { + // check whether client request is closed + select { + case <-r.Context().Done(): + log.Infof("client request is closed, end session") + cancel() + return + } + } + }() + if err := pod.ReadLogs(ctx, userCred, ctrId, input, fw, fw); err != nil { + hostutils.Response(ctx, w, errors.Wrap(err, "Read logs")) + return + } + }) +} + +func execContainer() appsrv.FilterHandler { + return containerWorkerAction(func(ctx context.Context, userCred mcclient.TokenCredential, pod guestman.PodInstance, ctrId string, query, body jsonutils.JSONObject, r *http.Request, w http.ResponseWriter) { input := new(compute.ContainerExecInput) if err := query.Unmarshal(input); err != nil { hostutils.Response(ctx, w, errors.Wrap(err, "unmarshal to ContainerExecInput")) diff --git a/pkg/mcclient/modules/compute/mod_containers.go b/pkg/mcclient/modules/compute/mod_containers.go index e90c9cdf4fe..2b558a4d07b 100644 --- a/pkg/mcclient/modules/compute/mod_containers.go +++ b/pkg/mcclient/modules/compute/mod_containers.go @@ -15,19 +15,23 @@ package compute import ( + "context" "fmt" "io" "net/url" "os" + "time" "yunion.io/x/jsonutils" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/httputils" api "yunion.io/x/onecloud/pkg/apis/compute" "yunion.io/x/onecloud/pkg/mcclient" "yunion.io/x/onecloud/pkg/mcclient/modulebase" "yunion.io/x/onecloud/pkg/mcclient/modules" "yunion.io/x/onecloud/pkg/util/pod/remotecommand" + "yunion.io/x/onecloud/pkg/util/pod/stream" "yunion.io/x/onecloud/pkg/util/pod/term" ) @@ -92,6 +96,28 @@ func (man ContainerManager) Exec(s *mcclient.ClientSession, id string, opt *api. return t.Safe(fn) } +func (man ContainerManager) Log(s *mcclient.ClientSession, id string, opt *api.PodLogOptions) (io.ReadCloser, error) { + info, err := man.GetSpecific(s, id, "exec-info", nil) + if err != nil { + return nil, errors.Wrap(err, "get exec info") + } + infoOut := new(api.ContainerExecInfoOutput) + if err := info.Unmarshal(infoOut); err != nil { + return nil, errors.Wrap(err, "unmarshal exec info") + } + + qs := jsonutils.Marshal(opt).QueryString() + urlLoc := fmt.Sprintf("%s/pods/%s/containers/%s/log?%s", infoOut.HostUri, infoOut.PodId, infoOut.ContainerId, qs) + + headers := mcclient.GetTokenHeaders(s.GetToken()) + req := stream.NewRequest(httputils.GetTimeoutClient(1*time.Hour), nil, headers) + reader, err := req.Stream(context.Background(), "GET", urlLoc) + if err != nil { + return nil, errors.Wrap(err, "stream request") + } + return reader, nil +} + var ( Containers ContainerManager ) diff --git a/pkg/mcclient/options/compute/containers.go b/pkg/mcclient/options/compute/containers.go index d8891358da7..b93b16882a3 100644 --- a/pkg/mcclient/options/compute/containers.go +++ b/pkg/mcclient/options/compute/containers.go @@ -18,6 +18,7 @@ import ( "os" "strconv" "strings" + "time" "yunion.io/x/jsonutils" "yunion.io/x/pkg/errors" @@ -340,3 +341,42 @@ func (o *ContainerExecSyncOptions) Params() (jsonutils.JSONObject, error) { Timeout: o.Timeout, }), nil } + +type ContainerLogOptions struct { + ServerIdOptions + Since string `help:"Only return logs newer than a relative duration like 5s, 2m, or 3h"` + Follow bool `help:"Follow log output" short-token:"f"` + Tail int64 `help:"Lines of recent log file to display"` + Timestamps bool `help:"Show timestamps on each line in the log output"` + LimitBytes int64 `help:"Maximum amount of bytes that can be used."` +} + +func (o *ContainerLogOptions) Params() (jsonutils.JSONObject, error) { + input, err := o.ToAPIInput() + if err != nil { + return nil, err + } + return jsonutils.Marshal(input), nil +} + +func (o *ContainerLogOptions) ToAPIInput() (*computeapi.PodLogOptions, error) { + opt := &computeapi.PodLogOptions{ + Follow: o.Follow, + Timestamps: o.Timestamps, + } + if o.LimitBytes > 0 { + opt.LimitBytes = &o.LimitBytes + } + if o.Tail > 0 { + opt.TailLines = &o.Tail + } + if len(o.Since) > 0 { + dur, err := time.ParseDuration(o.Since) + if err != nil { + return nil, errors.Wrapf(err, "parse duration %s", o.Since) + } + sec := int64(dur.Round(time.Second).Seconds()) + opt.SinceSeconds = &sec + } + return opt, nil +} diff --git a/pkg/util/flushwriter/doc.go b/pkg/util/flushwriter/doc.go new file mode 100644 index 00000000000..cc687c3af28 --- /dev/null +++ b/pkg/util/flushwriter/doc.go @@ -0,0 +1 @@ +package flushwriter // import "yunion.io/x/onecloud/pkg/util/flushwriter" diff --git a/pkg/util/flushwriter/writer.go b/pkg/util/flushwriter/writer.go new file mode 100644 index 00000000000..75799ef4467 --- /dev/null +++ b/pkg/util/flushwriter/writer.go @@ -0,0 +1,51 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flushwriter + +import ( + "io" + "net/http" +) + +// Wrap wraps an io.Writer into a writer that flushes after every write if +// the writer implements the Flusher interface. +func Wrap(w io.Writer) io.Writer { + fw := &flushWriter{ + writer: w, + } + if flusher, ok := w.(http.Flusher); ok { + fw.flusher = flusher + } + return fw +} + +// flushWriter provides wrapper for responseWriter with HTTP streaming capabilities +type flushWriter struct { + flusher http.Flusher + writer io.Writer +} + +func (fw *flushWriter) Write(p []byte) (n int, err error) { + n, err = fw.writer.Write(p) + if err != nil { + return + } + if fw.flusher != nil { + fw.flusher.Flush() + } + return +} diff --git a/pkg/util/pod/logs/constants.go b/pkg/util/pod/logs/constants.go new file mode 100644 index 00000000000..74c0f759fee --- /dev/null +++ b/pkg/util/pod/logs/constants.go @@ -0,0 +1,8 @@ +package logs + +const ( + // fixed width version of time.RFC3339Nano + RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" + // variable width RFC3339 time format for lenient parsing of strings into timestamps + RFC3339NanoLenient = "2006-01-02T15:04:05.999999999Z07:00" +) diff --git a/pkg/util/pod/logs/doc.go b/pkg/util/pod/logs/doc.go new file mode 100644 index 00000000000..4341f229f86 --- /dev/null +++ b/pkg/util/pod/logs/doc.go @@ -0,0 +1 @@ +package logs // import "yunion.io/x/onecloud/pkg/util/pod/logs" diff --git a/pkg/util/pod/logs/logs.go b/pkg/util/pod/logs/logs.go new file mode 100644 index 00000000000..5d6db57cd30 --- /dev/null +++ b/pkg/util/pod/logs/logs.go @@ -0,0 +1,406 @@ +package logs + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "math" + "os" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + + "yunion.io/x/log" + "yunion.io/x/pkg/errors" + + "yunion.io/x/onecloud/pkg/apis/compute" +) + +const ( + // timeFormatOut is the format for writing timestamps to output. + timeFormatOut = RFC3339NanoFixed + // timeFormatIn is the format for parsing timestamps from other logs. + timeFormatIn = RFC3339NanoLenient + + // logForceCheckPeriod is the period to check for a new read + logForceCheckPeriod = 1 * time.Second +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} + // delimiter is the delimiter for timestamp and stream type in log line. + delimiter = []byte{' '} + // tagDelimiter is the delimiter for log tags. + tagDelimiter = []byte(runtimeapi.LogTagDelimiter) +) + +// logMessage is the CRI internal log type. +type logMessage struct { + timestamp time.Time + stream runtimeapi.LogStreamType + log []byte +} + +// reset resets the log to nil. +func (l *logMessage) reset() { + l.timestamp = time.Time{} + l.stream = "" + l.log = nil +} + +// LogOptions is the CRI interval type of all log options. +type LogOptions struct { + tail int64 + bytes int64 + since time.Time + follow bool + timestamp bool +} + +// NewLogOptions convert the PodLogOptions to CRI internal LogOptions. +func NewLogOptions(apiOpts *compute.PodLogOptions, now time.Time) *LogOptions { + opts := &LogOptions{ + tail: -1, // -1 by default which means read all logs. + bytes: -1, // -1 by default which means read all logs. + follow: apiOpts.Follow, + timestamp: apiOpts.Timestamps, + } + if apiOpts.TailLines != nil { + opts.tail = *apiOpts.TailLines + } + if apiOpts.LimitBytes != nil { + opts.bytes = *apiOpts.LimitBytes + } + if apiOpts.SinceSeconds != nil { + opts.since = now.Add(-time.Duration(*apiOpts.SinceSeconds) * time.Second) + } + if apiOpts.SinceTime != nil && apiOpts.SinceTime.After(opts.since) { + opts.since = *apiOpts.SinceTime + } + return opts +} + +// parseFunc is a function parsing one log line to the internal log type. +// Notice that the caller must make sure logMessage is not nil. +type parseFunc func([]byte, *logMessage) error + +var parseFuncs = []parseFunc{ + parseCRILog, // CRI log format parse function +} + +// parseCRILog parses logs in CRI log format. CRI Log format example: +// +// 2016-10-06T00:17:09.669794202Z stdout P log content 1 +// 2016-10-06T00:17:09.669794203Z stderr F log content 2 +func parseCRILog(log []byte, msg *logMessage) error { + var err error + // Parse timestamp + idx := bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("timestamp is not found") + } + msg.timestamp, err = time.Parse(timeFormatIn, string(log[:idx])) + if err != nil { + return fmt.Errorf("unexpected timestamp format %q: %v", timeFormatIn, err) + } + + // Parse stream type + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("stream type is not found") + } + msg.stream = runtimeapi.LogStreamType(log[:idx]) + if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr { + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + + // Parse log tag + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("log tag is not found") + } + // Keep this forward compatible. + tags := bytes.Split(log[:idx], tagDelimiter) + partial := (runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial) + // Trim the tailing new line if this is a partial line. + if partial && len(log) > 0 && log[len(log)-1] == '\n' { + log = log[:len(log)-1] + } + + // Get log content + msg.log = log[idx+1:] + + return nil +} + +// getParseFunc returns proper parse function based on the sample log line passed in. +func getParseFunc(log []byte) (parseFunc, error) { + for _, p := range parseFuncs { + if err := p(log, &logMessage{}); err == nil { + return p, nil + } + } + return nil, fmt.Errorf("unsupported log format: %q", log) +} + +// logWriter controls the writing into the stream based on the log options. +type logWriter struct { + stdout io.Writer + stderr io.Writer + opts *LogOptions + remain int64 +} + +// errMaximumWrite is returned when all bytes have been written. +var errMaximumWrite = errors.Error("maximum write") + +// errShortWrite is returned when the message is not fully written. +var errShortWrite = errors.Error("short write") + +func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWriter { + w := &logWriter{ + stdout: stdout, + stderr: stderr, + opts: opts, + remain: math.MaxInt64, // initialize it as infinity + } + if opts.bytes >= 0 { + w.remain = opts.bytes + } + return w +} + +// writeLogs writes logs into stdout, stderr. +func (w *logWriter) write(msg *logMessage) error { + if msg.timestamp.Before(w.opts.since) { + // Skip the line because it's older than since + return nil + } + line := msg.log + if w.opts.timestamp { + prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0]) + line = append(prefix, line...) + } + // If the line is longer than the remaining bytes, cut it. + if int64(len(line)) > w.remain { + line = line[:w.remain] + } + // Get the proper stream to write to. + var stream io.Writer + switch msg.stream { + case runtimeapi.Stdout: + stream = w.stdout + case runtimeapi.Stderr: + stream = w.stderr + default: + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + n, err := stream.Write(line) + w.remain -= int64(n) + if err != nil { + return err + } + // If the line has not been fully written, return errShortWrite + if n < len(line) { + return errShortWrite + } + // If there are no more bytes left, return errMaximumWrite + if w.remain <= 0 { + return errMaximumWrite + } + return nil +} + +// Readlogs read the container log and redirect into stdout and stderr. +// Note that containerID is only needed when following the log, or else +// just pass in empty string "". +func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService runtimeapi.RuntimeServiceClient, stdout, stderr io.Writer) error { + // fsnotify has different behavior for symlinks in different platform, + // for example it follows symlink on Linux, but not on Windows, + // so we explicitly resolve symlinks before reading the logs. + // There shouldn't be security issue because the container log + // path is owned by kubelet and the container runtime. + evaluated, err := filepath.EvalSymlinks(path) + if err != nil { + return fmt.Errorf("failed to try resolving symlinks in path %q: %v", path, err) + } + path = evaluated + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", path, err) + } + defer f.Close() + + // Search start point based on tail line. + start, err := FindTailLineStartIndex(f, opts.tail) + if err != nil { + return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err) + } + if _, err := f.Seek(start, io.SeekStart); err != nil { + return fmt.Errorf("failed to seek %d in log file %q: %v", start, path, err) + } + + // Start parsing the logs. + r := bufio.NewReader(f) + // Do not create watcher here because it is not needed if `Follow` is false. + var watcher *fsnotify.Watcher + var parse parseFunc + var stop bool + found := true + writer := newLogWriter(stdout, stderr, opts) + msg := &logMessage{} + for { + if stop { + log.Infof("Finish parsing log file %q", path) + return nil + } + l, err := r.ReadBytes(eol[0]) + if err != nil { + if err != io.EOF { // This is a real error + return fmt.Errorf("failed to read log file %q: %v", path, err) + } + if opts.follow { + // The container is not running, we got to the end of the log. + if !found { + return nil + } + // Reset seek so that if this is an incomplete line, + // it will be read again. + if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { + return fmt.Errorf("failed to reset seek in log file %q: %v", path, err) + } + if watcher == nil { + // Initialize the watcher if it has not been initialized yet. + if watcher, err = fsnotify.NewWatcher(); err != nil { + return fmt.Errorf("failed to create fsnotify watcher: %v", err) + } + defer watcher.Close() + if err := watcher.Add(f.Name()); err != nil { + return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + } + // If we just created the watcher, try again to read as we might have missed + // the event. + continue + } + var recreated bool + // Wait until the next log change. + found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService) + if err != nil { + return err + } + if recreated { + newF, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + continue + } + return fmt.Errorf("failed to open log file %q: %v", path, err) + } + f.Close() + if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) { + log.Errorf("failed to remove file watch %q: %v", f.Name(), err) + } + f = newF + if err := watcher.Add(f.Name()); err != nil { + return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + } + r = bufio.NewReader(f) + } + // If the container exited consume data until the next EOF + continue + } + // Should stop after writing the remaining content. + stop = true + if len(l) == 0 { + continue + } + log.Warningf("Incomplete line in log file %q: %q", path, l) + } + if parse == nil { + // Initialize the log parsing function. + parse, err = getParseFunc(l) + if err != nil { + return fmt.Errorf("failed to get parse function: %v", err) + } + } + // Parse the log line. + msg.reset() + if err := parse(l, msg); err != nil { + log.Errorf("Failed with err %v when parsing log for log file %q: %q", err, path, l) + continue + } + // Write the log line into the stream. + if err := writer.write(msg); err != nil { + if errors.Cause(err) == errMaximumWrite { + log.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes) + return nil + } + log.Errorf("Failed with err %v when writing log for log file %q: %+v", err, path, msg) + return err + } + } +} + +func isContainerRunning(ctx context.Context, id string, r runtimeapi.RuntimeServiceClient) (bool, error) { + req := &runtimeapi.ContainerStatusRequest{ContainerId: id} + resp, err := r.ContainerStatus(ctx, req) + if err != nil { + return false, err + } + s := resp.GetStatus() + // Only keep following container log when it is running. + if s.State != runtimeapi.ContainerState_CONTAINER_RUNNING { + log.Infof("Container %q is not running (state=%q)", id, s.State) + // Do not return error because it's normal that the container stops + // during waiting. + return false, nil + } + return true, nil +} + +// waitLogs wait for the next log write. It returns two booleans and an error. The first boolean +// indicates whether a new log is found; the second boolean if the log file was recreated; +// the error is error happens during waiting new logs. +func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService runtimeapi.RuntimeServiceClient) (bool, bool, error) { + // no need to wait if the pod is not running + if running, err := isContainerRunning(ctx, id, runtimeService); !running { + return false, false, err + } + errRetry := 5 + for { + select { + case <-ctx.Done(): + return false, false, fmt.Errorf("context cancelled") + case e := <-w.Events: + switch e.Op { + case fsnotify.Write: + return true, false, nil + case fsnotify.Create: + fallthrough + case fsnotify.Rename: + fallthrough + case fsnotify.Remove: + fallthrough + case fsnotify.Chmod: + return true, true, nil + default: + log.Errorf("Unexpected fsnotify event: %v, retrying...", e) + } + case err := <-w.Errors: + log.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) + if errRetry == 0 { + return false, false, err + } + errRetry-- + case <-time.After(logForceCheckPeriod): + return true, false, nil + } + } +} diff --git a/pkg/util/pod/logs/tail.go b/pkg/util/pod/logs/tail.go new file mode 100644 index 00000000000..a0e7ae3dbb1 --- /dev/null +++ b/pkg/util/pod/logs/tail.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logs + +import ( + "bytes" + "io" + "io/ioutil" + "os" +) + +const ( + // blockSize is the block size used in tail. + blockSize = 1024 +) + +// ReadAtMost reads at most max bytes from the end of the file identified by path or +// returns an error. It returns true if the file was longer than max. It will +// allocate up to max bytes. +func ReadAtMost(path string, max int64) ([]byte, bool, error) { + f, err := os.Open(path) + if err != nil { + return nil, false, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, false, err + } + size := fi.Size() + if size == 0 { + return nil, false, nil + } + if size < max { + max = size + } + offset, err := f.Seek(-max, io.SeekEnd) + if err != nil { + return nil, false, err + } + data, err := ioutil.ReadAll(f) + return data, offset > 0, err +} + +// FindTailLineStartIndex returns the start of last nth line. +// * If n < 0, return the beginning of the file. +// * If n >= 0, return the beginning of last nth line. +// Notice that if the last line is incomplete (no end-of-line), it will not be counted +// as one line. +func FindTailLineStartIndex(f io.ReadSeeker, n int64) (int64, error) { + if n < 0 { + return 0, nil + } + size, err := f.Seek(0, io.SeekEnd) + if err != nil { + return 0, err + } + var left, cnt int64 + buf := make([]byte, blockSize) + for right := size; right > 0 && cnt <= n; right -= blockSize { + left = right - blockSize + if left < 0 { + left = 0 + buf = make([]byte, right) + } + if _, err := f.Seek(left, io.SeekStart); err != nil { + return 0, err + } + if _, err := f.Read(buf); err != nil { + return 0, err + } + cnt += int64(bytes.Count(buf, eol)) + } + for ; cnt > n; cnt-- { + idx := bytes.Index(buf, eol) + 1 + buf = buf[idx:] + left += int64(idx) + } + return left, nil +} diff --git a/pkg/util/pod/stream/doc.go b/pkg/util/pod/stream/doc.go new file mode 100644 index 00000000000..57b4482e290 --- /dev/null +++ b/pkg/util/pod/stream/doc.go @@ -0,0 +1 @@ +package stream // import "yunion.io/x/onecloud/pkg/util/pod/stream" diff --git a/pkg/util/pod/stream/stream.go b/pkg/util/pod/stream/stream.go new file mode 100644 index 00000000000..a9479356227 --- /dev/null +++ b/pkg/util/pod/stream/stream.go @@ -0,0 +1,155 @@ +package stream + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "time" + + "golang.org/x/net/http2" + "k8s.io/apimachinery/pkg/util/net" + "moul.io/http2curl/v2" + + "yunion.io/x/log" + "yunion.io/x/pkg/util/httputils" +) + +type Request struct { + body io.Reader + headers http.Header + client *http.Client +} + +func NewRequest(client *http.Client, body io.Reader, headers http.Header) *Request { + return &Request{ + body: body, + headers: headers, + client: client, + } +} + +func (r *Request) Stream(ctx context.Context, method string, url string) (io.ReadCloser, error) { + req, err := http.NewRequest(method, url, nil) + if err != nil { + return nil, err + } + if r.body != nil { + req.Body = ioutil.NopCloser(r.body) + } + req = req.WithContext(ctx) + req.Header = r.headers + client := r.client + if client == nil { + client = httputils.GetTimeoutClient(1 * time.Hour) + } + + curlCmd, _ := http2curl.GetCurlCommand(req) + log.Infof("curl: %s", curlCmd) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + switch { + case (resp.StatusCode >= 200) && (resp.StatusCode < 300): + return resp.Body, nil + + default: + // ensure we close the body before returning the error + defer resp.Body.Close() + + result := r.transformResponse(resp, req) + err := result.Error() + if err == nil { + err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body)) + } + return nil, err + } +} + +// transformResponse converts an API response into a structured API object +func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result { + var body []byte + if resp.Body != nil { + data, err := ioutil.ReadAll(resp.Body) + switch err.(type) { + case nil: + body = data + case http2.StreamError: + // This is trying to catch the scenario that the server may close the connection when sending the + // response body. This can be caused by server timeout due to a slow network connection. + // TODO: Add test for this. Steps may be: + // 1. client-go (or kubectl) sends a GET request. + // 2. Apiserver sends back the headers and then part of the body + // 3. Apiserver closes connection. + // 4. client-go should catch this and return an error. + log.Infof("Stream error %#v when reading response body, may be caused by closed connection.", err) + streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err) + return Result{ + err: streamErr, + } + default: + log.Errorf("Unexpected error when reading response body: %v", err) + unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err) + return Result{ + err: unexpectedErr, + } + } + } + + contentType := resp.Header.Get("Content-Type") + + switch { + case resp.StatusCode == http.StatusSwitchingProtocols: + // no-op, we've been upgraded + case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent: + // calculate an unstructured error from the response which the Result object may use if the caller + // did not return a structured error. + // retryAfter, _ := retryAfterSeconds(resp) + // err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter) + return Result{ + body: body, + contentType: contentType, + statusCode: resp.StatusCode, + err: nil, + } + } + + return Result{ + body: body, + contentType: contentType, + statusCode: resp.StatusCode, + } +} + +// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if +// the header was missing or not a valid number. +func retryAfterSeconds(resp *http.Response) (int, bool) { + if h := resp.Header.Get("Retry-After"); len(h) > 0 { + if i, err := strconv.Atoi(h); err == nil { + return i, true + } + } + return 0, false +} + +// Result contains the result of calling Request.Do(). +type Result struct { + body []byte + warnings []net.WarningHeader + contentType string + err error + statusCode int +} + +// Raw returns the raw result. +func (r Result) Raw() ([]byte, error) { + return r.body, r.err +} + +func (r Result) Error() error { + return r.err +}