Skip to content

Commit

Permalink
add log api
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancandevloper committed Aug 31, 2023
1 parent c5dd035 commit c04d5a9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 122 deletions.
190 changes: 68 additions & 122 deletions pkg/apiserver/cubeapi/resourcemanage/handle/podlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@ limitations under the License.
package resourcemanage

import (
"bufio"
"context"
"github.com/kubecube-io/kubecube/pkg/utils/constants"
"io"
"io/ioutil"
"strconv"

"github.com/gin-gonic/gin"
v1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/scheme"

"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/multicluster/client"
"github.com/kubecube-io/kubecube/pkg/utils/errcode"
"github.com/kubecube-io/kubecube/pkg/utils/filter"
"github.com/kubecube-io/kubecube/pkg/utils/log"
"github.com/kubecube-io/kubecube/pkg/utils/response"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)

// NOTE: This file is copied from k8s.io/kubernetes/dashboard/src/app/backend/resource/container/logs.go.
Expand All @@ -60,137 +57,86 @@ func (podLog *PodLog) HandleLogs(c *gin.Context) {
k8sClient := podLog.client.ClientSet()

namespace := c.Param("namespace")
podID := c.Param("resourceName")
containerID := c.Query("containerName")

refTimestamp := c.Query("referenceTimestamp")
if refTimestamp == "" {
refTimestamp = log.NewestTimestamp
pod := c.Param("resourceName")
container := c.Query("container")
tailLines := c.Query("tailLines")
timestamps := c.Query("timestamps")
limitBytes := c.Query("limitBytes")
sinceSeconds := c.Query("sinceSeconds")
follow := c.Query("follow")

lines := int64(1000)
var err error
if len(tailLines) > 0 {
lines, err = strconv.ParseInt(tailLines, 10, 64)
}

refLineNum, err := strconv.Atoi(c.Query("referenceLineNum"))
if err != nil {
refLineNum = 0
response.FailReturn(c, errcode.ParamsInvalid(err))
return
}
usePreviousLogs := c.Query("previous") == "true"
offsetFrom, err1 := strconv.Atoi(c.Query("offsetFrom"))
offsetTo, err2 := strconv.Atoi(c.Query("offsetTo"))
logFilePosition := c.Query("logFilePosition")
isTimestamps, _ := strconv.ParseBool(timestamps)
isFollow, _ := strconv.ParseBool(follow)

logSelector := log.DefaultSelection
if err1 == nil && err2 == nil {
logSelector = &log.Selection{
ReferencePoint: log.LogLineId{
LogTimestamp: log.LogTimestamp(refTimestamp),
LineNum: refLineNum,
},
OffsetFrom: offsetFrom,
OffsetTo: offsetTo,
LogFilePosition: logFilePosition,
}
limit := int64(5242880) // 5MBi
if len(limitBytes) > 0 {
limit, err = strconv.ParseInt(limitBytes, 10, 64)
}

result, err := GetLogDetails(k8sClient, namespace, podID, containerID, logSelector, usePreviousLogs)
if err != nil {
clog.Error("get log details fail: %v", err)
response.FailReturn(c, errcode.BadRequest(err))
response.FailReturn(c, errcode.ParamsInvalid(err))
return
}
response.SuccessReturn(c, result)
}

// GetLogDetails returns logs for particular pod and container. When container is null, logs for the first one
// are returned. Previous indicates to read archived logs created by log rotation or container crash
func GetLogDetails(client kubernetes.Interface, namespace, podID string, container string,
logSelector *log.Selection, usePreviousLogs bool) (*log.LogDetails, error) {
pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podID, metaV1.GetOptions{})
if err != nil {
return nil, err
seconds := int64(1800) // 30min
if len(sinceSeconds) > 0 {
seconds, err = strconv.ParseInt(sinceSeconds, 10, 64)
}

if len(container) == 0 {
container = pod.Spec.Containers[0].Name
}

logOptions := mapToLogOptions(container, logSelector, usePreviousLogs)
rawLogs, err := readRawLogs(client, namespace, podID, logOptions)
if err != nil {
return nil, err
response.FailReturn(c, errcode.ParamsInvalid(err))
return
}
details := ConstructLogDetails(podID, rawLogs, container, logSelector)
return details, nil
}

// Maps the log selection to the corresponding api object
// Read limits are set to avoid out of memory issues
func mapToLogOptions(container string, logSelector *log.Selection, previous bool) *v1.PodLogOptions {
logOptions := &v1.PodLogOptions{
Container: container,
Follow: false,
Previous: previous,
Timestamps: true,
}

if logSelector.LogFilePosition == log.Beginning {
logOptions.LimitBytes = &log.ByteReadLimit
} else {
logOptions.TailLines = &log.LineReadLimit
Container: container,
Follow: isFollow,
Timestamps: isTimestamps,
TailLines: &lines,
LimitBytes: &limit,
SinceSeconds: &seconds,
}

return logOptions
}

// Construct a request for getting the logs for a pod and retrieves the logs.
func readRawLogs(client kubernetes.Interface, namespace, podID string, logOptions *v1.PodLogOptions) (
string, error) {
readCloser, err := openStream(client, namespace, podID, logOptions)
logStream, err := openStream(c, k8sClient, namespace, pod, logOptions)
if err != nil {
return err.Error(), nil
}

defer readCloser.Close()

result, err := ioutil.ReadAll(readCloser)
if err != nil {
return "", err
}

return string(result), nil
}

func openStream(client kubernetes.Interface, namespace, podID string, logOptions *v1.PodLogOptions) (io.ReadCloser, error) {
return client.CoreV1().RESTClient().Get().
Namespace(namespace).
Name(podID).
Resource("pods").
SubResource("log").
VersionedParams(logOptions, scheme.ParameterCodec).Stream(context.TODO())
}

// ConstructLogDetails creates a new log details structure for given parameters.
func ConstructLogDetails(podID string, rawLogs string, container string, logSelector *log.Selection) *log.LogDetails {
parsedLines := log.ToLogLines(rawLogs)
logLines, fromDate, toDate, logSelection, lastPage := parsedLines.SelectLogs(logSelector)

readLimitReached := isReadLimitReached(int64(len(rawLogs)), int64(len(parsedLines)), logSelector.LogFilePosition)
truncated := readLimitReached && lastPage

info := log.LogInfo{
PodName: podID,
ContainerName: container,
FromDate: fromDate,
ToDate: toDate,
Truncated: truncated,
clog.Error("get log details fail: %v", err)
response.FailReturn(c, errcode.BadRequest(err))
return
}
return &log.LogDetails{
Info: info,
Selection: logSelection,
LogLines: logLines,
defer func(logStream io.ReadCloser) {
_ = logStream.Close()
}(logStream)
writer := c.Writer
header := writer.Header()
header.Set(constants.HttpHeaderTransferEncoding, constants.HttpHeaderChunked)
header.Set(constants.HttpHeaderContentType, constants.HttpHeaderTextHtml)
r := bufio.NewReader(logStream)
for {
bytes, err := r.ReadBytes('\n')
if err != nil {
if err == io.EOF {

Check failure on line 121 in pkg/apiserver/cubeapi/resourcemanage/handle/podlog.go

View workflow job for this annotation

GitHub Actions / lint

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
response.SuccessReturn(c, nil)
return
}
clog.Error("read log fail: %v", err)
response.FailReturn(c, errcode.BadRequest(err))
return
}
_, err = writer.Write(bytes)
if err != nil {
clog.Error("write log fail: %v", err)
response.FailReturn(c, errcode.BadRequest(err))
return
}
writer.Flush()
}
}

// Checks if the amount of log file returned from the apiserver is equal to the read limits
func isReadLimitReached(bytesLoaded int64, linesLoaded int64, logFilePosition string) bool {
return (logFilePosition == log.Beginning && bytesLoaded >= log.ByteReadLimit) ||
(logFilePosition == log.End && linesLoaded >= log.LineReadLimit)
// Construct a request for getting the logs for a pod and retrieves the logs.
func openStream(ctx context.Context, client kubernetes.Interface, namespace, podID string, logOptions *v1.PodLogOptions) (io.ReadCloser, error) {
return client.CoreV1().Pods(namespace).GetLogs(podID, logOptions).Stream(ctx)
}
3 changes: 3 additions & 0 deletions pkg/utils/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
HttpHeaderContentType = "Content-type"
HttpHeaderContentDisposition = "Content-Disposition"
HttpHeaderContentTypeOctet = "application/octet-stream"
HttpHeaderTransferEncoding = "Transfer-Encoding"
HttpHeaderChunked = "chunked"
HttpHeaderTextHtml = "text/html"

ImpersonateUserKey = "Impersonate-User"
ImpersonateGroupKey = "Impersonate-Group"
Expand Down

0 comments on commit c04d5a9

Please sign in to comment.