Skip to content
This repository has been archived by the owner on May 26, 2023. It is now read-only.

transport cmd foundation #43

Merged
merged 24 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hack/install-requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ $ export PATH=/usr/local/opt/gnu-tar/libexec/gnubin:\$PATH
$ export PATH=/usr/local/opt/grep/libexec/gnubin:\$PATH
EOM
fi


echo "> Compile processor binaries for testing"

go build -o "${PROJECT_ROOT}/tmp/test/bin/example-processor" "${PROJECT_ROOT}/pkg/transport/process/processors/example"
go build -o "${PROJECT_ROOT}/tmp/test/bin/sleep-processor" "${PROJECT_ROOT}/pkg/transport/process/processors/sleep"
174 changes: 174 additions & 0 deletions pkg/transport/process/extensions/extensions_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
//
// SPDX-License-Identifier: Apache-2.0
package extensions_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"testing"
"time"

cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/gardener/component-cli/pkg/transport/process"
"github.com/gardener/component-cli/pkg/transport/process/extensions"
"github.com/gardener/component-cli/pkg/transport/process/utils"
)

const (
exampleProcessorBinaryPath = "../../../../tmp/test/bin/example-processor"
sleepProcessorBinaryPath = "../../../../tmp/test/bin/sleep-processor"
sleepTimeEnv = "SLEEP_TIME"
sleepTime = 5 * time.Second
)

func TestConfig(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "transport extensions Test Suite")
}

var _ = BeforeSuite(func() {
_, err := os.Stat(exampleProcessorBinaryPath)
Expect(err).ToNot(HaveOccurred(), exampleProcessorBinaryPath+" doesn't exists. pls run make install-requirements.")

_, err = os.Stat(sleepProcessorBinaryPath)
Expect(err).ToNot(HaveOccurred(), sleepProcessorBinaryPath+" doesn't exists. pls run make install-requirements.")
}, 5)

var _ = Describe("transport extensions", func() {
jschicktanz marked this conversation as resolved.
Show resolved Hide resolved

Context("stdio executable", func() {
It("should create processor successfully if env is nil", func() {
args := []string{}
_, err := extensions.NewStdIOExecutable(exampleProcessorBinaryPath, args, nil)
Expect(err).ToNot(HaveOccurred())
})

It("should modify the processed resource correctly", func() {
args := []string{}
env := map[string]string{}
processor, err := extensions.NewStdIOExecutable(exampleProcessorBinaryPath, args, env)
Expect(err).ToNot(HaveOccurred())

runExampleResourceTest(processor)
})

It("should exit with error when timeout is reached", func() {
args := []string{}
env := map[string]string{
sleepTimeEnv: sleepTime.String(),
}
processor, err := extensions.NewStdIOExecutable(sleepProcessorBinaryPath, args, env)
Expect(err).ToNot(HaveOccurred())

runTimeoutTest(processor)
})
})

Context("uds executable", func() {
jschicktanz marked this conversation as resolved.
Show resolved Hide resolved
It("should create processor successfully if env is nil", func() {
args := []string{}
_, err := extensions.NewUDSExecutable(exampleProcessorBinaryPath, args, nil)
Expect(err).ToNot(HaveOccurred())
})

It("should modify the processed resource correctly", func() {
args := []string{}
env := map[string]string{}
processor, err := extensions.NewUDSExecutable(exampleProcessorBinaryPath, args, env)
Expect(err).ToNot(HaveOccurred())

runExampleResourceTest(processor)
})

It("should raise an error when trying to set the server address env variable manually", func() {
args := []string{}
env := map[string]string{
extensions.ServerAddressEnv: "/tmp/my-processor.sock",
}
_, err := extensions.NewUDSExecutable(exampleProcessorBinaryPath, args, env)
Expect(err).To(MatchError(fmt.Sprintf("the env variable %s is not allowed to be set manually", extensions.ServerAddressEnv)))
})

It("should exit with error when timeout is reached", func() {
args := []string{}
env := map[string]string{
sleepTimeEnv: sleepTime.String(),
}
processor, err := extensions.NewUDSExecutable(sleepProcessorBinaryPath, args, env)
Expect(err).ToNot(HaveOccurred())

runTimeoutTest(processor)
})
})

})

func runTimeoutTest(processor process.ResourceStreamProcessor) {
const timeout = 2 * time.Second

ctx, cancelfunc := context.WithTimeout(context.TODO(), timeout)
defer cancelfunc()

err := processor.Process(ctx, bytes.NewBuffer([]byte{}), bytes.NewBuffer([]byte{}))
Expect(err).To(MatchError("unable to wait for processor: signal: killed"))
}

