Skip to content
This repository has been archived by the owner on May 26, 2023. It is now read-only.

transport cmd #64

Open
wants to merge 105 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
8d25c2e
implements first version of new transport command
jschicktanz Sep 8, 2021
3f75dc1
reduces unix domain socket filepath length
jschicktanz Sep 8, 2021
b829dc0
working on down/upload of localOciBlob resources
jschicktanz Sep 9, 2021
f9269d0
refactoring
jschicktanz Sep 10, 2021
cd1102b
refactored
jschicktanz Sep 14, 2021
2d6b4f1
refactoring
jschicktanz Sep 14, 2021
58a6494
init
jschicktanz Sep 14, 2021
2813a7d
refactoring
jschicktanz Sep 14, 2021
a5bdb52
review feedback
jschicktanz Sep 16, 2021
4475238
run formatter
jschicktanz Sep 16, 2021
9d7a1d3
renames extension pkg and adds tests
jschicktanz Sep 20, 2021
38fd5ff
adds license headers
jschicktanz Sep 20, 2021
8fe0835
refactoring + adds test
jschicktanz Sep 20, 2021
5fbf6ba
fix tests and adds check for test processor binary
jschicktanz Sep 21, 2021
94da006
formatting
jschicktanz Sep 21, 2021
2c8504e
improves before suite check
jschicktanz Sep 21, 2021
a878e86
refactoring + adds processor timeouts
jschicktanz Sep 21, 2021
acc06be
refactoring + updates doc
jschicktanz Sep 21, 2021
e41fd55
refactoring, adds pipeline tests, adds labelling processor
jschicktanz Sep 22, 2021
d343d8c
wip
jschicktanz Sep 22, 2021
2103309
Merge branch 'transport-cmd-0' into transport-cmd
jschicktanz Sep 22, 2021
93933fa
restores deleted files and fixes compile errors
jschicktanz Sep 22, 2021
a5a016e
wip
jschicktanz Sep 27, 2021
dd28da3
wip
jschicktanz Oct 5, 2021
8acaba7
add config parser and pipeline compiler
enrico-kaack-comp Oct 6, 2021
b8908cb
wip
jschicktanz Oct 8, 2021
fa93196
Merge branch 'main' into transport-cmd
jschicktanz Oct 8, 2021
b59ae47
wip
jschicktanz Oct 12, 2021
3f84f86
wip
jschicktanz Oct 12, 2021
45818cc
should fix oci image filter
jschicktanz Oct 13, 2021
e62058b
refactoring and godoc
jschicktanz Oct 13, 2021
8b18f1e
refactors and improves config parsing
jschicktanz Oct 14, 2021
3c2a1ef
implements object factories
jschicktanz Oct 15, 2021
555ca81
implements processor factory
jschicktanz Oct 15, 2021
f21513a
fix compile errors + refactoring
jschicktanz Oct 18, 2021
7845707
adds upload of patched component descriptors
jschicktanz Oct 18, 2021
5d1142b
implement additional filters
jschicktanz Oct 19, 2021
5c3a5c2
makes transport cmd run through
jschicktanz Oct 19, 2021
21ce176
fixes filtering of gzip compressed layers
jschicktanz Oct 20, 2021
48955f9
increase processor timeout
jschicktanz Oct 20, 2021
4ca458d
refactoring
jschicktanz Oct 20, 2021
e183dc3
refactoring
jschicktanz Oct 20, 2021
75f5ba5
refactors filter package and adds tests
jschicktanz Oct 21, 2021
f6611fb
renames accessTypeFilter
jschicktanz Oct 21, 2021
6e247b6
format and lint
jschicktanz Oct 21, 2021
d2538a8
refactors filters package
jschicktanz Oct 21, 2021
c7c2cc3
adds filters package
jschicktanz Oct 21, 2021
6dbba63
Merge branch 'filters-package' into transport-cmd
jschicktanz Oct 21, 2021
02d0bb2
refactoring + added tests
jschicktanz Oct 26, 2021
54b89b9
adds tests for local oci blob downloader
jschicktanz Oct 26, 2021
6304559
refactoring + adds test for oci artifact downloader
jschicktanz Oct 26, 2021
4953cc3
refactoring + adds tests
jschicktanz Oct 28, 2021
00cb568
wip
jschicktanz Oct 28, 2021
06815ff
adds first test for oci image serialization
jschicktanz Oct 29, 2021
162707f
improves tests of oci artifact serialization
jschicktanz Nov 2, 2021
db92cd5
improves uploaders package
jschicktanz Nov 2, 2021
3b4a5dc
renames file
jschicktanz Nov 2, 2021
5b1ab2b
moves function to utils package and adds tests
jschicktanz Nov 2, 2021
e5fc684
renames labeling processor and adds test
jschicktanz Nov 3, 2021
e1d99f4
refactoring
jschicktanz Nov 3, 2021
779cd8c
refactoring
jschicktanz Nov 3, 2021
526e68f
use map[string]string for passing env variables
jschicktanz Nov 3, 2021
9fbcdc7
refactoring + changes doc
jschicktanz Nov 3, 2021
fc6e1a0
Merge branch 'transport-cmd-0' into transport-cmd
jschicktanz Nov 3, 2021
5325cbc
moves oci artifact serialization to process/utils package
jschicktanz Nov 3, 2021
2b2710c
updates license headers
jschicktanz Nov 3, 2021
fd1bf55
Merge branch 'main' into transport-cmd
jschicktanz Nov 3, 2021
5385d57
adds tests for utils
jschicktanz Nov 4, 2021
7c694d1
adds tests for filtering
jschicktanz Nov 9, 2021
9630a31
wip
jschicktanz Nov 10, 2021
2b9b508
adds test for image index in oci image filter
jschicktanz Nov 11, 2021
bc7da1d
comment out unnecessary code
jschicktanz Nov 11, 2021
a6e85fb
adds tests for config package
jschicktanz Nov 11, 2021
a2a335a
refactoring and docs added
jschicktanz Nov 12, 2021
648abf7
wip
jschicktanz Nov 19, 2021
e5b943b
wip
jschicktanz Nov 22, 2021
7f0e10c
adds testcase for oci artifact uploader
jschicktanz Nov 22, 2021
7ea523d
refactors oci uploader tests
jschicktanz Nov 24, 2021
9b24bd4
refactors test coding
jschicktanz Nov 24, 2021
2690218
refactors test coding
jschicktanz Nov 24, 2021
a296000
renames uds to unix domain socket
jschicktanz Nov 29, 2021
9dc8e97
review feedback
jschicktanz Nov 29, 2021
6d0d6d2
remove unix domain socket file after processor finished
jschicktanz Nov 29, 2021
128e337
Merge branch 'transport-cmd-0' into transport-cmd
jschicktanz Nov 30, 2021
38acc5b
fix compile error
jschicktanz Nov 30, 2021
4f49aa6
increase processor timeout
jschicktanz Nov 30, 2021
6dea353
wip
jschicktanz Dec 1, 2021
bea11b5
Merge branch 'main' into transport-cmd
jschicktanz Dec 2, 2021
d675ef2
adds component descriptor signature generation
jschicktanz Dec 13, 2021
ade22ae
improves logging and error handling
jschicktanz Dec 13, 2021
038e794
implements dry-run + refactoring
jschicktanz Dec 14, 2021
aa6e24f
refactoring
jschicktanz Dec 14, 2021
61edcc3
adds processor timeout cli option
jschicktanz Dec 14, 2021
0baa125
moves processing job to proces package
jschicktanz Dec 14, 2021
30e4a65
refactoring
jschicktanz Dec 15, 2021
f1074d6
Merge branch 'main' into transport-cmd
jschicktanz Dec 16, 2021
f848587
Merge branch 'main' into transport-cmd
jschicktanz Dec 21, 2021
1d9dce8
adds newline to output
jschicktanz Dec 21, 2021
afb2371
adds test for repo ctx override
jschicktanz Dec 21, 2021
92ca076
Merge branch 'main' into transport-cmd
jschicktanz Dec 21, 2021
a0dc11a
adds doc
jschicktanz Jan 10, 2022
f996602
Merge branch 'main' into transport-cmd
jschicktanz Jan 24, 2022
302d47c
removes signature coding
jschicktanz Jan 24, 2022
fb08e8b
remove unix domain socket file after processor exit
jschicktanz Jan 24, 2022
fb6cf9d
moves transport cmd to remote command
jschicktanz Jan 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review feedback
  • Loading branch information
