Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions infrabox/generator/infrabox.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,5 @@
"name": "deploy",
"infrabox_file": "../../.infrabox/output/deployments.json",
"depends_on": ["test"]
}, {
"type": "workflow",
"name": "e2e",
"infrabox_file": "../../.infrabox/output/e2e.json",
"depends_on": ["deploy"]
}]
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ spec:
value: {{ .Values.allow_ips | quote }}
- name: GC_ENABLED
value: {{ .Values.gc_enabled | quote }}
- name: INFRABOX_PARALLEL_LOG_PULL
value: {{ .Values.infrabox_parallel_log_pull | quote }}
{{ if .Values.gc_enabled }}
- name: GC_CLUSTER_MAX_AGE
value: {{ .Values.gc_cluster_max_age | quote }}
Expand Down
2 changes: 2 additions & 0 deletions src/services/gcp/infrabox-service-gcp/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ gc_interval: 3600
log_level: info

allow_ips:

infrabox_parallel_log_pull: 2
49 changes: 39 additions & 10 deletions src/services/gcp/pkg/stub/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stub

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
b64 "encoding/base64"
Expand All @@ -15,8 +16,11 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"

uuid "github.com/satori/go.uuid"

"github.com/sap/infrabox/src/services/gcp/pkg/apis/gcp/v1alpha1"
Expand Down Expand Up @@ -1111,6 +1115,14 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
log.Infof("Collecting data from GKE cluster %s", cluster.Name)
defer close(done)

parallelLogPulls := 1
if n, err := strconv.Atoi(os.Getenv("INFRABOX_PARALLEL_LOG_PULL")); err == nil {
if n > 0 && n < 10 {
parallelLogPulls = n
}
log.Infof("Setting parallel log pulls for log collection: %d", parallelLogPulls)
}

annotations := cr.GetAnnotations()
_, ok := annotations["infrabox.net/root-url"]
if !ok {
Expand Down Expand Up @@ -1144,23 +1156,40 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
return
}

// Do log collection in parallel, up to parallelLogPulls concurrent goroutines.
wg := sync.WaitGroup{}
sem := semaphore.NewWeighted(int64(parallelLogPulls))
for _, pod := range pods {
pod := pod // necessary before Go1.22 I think that changed this behavior.
for _, container := range pod.Containers {
log.Debug("Collecting logs for pod: ", pod.PodID)
data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container)
container := container
err := sem.Acquire(context.Background(), 1)
if err != nil {
log.Warningf("Failed to get collected pod logs: %v", err)
continue
log.Errorf("Failed to get collected pod list, cannot acquire semaphore: %v", err)
return
}

filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
filename = path.Join(logPath, filename)
if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil {
log.Debugf("Failed to write pod logs: %v", err)
continue
}
wg.Add(1)
go func() {
defer sem.Release(1)
defer wg.Done()

log.Debug("Collecting logs for pod: ", pod.PodID)
data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container)
if err != nil {
log.Warningf("Failed to get collected pod logs: %v", err)
return
}
filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
filename = path.Join(logPath, filename)
if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil {
log.Debugf("Failed to write pod logs: %v", err)
return
}
}()
}
}
wg.Wait()

archivePath := path.Join(logPath, "pods_log.zip")
err = archiver.Archive([]string{logPath}, archivePath)
Expand Down