Skip to content

Commit

Permalink
External binary caching and code clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
stbenjam committed Nov 18, 2024
1 parent 2ae2adf commit 6a12774
Show file tree
Hide file tree
Showing 8 changed files with 891 additions and 515 deletions.
55 changes: 55 additions & 0 deletions pkg/test/externalbinary/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# External Binaries

This package includes the code used for working with external test binaries.
It's intended to house the implementation of the openshift-tests side of the
[openshift-tests extension interface](https://github.com/openshift/enhancements/pull/1676), which is only
partially implemented here for the moment.

There is a registry defined in binary.go, that lists the release image tag, and
path to each external test binary. These binaries should implement the OTE
interface defined in the enhancement, and implemented by the vendorable
[openshift-tests-extension](https://github.com/openshift-eng/openshift-tests-extension).

## Requirements

If the architecture of your local system where `openshift-tests` will run
differs from the cluster under test, you should override the release payload
with a payload of the architecture of your own system, as it is where the
binaries will execute. Note, your OS must still be Linux. That means on Apple
Silicon, you'll still need to run this in a Linux environment, such as a
virtual machine, or x86 podman container.

## Overrides

A number of environment variables for overriding the behavior of external
binaries are available, but in general this should "just work". A complex set
of logic for determining the optimal release payload, and which pull
credentials to use are found in this code, and extensively documented in code
comments. The following environment variables are available to force certain
behaviors:

### Caching

By default, binaries will be cached in `$XDG_CACHE_HOME/openshift-tests`
(typically: `$HOME/.cache/openshift-tests`). Upon invocation, older binaries
than 7 days will be cleaned up. To disable this feature:

```bash
export OPENSHIFT_TESTS_DISABLE_CACHE=1
```

### Registry Auth Credentials

To change the pull secrets used for extracting the external binaries, set:

```bash
export REGISTRY_AUTH_FILE=$HOME/pull.json
```

### Release Payload

To change the payload used for extracting the external binaries, set:

```bash
export EXTENSIONS_PAYLOAD_OVERRIDE=registry.ci.openshift.org/ocp-arm64/release-arm64:4.18.0-0.nightly-arm64-2024-11-15-135718
```
256 changes: 256 additions & 0 deletions pkg/test/externalbinary/binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package externalbinary

import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/openshift/origin/test/extended/util"
"github.com/pkg/errors"
"io"
"log"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
)

type externalBinaryStruct struct {
// The payload image tag in which an external binary path can be found
imageTag string
// The binary path to extract from the image
binaryPath string
}

var externalBinaries = []externalBinaryStruct{
{
imageTag: "hyperkube",
binaryPath: "/usr/bin/k8s-tests",
},
}

// TestBinary is an abstraction around extracted test binaries that provides an interface for listing the available
// tests. In the future, it will implement the entire openshift-tests-extension interface.
type TestBinary struct {
path string
logger *log.Logger
}

// ListTests returns which tests this binary advertises. Eventually, it should take an environment struct
// to provide to the binary so it can determine for itself which tests are relevant.
func (b *TestBinary) ListTests(ctx context.Context) (ExtensionTestSpecs, error) {
var tests ExtensionTestSpecs
start := time.Now()
binName := filepath.Base(b.path)

b.logger.Printf("Listing tests for %q", binName)
command := exec.Command(b.path, "list")
testList, err := runWithTimeout(ctx, command, 10*time.Minute)
if err != nil {
return nil, fmt.Errorf("failed running '%s list': %w", b.path, err)
}
buf := bytes.NewBuffer(testList)
for {
line, err := buf.ReadString('\n')
if err == io.EOF {
break
}
if !strings.HasPrefix(line, "[{") {
continue
}

var extensionTestSpecs ExtensionTestSpecs
err = json.Unmarshal([]byte(line), &extensionTestSpecs)
if err != nil {
return nil, err
}
for i := range extensionTestSpecs {
extensionTestSpecs[i].Binary = b.path
}
tests = append(tests, extensionTestSpecs...)
}
b.logger.Printf("Listed %d tests for %q in %v", len(tests), binName, time.Since(start))
return tests, nil
}

// ExtractAllTestBinaries determines the optimal release payload to use, and extracts all the external
// test binaries from it, and returns a slice of them.
func ExtractAllTestBinaries(ctx context.Context, logger *log.Logger, parallelism int) (func(), TestBinaries, error) {
if parallelism < 1 {
return nil, nil, errors.New("parallelism must be greater than zero")
}

releaseImage, err := determineReleasePayloadImage(logger)
if err != nil {
return nil, nil, errors.WithMessage(err, "couldn't determine release image")
}

oc := util.NewCLIWithoutNamespace("default")
registryAuthfilePath, err := getRegistryAuthFilePath(logger, oc)
if err != nil {
return nil, nil, errors.WithMessage(err, "couldn't get registry auth file path")
}

externalBinaryProvider, err := NewExternalBinaryProvider(logger, releaseImage, registryAuthfilePath)
if err != nil {
return nil, nil, errors.WithMessage(err, "could not create external binary provider")
}

var (
binaries []*TestBinary
mu sync.Mutex
wg sync.WaitGroup
errCh = make(chan error, len(externalBinaries))
jobCh = make(chan externalBinaryStruct)
)

// Producer: sends jobs to the jobCh channel
go func() {
defer close(jobCh)
for _, b := range externalBinaries {
select {
case <-ctx.Done():
return // Exit if context is cancelled
case jobCh <- b:
}
}
}()

// Consumer workers: extract test binaries concurrently
for i := 0; i < parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return // Context is cancelled
case b, ok := <-jobCh:
if !ok {
return // Channel is closed
}
testBinary, err := externalBinaryProvider.ExtractBinaryFromReleaseImage(b.imageTag, b.binaryPath)
if err != nil {
errCh <- err
continue
}
mu.Lock()
binaries = append(binaries, testBinary)
mu.Unlock()
}
}

}()
}

