Skip to content

Commit

Permalink
Fix read log
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Qiu <[email protected]>
  • Loading branch information
wenqiq committed Feb 26, 2024
1 parent 69b3a4b commit c42df9d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
2 changes: 1 addition & 1 deletion test/performance/framework/service/scale_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func ScaleUp(ctx context.Context, provider providers.ProviderInterface, controlP
svcs = append(svcs, ServiceInfo{Name: newSvc.Name, IP: newSvc.Spec.ClusterIP, NameSpace: newSvc.Namespace})

// ip := newSvc.Spec.ClusterIP

klog.InfoS("go FetchTimestampFromLog", "actualCheckNum", actualCheckNum, "maxCheckNum", maxCheckNum, "cap(ch)", cap(ch), "fromPod", fromPod.Name)

Check failure on line 184 in test/performance/framework/service/scale_up.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (ubuntu-latest)

SA5011: possible nil pointer dereference (staticcheck)

Check failure on line 184 in test/performance/framework/service/scale_up.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (macos-latest)

SA5011: possible nil pointer dereference (staticcheck)
if fromPod != nil && actualCheckNum < maxCheckNum && actualCheckNum < cap(ch) {

Check failure on line 185 in test/performance/framework/service/scale_up.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (ubuntu-latest)

SA5011(related information): this check suggests that the pointer can be nil (staticcheck)

Check failure on line 185 in test/performance/framework/service/scale_up.go

View workflow job for this annotation

GitHub Actions / Golangci-lint (macos-latest)

SA5011(related information): this check suggests that the pointer can be nil (staticcheck)
// k := int(utils.GenRandInt()) % len(clientPods)
// clientPod := clientPods[k]
Expand Down
77 changes: 49 additions & 28 deletions test/performance/utils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"k8s.io/klog/v2"
"net/url"
"regexp"
Expand Down Expand Up @@ -117,39 +118,59 @@ func extractSeconds(logEntry string) (int, error) {
}

func FetchTimestampFromLog(ctx context.Context, kc kubernetes.Interface, namespace, podName, containerName string, ch chan time.Duration) error {
podLogOptions := &corev1.PodLogOptions{
Container: containerName,
}

return wait.PollImmediateUntil(2*time.Second, func() (done bool, err error) {
podLogs, err := kc.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions).Stream(ctx)
return wait.Poll(defaultInterval, defaultTimeout, func() (done bool, err error) {
req := kc.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Container: containerName,
})
podLogs, err := req.Stream(ctx)
if err != nil {
return false, fmt.Errorf("error reading pod logs: %+v", err.Error())
klog.ErrorS(err, "error when opening stream to retrieve logs for Pod", "namespace", namespace, "podName", podName)
return false, nil
}
defer podLogs.Close()
buf := make([]byte, 1024)
for {
n, err := podLogs.Read(buf)
if n > 0 {
logEntry := string(buf[:n])
if strings.Contains(logEntry, "Status changed from") {
seconds, err := extractSeconds(logEntry)
if err != nil {
return false, err
}
select {
case ch <- time.Duration(seconds):
klog.InfoS("Successfully write in channel")
default:
klog.InfoS("Skipped writing to the channel. No receiver.")
}
return true, nil
}
}

var b bytes.Buffer
if _, err := io.Copy(&b, podLogs); err != nil {
return false, fmt.Errorf("error when copying logs for Pod '%s/%s': %w", namespace, podName, err)
}
klog.InfoS("GetLogs from probe container", "logs", b.String())
if strings.Contains(b.String(), "Status changed from") {
seconds, err := extractSeconds(b.String())
if err != nil {
break
return false, err
}
select {
case ch <- time.Duration(seconds):
klog.InfoS("Successfully write in channel")
default:
klog.InfoS("Skipped writing to the channel. No receiver.")
}
return true, nil
}

// podLogs, err := kc.CoreV1().Pods(namespace).GetLogs(podName, podLogOptions).Stream(ctx)
// if err != nil {
// return false, fmt.Errorf("error reading pod logs: %+v", err.Error())
// }
// defer podLogs.Close()
// buf := make([]byte, 1024)
// n, err := podLogs.Read(buf)
// if n > 0 {
// logEntry := string(buf[:n])
// if strings.Contains(logEntry, "Status changed from") {
// seconds, err := extractSeconds(logEntry)
// if err != nil {
// return false, err
// }
// select {
// case ch <- time.Duration(seconds):
// klog.InfoS("Successfully write in channel")
// default:
// klog.InfoS("Skipped writing to the channel. No receiver.")
// }
// return true, nil
// }
// }
return false, nil
}, wait.NeverStop)
})
}

0 comments on commit c42df9d

Please sign in to comment.