diff --git a/infrabox/generator/infrabox.json b/infrabox/generator/infrabox.json index 98599d2e..7c3c14d4 100644 --- a/infrabox/generator/infrabox.json +++ b/infrabox/generator/infrabox.json @@ -14,10 +14,5 @@ "name": "deploy", "infrabox_file": "../../.infrabox/output/deployments.json", "depends_on": ["test"] - }, { - "type": "workflow", - "name": "e2e", - "infrabox_file": "../../.infrabox/output/e2e.json", - "depends_on": ["deploy"] }] } diff --git a/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml b/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml index e32e6e2b..4494b777 100644 --- a/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml +++ b/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml @@ -52,6 +52,8 @@ spec: value: {{ .Values.allow_ips | quote }} - name: GC_ENABLED value: {{ .Values.gc_enabled | quote }} + - name: INFRABOX_PARALLEL_LOG_PULL + value: {{ .Values.infrabox_parallel_log_pull | quote }} {{ if .Values.gc_enabled }} - name: GC_CLUSTER_MAX_AGE value: {{ .Values.gc_cluster_max_age | quote }} diff --git a/src/services/gcp/infrabox-service-gcp/values.yaml b/src/services/gcp/infrabox-service-gcp/values.yaml index ed5bd5a2..34d87df5 100644 --- a/src/services/gcp/infrabox-service-gcp/values.yaml +++ b/src/services/gcp/infrabox-service-gcp/values.yaml @@ -23,3 +23,5 @@ gc_interval: 3600 log_level: info allow_ips: + +infrabox_parallel_log_pull: 2 diff --git a/src/services/gcp/pkg/stub/handler.go b/src/services/gcp/pkg/stub/handler.go index 8d48264b..b59eee93 100644 --- a/src/services/gcp/pkg/stub/handler.go +++ b/src/services/gcp/pkg/stub/handler.go @@ -2,6 +2,7 @@ package stub import ( "bytes" + "context" "crypto/tls" "crypto/x509" b64 "encoding/base64" @@ -15,8 +16,11 @@ import ( "path" "strconv" "strings" + "sync" "time" + "golang.org/x/sync/semaphore" + uuid "github.com/satori/go.uuid" "github.com/sap/infrabox/src/services/gcp/pkg/apis/gcp/v1alpha1" @@ -1111,6 +1115,14 @@ func retrieveLogs(cr 
*v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E log.Infof("Collecting data from GKE cluster %s", cluster.Name) defer close(done) + parallelLogPulls := 1 + if n, err := strconv.Atoi(os.Getenv("INFRABOX_PARALLEL_LOG_PULL")); err == nil { + if n > 0 && n < 10 { + parallelLogPulls = n + } + log.Infof("Setting parallel log pulls for log collection: %d", parallelLogPulls) + } + annotations := cr.GetAnnotations() _, ok := annotations["infrabox.net/root-url"] if !ok { @@ -1144,23 +1156,40 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E return } + // Do log collection in parallel, up to parallelLogPulls concurrent goroutines. + wg := sync.WaitGroup{} + sem := semaphore.NewWeighted(int64(parallelLogPulls)) for _, pod := range pods { + pod := pod // per-iteration copy for the goroutine below; before Go 1.22 loop variables were shared across iterations. for _, container := range pod.Containers { - log.Debug("Collecting logs for pod: ", pod.PodID) - data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container) + container := container + err := sem.Acquire(context.Background(), 1) if err != nil { - log.Warningf("Failed to get collected pod logs: %v", err) - continue + log.Errorf("Failed to get collected pod list, cannot acquire semaphore: %v", err) + return } - filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt" - filename = path.Join(logPath, filename) - if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil { - log.Debugf("Failed to write pod logs: %v", err) - continue - } + wg.Add(1) + go func() { + defer sem.Release(1) + defer wg.Done() + + log.Debug("Collecting logs for pod: ", pod.PodID) + data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container) + if err != nil { + log.Warningf("Failed to get collected pod logs: %v", err) + return + } + filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt" + filename = path.Join(logPath, filename) + if err := 
ioutil.WriteFile(filename, *data, os.ModePerm); err != nil { + log.Debugf("Failed to write pod logs: %v", err) + return + } + }() } } + wg.Wait() archivePath := path.Join(logPath, "pods_log.zip") err = archiver.Archive([]string{logPath}, archivePath)