jschicktanz committed Sep 16, 2021
commit a5bdb52b4c14bb8b611085f19e2b9fede8074de8
26 changes: 11 additions & 15 deletions pkg/transport/process/extension/stdio_executable.go
Original file line number Diff line number Diff line change
@@ -16,10 +16,11 @@ type stdIOExecutable struct {
stdout io.Reader
}

// NewStdIOExecutable runs resource processor extension executable in the background.
// NewStdIOExecutable runs a resource processor extension executable in the background.
// It communicates with this processor via stdin/stdout pipes.
func NewStdIOExecutable(ctx context.Context, bin string, args ...string) (process.ResourceStreamProcessor, error) {
func NewStdIOExecutable(ctx context.Context, bin string, args []string, env []string) (process.ResourceStreamProcessor, error) {
cmd := exec.CommandContext(ctx, bin, args...)
cmd.Env = env
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
@@ -30,11 +31,6 @@ func NewStdIOExecutable(ctx context.Context, bin string, args ...string) (proces
}
cmd.Stderr = os.Stderr

err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("unable to start processor: %w", err)
}

e := stdIOExecutable{
processor: cmd,
stdin: stdin,
@@ -45,23 +41,23 @@ func NewStdIOExecutable(ctx context.Context, bin string, args ...string) (proces
}

func (e *stdIOExecutable) Process(ctx context.Context, r io.Reader, w io.Writer) error {
_, err := io.Copy(e.stdin, r)
if err != nil {
if err := e.processor.Start(); err != nil {
return fmt.Errorf("unable to start processor: %w", err)
}

if _, err := io.Copy(e.stdin, r); err != nil {
return fmt.Errorf("unable to write input: %w", err)
}

err = e.stdin.Close()
if err != nil {
if err := e.stdin.Close(); err != nil {
return fmt.Errorf("unable to close input writer: %w", err)
}

_, err = io.Copy(w, e.stdout)
if err != nil {
if _, err := io.Copy(w, e.stdout); err != nil {
return fmt.Errorf("unable to read output: %w", err)
}

err = e.processor.Wait()
if err != nil {
if err := e.processor.Wait(); err != nil {
return fmt.Errorf("unable to stop processor: %w", err)
}

6 changes: 3 additions & 3 deletions pkg/transport/process/extension/uds_executable.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ type udsExecutable struct {

// NewUDSExecutable runs a resource processor extension executable in the background.
// It communicates with this processor via Unix Domain Sockets.
func NewUDSExecutable(ctx context.Context, bin string, args ...string) (process.ResourceStreamProcessor, error) {
func NewUDSExecutable(ctx context.Context, bin string, args []string, env []string) (process.ResourceStreamProcessor, error) {
for _, arg := range args {
if arg == serverAddressFlag {
return nil, fmt.Errorf("the flag %s is not allowed to be set manually", serverAddressFlag)
@@ -38,11 +38,11 @@ func NewUDSExecutable(ctx context.Context, bin string, args ...string) (process.
args = append(args, "--addr", addr)

cmd := exec.CommandContext(ctx, bin, args...)
cmd.Env = env
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err = cmd.Start()
if err != nil {
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("unable to start processor: %w", err)
}

15 changes: 6 additions & 9 deletions pkg/transport/process/pipeline.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package process

import (
"context"
"io"
"os"

"archive/tar"
@@ -16,13 +17,12 @@ type resourceProcessingPipelineImpl struct {
}

func (p *resourceProcessingPipelineImpl) Process(ctx context.Context, cd cdv2.ComponentDescriptor, res cdv2.Resource) (*cdv2.ComponentDescriptor, cdv2.Resource, error) {
infile, err := ioutil.TempFile("", "out")
infile, err := ioutil.TempFile("", "")
if err != nil {
return nil, cdv2.Resource{}, fmt.Errorf("unable to create temporary infile: %w", err)
}

err = WriteTARArchive(ctx, cd, res, nil, tar.NewWriter(infile))
if err != nil {
if err := WriteTARArchive(ctx, cd, res, nil, tar.NewWriter(infile)); err != nil {
return nil, cdv2.Resource{}, fmt.Errorf("unable to write: %w", err)
}

@@ -36,8 +36,7 @@ func (p *resourceProcessingPipelineImpl) Process(ctx context.Context, cd cdv2.Co
}
defer infile.Close()

_, err = infile.Seek(0, 0)
if err != nil {
if _, err := infile.Seek(0, io.SeekStart); err != nil {
return nil, cdv2.Resource{}, err
}

@@ -53,8 +52,7 @@ func (p *resourceProcessingPipelineImpl) Process(ctx context.Context, cd cdv2.Co
func (p *resourceProcessingPipelineImpl) process(ctx context.Context, infile *os.File, proc ResourceStreamProcessor) (*os.File, error) {
defer infile.Close()

_, err := infile.Seek(0, 0)
if err != nil {
if _, err := infile.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("unable to seek to beginning of input file: %w", err)
}

@@ -66,8 +64,7 @@ func (p *resourceProcessingPipelineImpl) process(ctx context.Context, infile *os
inreader := infile
outwriter := outfile

err = proc.Process(ctx, inreader, outwriter)
if err != nil {
if err := proc.Process(ctx, inreader, outwriter); err != nil {
return nil, fmt.Errorf("unable to process resource: %w", err)
}

47 changes: 16 additions & 31 deletions pkg/transport/process/util.go
Original file line number Diff line number Diff line change
@@ -29,8 +29,7 @@ func WriteTARArchive(ctx context.Context, cd cdv2.ComponentDescriptor, res cdv2.
return fmt.Errorf("unable to marshal component descriptor: %w", err)
}

err = writeFileToTARArchive(componentDescriptorFile, bytes.NewReader(marshaledCD), outArchive)
if err != nil {
if err := writeFileToTARArchive(componentDescriptorFile, bytes.NewReader(marshaledCD), outArchive); err != nil {
return fmt.Errorf("unable to write %s: %w", componentDescriptorFile, err)
}

@@ -39,14 +38,12 @@ func WriteTARArchive(ctx context.Context, cd cdv2.ComponentDescriptor, res cdv2.
return fmt.Errorf("unable to marshal resource: %w", err)
}

err = writeFileToTARArchive(resourceFile, bytes.NewReader(marshaledRes), outArchive)
if err != nil {
if err := writeFileToTARArchive(resourceFile, bytes.NewReader(marshaledRes), outArchive); err != nil {
return fmt.Errorf("unable to write %s: %w", resourceFile, err)
}

if resourceBlobReader != nil {
err = writeFileToTARArchive(resourceBlobFile, resourceBlobReader, outArchive)
if err != nil {
if err := writeFileToTARArchive(resourceBlobFile, resourceBlobReader, outArchive); err != nil {
return fmt.Errorf("unable to write %s: %w", resourceBlobFile, err)
}
}
@@ -61,13 +58,11 @@ func writeFileToTARArchive(filename string, contentReader io.Reader, outArchive
}
defer tempfile.Close()

_, err = io.Copy(tempfile, contentReader)
if err != nil {
if _, err := io.Copy(tempfile, contentReader); err != nil {
return fmt.Errorf("unable to write content to file: %w", err)
}

_, err = tempfile.Seek(0, 0)
if err != nil {
if _, err := tempfile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("unable to seek to beginning of file: %w", err)
}

@@ -83,12 +78,11 @@ func writeFileToTARArchive(filename string, contentReader io.Reader, outArchive
ModTime: time.Now(),
}

if err = outArchive.WriteHeader(&header); err != nil {
if err := outArchive.WriteHeader(&header); err != nil {
return fmt.Errorf("unable to write tar header: %w", err)
}

_, err = io.Copy(outArchive, tempfile)
if err != nil {
if _, err := io.Copy(outArchive, tempfile); err != nil {
return fmt.Errorf("unable to write file to tar archive: %w", err)
}

@@ -113,30 +107,25 @@ func ReadTARArchive(r *tar.Reader) (*cdv2.ComponentDescriptor, cdv2.Resource, io

switch header.Name {
case resourceFile:
res, err = readResource(r)
if err != nil {
if res, err = readResource(r); err != nil {
return nil, cdv2.Resource{}, nil, fmt.Errorf("unable to read %s: %w", resourceFile, err)
}
case componentDescriptorFile:
cd, err = readComponentDescriptor(r)
if err != nil {
if cd, err = readComponentDescriptor(r); err != nil {
return nil, cdv2.Resource{}, nil, fmt.Errorf("unable to read %s: %w", componentDescriptorFile, err)
}
case resourceBlobFile:
f, err = ioutil.TempFile("", "")
if err != nil {
if f, err = ioutil.TempFile("", ""); err != nil {
return nil, cdv2.Resource{}, nil, fmt.Errorf("unable to create tempfile: %w", err)
}
_, err = io.Copy(f, r)
if err != nil {
if _, err := io.Copy(f, r); err != nil {
return nil, cdv2.Resource{}, nil, fmt.Errorf("unable to read %s: %w", resourceBlobFile, err)
}
}
}

if f != nil {
_, err := f.Seek(0, 0)
if err != nil {
if _, err := f.Seek(0, io.SeekStart); err != nil {
return nil, cdv2.Resource{}, nil, fmt.Errorf("unable to seek to beginning of file: %w", err)
}
}
@@ -146,14 +135,12 @@ func ReadTARArchive(r *tar.Reader) (*cdv2.ComponentDescriptor, cdv2.Resource, io

func readResource(r *tar.Reader) (cdv2.Resource, error) {
buf := bytes.NewBuffer([]byte{})
_, err := io.Copy(buf, r)
if err != nil {
if _, err := io.Copy(buf, r); err != nil {
return cdv2.Resource{}, fmt.Errorf("unable to read from stream: %w", err)
}

var res cdv2.Resource
err = yaml.Unmarshal(buf.Bytes(), &res)
if err != nil {
if err := yaml.Unmarshal(buf.Bytes(), &res); err != nil {
return cdv2.Resource{}, fmt.Errorf("unable to unmarshal: %w", err)
}

@@ -162,14 +149,12 @@ func readResource(r *tar.Reader) (cdv2.Resource, error) {

func readComponentDescriptor(r *tar.Reader) (*cdv2.ComponentDescriptor, error) {
buf := bytes.NewBuffer([]byte{})
_, err := io.Copy(buf, r)
if err != nil {
if _, err := io.Copy(buf, r); err != nil {
return nil, fmt.Errorf("unable to read from stream: %w", err)
}

var cd cdv2.ComponentDescriptor
err = yaml.Unmarshal(buf.Bytes(), &cd)
if err != nil {
if err := yaml.Unmarshal(buf.Bytes(), &cd); err != nil {
return nil, fmt.Errorf("unable to unmarshal: %w", err)
}