Skip to content

Commit

Permalink
Refactor stream to preserve key order and allocate less
Browse files Browse the repository at this point in the history
  • Loading branch information
alxarch committed Feb 11, 2019
1 parent d2728b7 commit 20409b2
Show file tree
Hide file tree
Showing 7 changed files with 658 additions and 170 deletions.
3 changes: 1 addition & 2 deletions arguments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func TestParseArgs(t *testing.T) {
t.Errorf("Invalid help")
}

p := ycat.BlankPipeline()
p = p.Pipe(context.Background(), tasks...)
p := ycat.MakePipeline(context.Background(), tasks...)
for err := range p.Errors() {
if err != nil {
t.Error(err)
Expand Down
69 changes: 49 additions & 20 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ycat

import (
"bytes"
"encoding/json"
"io"
"os"
Expand All @@ -12,12 +13,14 @@ import (
// Format is the input file format.
type Format uint

// Input formats. Auto selects the format per file from its extension
// (see DetectFormat).
const (
	Auto Format = iota
	YAML
	JSON
)

// FormatFromString converts a string to Format
func FormatFromString(s string) Format {
switch strings.ToLower(s) {
case "json", "j":
Expand All @@ -29,22 +32,26 @@ func FormatFromString(s string) Format {
}
}

// DetectFormat detects an input format from the file extension.
// Anything that does not end in ".json" is treated as YAML.
func DetectFormat(path string) Format {
	switch {
	case strings.HasSuffix(path, ".json"):
		return JSON
	default:
		return YAML
	}
}

// Output is an output format.
type Output int

// Output formats. OutputInvalid is -1 so that the zero value is OutputYAML.
const (
	OutputInvalid Output = iota - 1
	OutputYAML
	OutputJSON
	OutputRaw // Only with --eval
)

// OutputFromString converts a string to Output
func OutputFromString(s string) Output {
switch strings.ToLower(s) {
case "json", "j":
Expand All @@ -58,10 +65,12 @@ func OutputFromString(s string) Output {
}
}

// Decoder decodes a single value into the provided destination.
// It is the common subset of the JSON and YAML decoders returned
// by NewDecoder.
type Decoder interface {
	Decode(interface{}) error
}

// NewDecoder creates a new Decoder decoding values from a Reader
func NewDecoder(r io.Reader, format Format) Decoder {
switch format {
case JSON:
Expand All @@ -71,6 +80,7 @@ func NewDecoder(r io.Reader, format Format) Decoder {
}
}

// ReadFromFile creates a StreamTask to read values from a file
func ReadFromFile(path string, format Format) ProducerFunc {
if format == Auto {
format = DetectFormat(path)
Expand All @@ -86,12 +96,13 @@ func ReadFromFile(path string, format Format) ProducerFunc {
}
}

// ReadFromTask creates a StreamTask to read values from a Reader
func ReadFromTask(r io.Reader, format Format) ProducerFunc {
return func(s WriteStream) error {
dec := NewDecoder(r, format)
for {
v := new(Value)
if err := dec.Decode(v); err != nil {
var v RawValue
if err := dec.Decode(&v); err != nil {
if err == io.EOF {
return nil
}
Expand All @@ -105,47 +116,65 @@ func ReadFromTask(r io.Reader, format Format) ProducerFunc {
}
}

// StreamWriteJSON creates a StreamTask to write values as JSON to a Writer
func StreamWriteJSON(w io.WriteCloser) ConsumerFunc {
enc := json.NewEncoder(w)
return func(s ReadStream) error {
defer w.Close()
var (
buf = bytes.Buffer{}
data []byte
)
for {
v, ok := s.Next()
if !ok {
// No more stream values
return nil
}
if err := enc.Encode(v); err != nil {
data = append(data[:0], string(v)...) // Avoid allocations

// Compact JSON output
buf.Reset()
if err := json.Compact(&buf, data); err != nil {
return err
}
// One value per line
buf.WriteByte('\n')
if _, err := buf.WriteTo(w); err != nil {
return err
}
}
}
}

// StreamWriteYAML creates a StreamTask to write values as YAML to a Writer
func StreamWriteYAML(w io.WriteCloser) ConsumerFunc {
n := int64(0)
return func(s ReadStream) error {
const newDocSeparator = "---\n"

return func(s ReadStream) (err error) {
// Close output when done
// Not sure this is the responsibility of the task
defer w.Close()
for {
for numValues := 0; ; numValues++ {
v, ok := s.Next()
if !ok {
return nil
// No more stream values
return
}
if n > 0 {
nn, err := w.Write([]byte("---\n"))

// Separate YAML documents
if numValues > 0 {
_, err = io.WriteString(w, newDocSeparator)
if err != nil {
return err
return
}
n += int64(nn)
}
data, err := yaml.Marshal(v)
if err != nil {
return err
}
nn, err := w.Write(data)
if err != nil {
return err
// Encode a single YAML document
// Cannot use one encoder for all values because of a yaml.Encoder bug
// with multiple documents and scalar values
enc := yaml.NewEncoder(w)
if err = enc.Encode(v); err != nil {
return
}
n += int64(nn)
}
}
}
24 changes: 11 additions & 13 deletions eval.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
package ycat

import (
"encoding/json"
"os"
"path"
"strings"

jsonnet "github.com/google/go-jsonnet"
)

// Eval is the execution environment for Jsonnet snippet evaluation.
type Eval struct {
	Bind         string         // NOTE(review): presumably the name each stream value is bound to — confirm against VM()/EvalTask
	MaxStackSize int            // NOTE(review): presumably forwarded as the Jsonnet VM stack limit — confirm against VM()
	Array        bool
	Vars         map[string]Var // external variables rendered into each snippet (see Render)
}

// VarType is the type of an external variable.
type VarType uint

// VarTypes. The zero value is skipped so an unset Var has no valid type.
const (
	_ VarType = iota
	FileVar
	CodeVar
	RawVar
)

// Var is an external variable
type Var struct {
Type VarType
Value string
Expand Down Expand Up @@ -64,6 +66,7 @@ func (v Var) Render(w *strings.Builder, name string) {

}

// Render renders a snippet binding local variables
func (e *Eval) Render(snippet string) string {
w := strings.Builder{}
for name, v := range e.Vars {
Expand All @@ -75,6 +78,7 @@ func (e *Eval) Render(snippet string) string {
return w.String()
}

// VM updates or creates a Jsonnet VM
func (e *Eval) VM(vm *jsonnet.VM) *jsonnet.VM {
if vm == nil {
vm = jsonnet.MakeVM()
Expand All @@ -99,6 +103,7 @@ func (e *Eval) VM(vm *jsonnet.VM) *jsonnet.VM {

}

// DefaultInputVar is the default name for the current stream value
// inside evaluated snippets.
const DefaultInputVar = "x"

func bindVar(v string) string {
Expand All @@ -117,26 +122,19 @@ func EvalTask(vm *jsonnet.VM, bind, filename, snippet string) StreamTask {
if !ok {
return nil
}
raw, err := json.Marshal(v)
vm.ExtCode(bind, string(v))
result, err := vm.EvaluateSnippet(filename, snippet)
if err != nil {
return err
}
vm.ExtCode(bind, string(raw))
val, err := vm.EvaluateSnippet(filename, snippet)
if err != nil {
return err
}
result := new(Value)
if err := json.Unmarshal([]byte(val), result); err != nil {
return err
}
if !s.Push(result) {
if !s.Push(RawValue(result)) {
return nil
}
}
})
}

// EvalFilename returns a filename on CWD
func EvalFilename() (string, error) {
cwd, err := os.Getwd()
if err != nil {
Expand Down
23 changes: 16 additions & 7 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"sync"
)

// Pipeline is the endpoint of a value stream process
type Pipeline struct {
values <-chan *Value
values <-chan RawValue
errors <-chan error
}

// Errors returns a channel with errors from tasks
func (p *Pipeline) Errors() <-chan error {
if p.errors == nil {
ch := make(chan error)
Expand All @@ -19,18 +21,23 @@ func (p *Pipeline) Errors() <-chan error {
return p.errors
}

func (p *Pipeline) Values() <-chan *Value {
// Values returns a channel with values from tasks
func (p *Pipeline) Values() <-chan RawValue {
if p.values == nil {
ch := make(chan *Value)
ch := make(chan RawValue)
close(ch)
p.values = ch
}
return p.values
}

// MakePipeline builds a fresh pipeline and runs the given tasks on it.
func MakePipeline(ctx context.Context, tasks ...StreamTask) (p *Pipeline) {
	// Start from an empty pipeline and chain every task onto it.
	p = &Pipeline{}
	return p.Pipe(ctx, tasks...)
}

// Pipe adds tasks to a pipeline
func (p *Pipeline) Pipe(ctx context.Context, tasks ...StreamTask) *Pipeline {
ecs := make([]<-chan error, 0, len(tasks)+1)
ecs = append(ecs, p.Errors())
Expand All @@ -40,25 +47,26 @@ func (p *Pipeline) Pipe(ctx context.Context, tasks ...StreamTask) *Pipeline {
}
return &Pipeline{p.Values(), MergeErrors(ecs...)}
}

func (p *Pipeline) task(ctx context.Context, task StreamTask) *Pipeline {
src := p.Values()
errc := make(chan error, 1)
s := stream{
done: ctx.Done(),
src: src,
}
var out chan *Value
var out chan RawValue
switch task := task.(type) {
case Consumer:
out = make(chan *Value)
out = make(chan RawValue)
close(out)
s.out = out
go func() {
defer close(errc)
errc <- task.Consume(&s)
}()
case Producer:
out = make(chan *Value, 1)
out = make(chan RawValue, 1)
s.out = out
go func() {
defer close(errc)
Expand All @@ -67,7 +75,7 @@ func (p *Pipeline) task(ctx context.Context, task StreamTask) *Pipeline {
errc <- task.Produce(&s)
}()
default:
out = make(chan *Value)
out = make(chan RawValue)
s.out = out
go func() {
defer close(errc)
Expand All @@ -79,6 +87,7 @@ func (p *Pipeline) task(ctx context.Context, task StreamTask) *Pipeline {

}

// MergeErrors is a helper function that merges error channels
func MergeErrors(cs ...<-chan error) <-chan error {
switch n := len(cs); n {
case 1:
Expand Down
Loading

0 comments on commit 20409b2

Please sign in to comment.