func runExampleResourceTest(processor process.ResourceStreamProcessor) {
const (
processorName = "example-processor"
resourceData = "12345"
expectedResourceData = resourceData + "\n" + processorName
)

res := cdv2.Resource{
IdentityObjectMeta: cdv2.IdentityObjectMeta{
Name: "my-res",
Version: "v0.1.0",
Type: "ociImage",
},
}

l := cdv2.Label{
Name: "processor-name",
Value: json.RawMessage(`"` + processorName + `"`),
}
expectedRes := res
expectedRes.Labels = append(expectedRes.Labels, l)

cd := cdv2.ComponentDescriptor{
ComponentSpec: cdv2.ComponentSpec{
Resources: []cdv2.Resource{
res,
},
},
}

inputBuf := bytes.NewBuffer([]byte{})
err := utils.WriteProcessorMessage(cd, res, strings.NewReader(resourceData), inputBuf)
Expect(err).ToNot(HaveOccurred())

outputBuf := bytes.NewBuffer([]byte{})
err = processor.Process(context.TODO(), inputBuf, outputBuf)
Expect(err).ToNot(HaveOccurred())

processedCD, processedRes, processedBlobReader, err := utils.ReadProcessorMessage(outputBuf)
Expect(err).ToNot(HaveOccurred())

Expect(*processedCD).To(Equal(cd))
Expect(processedRes).To(Equal(expectedRes))

processedResourceDataBuf := bytes.NewBuffer([]byte{})
_, err = io.Copy(processedResourceDataBuf, processedBlobReader)
Expect(err).ToNot(HaveOccurred())

Expect(processedResourceDataBuf.String()).To(Equal(expectedResourceData))
}
73 changes: 73 additions & 0 deletions pkg/transport/process/extensions/stdio_executable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
//
// SPDX-License-Identifier: Apache-2.0
package extensions

import (
"context"
"fmt"
"io"
"os"
"os/exec"

"github.com/gardener/component-cli/pkg/transport/process"
)

type stdIOExecutable struct {
bin string
args []string
env []string
}

// NewStdIOExecutable returns a resource processor extension which runs an executable.
// in the background. It communicates with this processor via stdin/stdout pipes.
func NewStdIOExecutable(bin string, args []string, env map[string]string) (process.ResourceStreamProcessor, error) {
parsedEnv := []string{}
for k, v := range env {
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", k, v))
}

e := stdIOExecutable{
bin: bin,
args: args,
env: parsedEnv,
}

return &e, nil
}

func (e *stdIOExecutable) Process(ctx context.Context, r io.Reader, w io.Writer) error {
cmd := exec.CommandContext(ctx, e.bin, e.args...)
cmd.Env = e.env
stdin, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("unable to get stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("unable to get stdout pipe: %w", err)
}
cmd.Stderr = os.Stderr

if err := cmd.Start(); err != nil {
return fmt.Errorf("unable to start processor: %w", err)
}

if _, err := io.Copy(stdin, r); err != nil {
return fmt.Errorf("unable to write input: %w", err)
}

if err := stdin.Close(); err != nil {
return fmt.Errorf("unable to close input writer: %w", err)
}

if _, err := io.Copy(w, stdout); err != nil {
return fmt.Errorf("unable to read output: %w", err)
}

if err := cmd.Wait(); err != nil {
return fmt.Errorf("unable to wait for processor: %w", err)
}

return nil
}
118 changes: 118 additions & 0 deletions pkg/transport/process/extensions/uds_executable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors.
//
// SPDX-License-Identifier: Apache-2.0
package extensions

import (
"context"
"fmt"
"io"
"net"
"os"
"os/exec"
"syscall"
"time"

"github.com/gardener/component-cli/pkg/transport/process"
"github.com/gardener/component-cli/pkg/utils"
)

// ServerAddressEnv is the environment variable key which is used to store the
// address under which a resource processor server should start.
const ServerAddressEnv = "SERVER_ADDRESS"
enrico-kaack-comp marked this conversation as resolved.
Show resolved Hide resolved

type udsExecutable struct {
bin string
args []string
env []string
addr string
}

// NewUDSExecutable runs a resource processor extension executable in the background.
jschicktanz marked this conversation as resolved.
Show resolved Hide resolved
// It communicates with this processor via Unix Domain Sockets.
func NewUDSExecutable(bin string, args []string, env map[string]string) (process.ResourceStreamProcessor, error) {
if _, ok := env[ServerAddressEnv]; ok {
return nil, fmt.Errorf("the env variable %s is not allowed to be set manually", ServerAddressEnv)
}

parsedEnv := []string{}
for k, v := range env {
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", k, v))
}

wd, err := os.Getwd()
if err != nil {
return nil, err
}
addr := fmt.Sprintf("%s/%s.sock", wd, utils.RandomString(8))
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", ServerAddressEnv, addr))

e := udsExecutable{
bin: bin,
args: args,
env: parsedEnv,
addr: addr,
}

return &e, nil
}

func (e *udsExecutable) Process(ctx context.Context, r io.Reader, w io.Writer) error {
cmd := exec.CommandContext(ctx, e.bin, e.args...)
cmd.Env = e.env
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Start(); err != nil {
return fmt.Errorf("unable to start processor: %w", err)
}

conn, err := tryConnect(e.addr)
if err != nil {
return fmt.Errorf("unable to connect to processor: %w", err)
}

if _, err := io.Copy(conn, r); err != nil {
return fmt.Errorf("unable to write input: %w", err)
}

usock := conn.(*net.UnixConn)
if err := usock.CloseWrite(); err != nil {
return fmt.Errorf("unable to close input writer: %w", err)
}

if _, err := io.Copy(w, conn); err != nil {
return fmt.Errorf("unable to read output: %w", err)
}

if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("unable to send SIGTERM to processor: %w", err)
}

// extension servers must implement ordinary shutdown (!)
if err := cmd.Wait(); err != nil {
return fmt.Errorf("unable to wait for processor: %w", err)
}

return nil
}

func tryConnect(addr string) (net.Conn, error) {
const (
maxRetries = 5
sleeptime = 500 * time.Millisecond
)

var conn net.Conn
var err error
for i := 0; i <= maxRetries; i++ {
conn, err = net.Dial("unix", addr)
if err == nil {
break
}

time.Sleep(sleeptime)
}

return conn, err
}
Loading