This repository has been archived by the owner on May 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* init * refactoring * review feedback * run formatter * renames extension pkg and adds tests * adds license headers * refactoring + adds test * fix tests and adds check for test processor binary * formatting * improves before suite check * refactoring + adds processor timeouts * refactoring + updates doc * refactoring, adds pipeline tests, adds labelling processor * refactoring and godoc * moves function to utils package and adds tests * renames labeling processor and adds test * refactoring * refactoring * use map[string]string for passing env variables * refactoring + changes doc * renames uds to unix domain socket * review feedback * remove unix domain socket file after processor finished * remove unnecessary copy step from example processor
- Loading branch information
1 parent
b3567a1
commit bae35c6
Showing
20 changed files
with
1,305 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
174 changes: 174 additions & 0 deletions
174
pkg/transport/process/extensions/extensions_suite_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
package extensions_test | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"os" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
cdv2 "github.com/gardener/component-spec/bindings-go/apis/v2" | ||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
|
||
"github.com/gardener/component-cli/pkg/transport/process" | ||
"github.com/gardener/component-cli/pkg/transport/process/extensions" | ||
"github.com/gardener/component-cli/pkg/transport/process/utils" | ||
) | ||
|
||
const ( | ||
exampleProcessorBinaryPath = "../../../../tmp/test/bin/example-processor" | ||
sleepProcessorBinaryPath = "../../../../tmp/test/bin/sleep-processor" | ||
sleepTimeEnv = "SLEEP_TIME" | ||
sleepTime = 5 * time.Second | ||
) | ||
|
||
func TestConfig(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "transport extensions Test Suite") | ||
} | ||
|
||
var _ = BeforeSuite(func() { | ||
_, err := os.Stat(exampleProcessorBinaryPath) | ||
Expect(err).ToNot(HaveOccurred(), exampleProcessorBinaryPath+" doesn't exists. pls run make install-requirements.") | ||
|
||
_, err = os.Stat(sleepProcessorBinaryPath) | ||
Expect(err).ToNot(HaveOccurred(), sleepProcessorBinaryPath+" doesn't exists. pls run make install-requirements.") | ||
}, 5) | ||
|
||
var _ = Describe("transport extensions", func() { | ||
|
||
Context("stdio executable", func() { | ||
It("should create processor successfully if env is nil", func() { | ||
args := []string{} | ||
_, err := extensions.NewStdIOExecutable(exampleProcessorBinaryPath, args, nil) | ||
Expect(err).ToNot(HaveOccurred()) | ||
}) | ||
|
||
It("should modify the processed resource correctly", func() { | ||
args := []string{} | ||
env := map[string]string{} | ||
processor, err := extensions.NewStdIOExecutable(exampleProcessorBinaryPath, args, env) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
runExampleResourceTest(processor) | ||
}) | ||
|
||
It("should exit with error when timeout is reached", func() { | ||
args := []string{} | ||
env := map[string]string{ | ||
sleepTimeEnv: sleepTime.String(), | ||
} | ||
processor, err := extensions.NewStdIOExecutable(sleepProcessorBinaryPath, args, env) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
runTimeoutTest(processor) | ||
}) | ||
}) | ||
|
||
Context("unix domain socket executable", func() { | ||
It("should create processor successfully if env is nil", func() { | ||
args := []string{} | ||
_, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, nil) | ||
Expect(err).ToNot(HaveOccurred()) | ||
}) | ||
|
||
It("should modify the processed resource correctly", func() { | ||
args := []string{} | ||
env := map[string]string{} | ||
processor, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
runExampleResourceTest(processor) | ||
}) | ||
|
||
It("should raise an error when trying to set the server address env variable manually", func() { | ||
args := []string{} | ||
env := map[string]string{ | ||
extensions.ProcessorServerAddressEnv: "/tmp/my-processor.sock", | ||
} | ||
_, err := extensions.NewUnixDomainSocketExecutable(exampleProcessorBinaryPath, args, env) | ||
Expect(err).To(MatchError(fmt.Sprintf("the env variable %s is not allowed to be set manually", extensions.ProcessorServerAddressEnv))) | ||
}) | ||
|
||
It("should exit with error when timeout is reached", func() { | ||
args := []string{} | ||
env := map[string]string{ | ||
sleepTimeEnv: sleepTime.String(), | ||
} | ||
processor, err := extensions.NewUnixDomainSocketExecutable(sleepProcessorBinaryPath, args, env) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
runTimeoutTest(processor) | ||
}) | ||
}) | ||
|
||
}) | ||
|
||
func runTimeoutTest(processor process.ResourceStreamProcessor) { | ||
const timeout = 2 * time.Second | ||
|
||
ctx, cancelfunc := context.WithTimeout(context.TODO(), timeout) | ||
defer cancelfunc() | ||
|
||
err := processor.Process(ctx, bytes.NewBuffer([]byte{}), bytes.NewBuffer([]byte{})) | ||
Expect(err).To(MatchError("unable to wait for processor: signal: killed")) | ||
} | ||
|
||
func runExampleResourceTest(processor process.ResourceStreamProcessor) { | ||
const ( | ||
processorName = "example-processor" | ||
resourceData = "12345" | ||
expectedResourceData = resourceData + "\n" + processorName | ||
) | ||
|
||
res := cdv2.Resource{ | ||
IdentityObjectMeta: cdv2.IdentityObjectMeta{ | ||
Name: "my-res", | ||
Version: "v0.1.0", | ||
Type: "ociImage", | ||
}, | ||
} | ||
|
||
l := cdv2.Label{ | ||
Name: "processor-name", | ||
Value: json.RawMessage(`"` + processorName + `"`), | ||
} | ||
expectedRes := res | ||
expectedRes.Labels = append(expectedRes.Labels, l) | ||
|
||
cd := cdv2.ComponentDescriptor{ | ||
ComponentSpec: cdv2.ComponentSpec{ | ||
Resources: []cdv2.Resource{ | ||
res, | ||
}, | ||
}, | ||
} | ||
|
||
inputBuf := bytes.NewBuffer([]byte{}) | ||
err := utils.WriteProcessorMessage(cd, res, strings.NewReader(resourceData), inputBuf) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
outputBuf := bytes.NewBuffer([]byte{}) | ||
err = processor.Process(context.TODO(), inputBuf, outputBuf) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
processedCD, processedRes, processedBlobReader, err := utils.ReadProcessorMessage(outputBuf) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
Expect(*processedCD).To(Equal(cd)) | ||
Expect(processedRes).To(Equal(expectedRes)) | ||
|
||
processedResourceDataBuf := bytes.NewBuffer([]byte{}) | ||
_, err = io.Copy(processedResourceDataBuf, processedBlobReader) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
Expect(processedResourceDataBuf.String()).To(Equal(expectedResourceData)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
package extensions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"os" | ||
"os/exec" | ||
|
||
"github.com/gardener/component-cli/pkg/transport/process" | ||
) | ||
|
||
type stdIOExecutable struct { | ||
bin string | ||
args []string | ||
env []string | ||
} | ||
|
||
// NewStdIOExecutable returns a resource processor extension which runs an executable in the | ||
// background when calling Process(). It communicates with this processor via stdin/stdout pipes. | ||
func NewStdIOExecutable(bin string, args []string, env map[string]string) (process.ResourceStreamProcessor, error) { | ||
parsedEnv := []string{} | ||
for k, v := range env { | ||
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", k, v)) | ||
} | ||
|
||
e := stdIOExecutable{ | ||
bin: bin, | ||
args: args, | ||
env: parsedEnv, | ||
} | ||
|
||
return &e, nil | ||
} | ||
|
||
func (e *stdIOExecutable) Process(ctx context.Context, r io.Reader, w io.Writer) error { | ||
cmd := exec.CommandContext(ctx, e.bin, e.args...) | ||
cmd.Env = e.env | ||
stdin, err := cmd.StdinPipe() | ||
if err != nil { | ||
return fmt.Errorf("unable to get stdin pipe: %w", err) | ||
} | ||
stdout, err := cmd.StdoutPipe() | ||
if err != nil { | ||
return fmt.Errorf("unable to get stdout pipe: %w", err) | ||
} | ||
cmd.Stderr = os.Stderr | ||
|
||
if err := cmd.Start(); err != nil { | ||
return fmt.Errorf("unable to start processor: %w", err) | ||
} | ||
|
||
if _, err := io.Copy(stdin, r); err != nil { | ||
return fmt.Errorf("unable to write input: %w", err) | ||
} | ||
|
||
if err := stdin.Close(); err != nil { | ||
return fmt.Errorf("unable to close input writer: %w", err) | ||
} | ||
|
||
if _, err := io.Copy(w, stdout); err != nil { | ||
return fmt.Errorf("unable to read output: %w", err) | ||
} | ||
|
||
if err := cmd.Wait(); err != nil { | ||
return fmt.Errorf("unable to wait for processor: %w", err) | ||
} | ||
|
||
return nil | ||
} |
127 changes: 127 additions & 0 deletions
127
pkg/transport/process/extensions/unix_domain_socket_executable.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
// SPDX-FileCopyrightText: 2021 SAP SE or an SAP affiliate company and Gardener contributors. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
package extensions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"net" | ||
"os" | ||
"os/exec" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/gardener/component-cli/pkg/transport/process" | ||
"github.com/gardener/component-cli/pkg/utils" | ||
) | ||
|
||
// ProcessorServerAddressEnv is the environment variable key which is used to store the | ||
// address under which a resource processor server should start. | ||
const ProcessorServerAddressEnv = "PROCESSOR_SERVER_ADDRESS" | ||
|
||
type unixDomainSocketExecutable struct { | ||
bin string | ||
args []string | ||
env []string | ||
addr string | ||
} | ||
|
||
// NewUnixDomainSocketExecutable returns a resource processor extension which runs an executable in the | ||
// background when calling Process(). It communicates with this processor via Unix Domain Sockets. | ||
func NewUnixDomainSocketExecutable(bin string, args []string, env map[string]string) (process.ResourceStreamProcessor, error) { | ||
if _, ok := env[ProcessorServerAddressEnv]; ok { | ||
return nil, fmt.Errorf("the env variable %s is not allowed to be set manually", ProcessorServerAddressEnv) | ||
} | ||
|
||
parsedEnv := []string{} | ||
for k, v := range env { | ||
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", k, v)) | ||
} | ||
|
||
wd, err := os.Getwd() | ||
if err != nil { | ||
return nil, err | ||
} | ||
addr := fmt.Sprintf("%s/%s.sock", wd, utils.RandomString(8)) | ||
parsedEnv = append(parsedEnv, fmt.Sprintf("%s=%s", ProcessorServerAddressEnv, addr)) | ||
|
||
e := unixDomainSocketExecutable{ | ||
bin: bin, | ||
args: args, | ||
env: parsedEnv, | ||
addr: addr, | ||
} | ||
|
||
return &e, nil | ||
} | ||
|
||
func (e *unixDomainSocketExecutable) Process(ctx context.Context, r io.Reader, w io.Writer) error { | ||
cmd := exec.CommandContext(ctx, e.bin, e.args...) | ||
cmd.Env = e.env | ||
cmd.Stdout = os.Stdout | ||
cmd.Stderr = os.Stderr | ||
|
||
if err := cmd.Start(); err != nil { | ||
return fmt.Errorf("unable to start processor: %w", err) | ||
} | ||
|
||
conn, err := tryConnect(e.addr) | ||
if err != nil { | ||
return fmt.Errorf("unable to connect to processor: %w", err) | ||
} | ||
|
||
if _, err := io.Copy(conn, r); err != nil { | ||
return fmt.Errorf("unable to write input: %w", err) | ||
} | ||
|
||
usock := conn.(*net.UnixConn) | ||
if err := usock.CloseWrite(); err != nil { | ||
return fmt.Errorf("unable to close input writer: %w", err) | ||
} | ||
|
||
if _, err := io.Copy(w, conn); err != nil { | ||
return fmt.Errorf("unable to read output: %w", err) | ||
} | ||
|
||
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { | ||
return fmt.Errorf("unable to send SIGTERM to processor: %w", err) | ||
} | ||
|
||
// extension servers must implement ordinary shutdown (!) | ||
if err := cmd.Wait(); err != nil { | ||
return fmt.Errorf("unable to wait for processor: %w", err) | ||
} | ||
|
||
// remove socket file if server hasn't already cleaned up | ||
if _, err := os.Stat(e.addr); err == nil { | ||
if err := os.Remove(e.addr); err != nil { | ||
return fmt.Errorf("unable to remove %s: %w", e.addr, err) | ||
} | ||
} else if !os.IsNotExist(err) { | ||
return fmt.Errorf("unable to get file stats for %s: %w", e.addr, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func tryConnect(addr string) (net.Conn, error) { | ||
const ( | ||
maxRetries = 5 | ||
sleeptime = 500 * time.Millisecond | ||
) | ||
|
||
var conn net.Conn | ||
var err error | ||
for i := 0; i <= maxRetries; i++ { | ||
conn, err = net.Dial("unix", addr) | ||
if err == nil { | ||
break | ||
} | ||
|
||
time.Sleep(sleeptime) | ||
} | ||
|
||
return conn, err | ||
} |
Oops, something went wrong.