// Wait for all workers to finish
wg.Wait()
close(errCh)

// Check if any errors were reported
var errs []string
for err := range errCh {
errs = append(errs, err.Error())
}
if len(errs) > 0 {
externalBinaryProvider.Cleanup()
return nil, nil, fmt.Errorf("encountered errors while extracting binaries: %s", strings.Join(errs, ";"))
}

return externalBinaryProvider.Cleanup, binaries, nil
}

type TestBinaries []*TestBinary

// ListTests extracts the tests from all TestBinaries using the specified parallelism.
func (binaries TestBinaries) ListTests(ctx context.Context, parallelism int) (ExtensionTestSpecs, error) {
var (
allTests ExtensionTestSpecs
mu sync.Mutex
wg sync.WaitGroup
errCh = make(chan error, len(binaries))
jobCh = make(chan *TestBinary)
)

// Producer: sends jobs to the jobCh channel
go func() {
defer close(jobCh)
for _, binary := range binaries {
select {
case <-ctx.Done():
return // Exit when context is cancelled
case jobCh <- binary:
}
}
}()

// Consumer workers: extract tests concurrently
for i := 0; i < parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return // Exit when context is cancelled
case binary, ok := <-jobCh:
if !ok {
return // Channel was closed
}
tests, err := binary.ListTests(ctx)
if err != nil {
errCh <- err
}
mu.Lock()
allTests = append(allTests, tests...)
mu.Unlock()
}
}
}()
}

// Wait for all workers to finish
wg.Wait()
close(errCh)

// Check if any errors were reported
var errs []string
for err := range errCh {
errs = append(errs, err.Error())
}
if len(errs) > 0 {
return nil, fmt.Errorf("encountered errors while listing tests: %s", strings.Join(errs, ";"))
}

return allTests, nil
}

func runWithTimeout(ctx context.Context, c *exec.Cmd, timeout time.Duration) ([]byte, error) {
if timeout > 0 {
go func() {
select {
// interrupt tests after timeout, and abort if they don't complete quick enough
case <-time.After(timeout):
if c.Process != nil {
c.Process.Signal(syscall.SIGINT)
}
// if the process appears to be hung a significant amount of time after the timeout
// send an ABRT so we get a stack dump
select {
case <-time.After(time.Minute):
if c.Process != nil {
c.Process.Signal(syscall.SIGABRT)
}
}
case <-ctx.Done():
if c.Process != nil {
c.Process.Signal(syscall.SIGINT)
}
}

}()
}
return c.CombinedOutput()
}
Loading

0 comments on commit 6a12774

Please sign in to comment.