Skip to content

Commit

Permalink
Merge pull request #2024 from ConnorJC3/coalescer1
Browse files Browse the repository at this point in the history
Implement separate coalescer package and unit tests
  • Loading branch information
k8s-ci-robot authored Apr 30, 2024
2 parents f929e1f + cd9afc5 commit c124663
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 0 deletions.
165 changes: 165 additions & 0 deletions pkg/coalescer/coalescer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package coalescer combines multiple requests made over a period of time into a single request
package coalescer

import (
"time"

"k8s.io/klog/v2"
)

// Coalescer is an interface to combine multiple requests made over a period of time into a single request
//
// When a request is received that matches an existing in-flight request, the coalescer will attempt to
// merge that request into the existing request pool using the provided mergeFunction
//
// When the delay on the request expires (determined by the time the first request comes in), the merged
// input is passed to the execution function, and the result to all waiting callers (those that were
// not rejected during the merge step)
type Coalescer[InputType comparable, ResultType any] interface {
// Coalesce is a function to coalesce a given input
// key = only requests with this same key will be coalesced (such as volume ID)
// input = input to merge with other inputs
// It is NOT guaranteed all callers receive the same result (for example, if
// an input fails to merge, only that caller will receive an error)
Coalesce(key string, input InputType) (ResultType, error)
}

// New is a function to creates a new coalescer and immediately begin processing requests
// delay = the time to wait for other requests to coalesce before executing
// mergeFunction = a function to merge a new input with the existing inputs
// (should return an error if the new input cannot be combined with the existing inputs,
// otherwise return the new merged input)
// executeFunction = the function to call when the delay expires
func New[InputType comparable, ResultType any](delay time.Duration,
mergeFunction func(input InputType, existing InputType) (InputType, error),
executeFunction func(key string, input InputType) (ResultType, error),
) Coalescer[InputType, ResultType] {
c := coalescer[InputType, ResultType]{
delay: delay,
mergeFunction: mergeFunction,
executeFunction: executeFunction,
inputChannel: make(chan newInput[InputType, ResultType]),
timerChannel: make(chan string),
pendingInputs: make(map[string]pendingInput[InputType, ResultType]),
}

go c.coalescerThread()
return &c
}

// Type to store a result or error in channels
type result[ResultType any] struct {
result ResultType
err error
}

// Type to send inputs from Coalesce() to coalescerThread() via channel
// Includes a return channel for the result
type newInput[InputType comparable, ResultType any] struct {
key string
input InputType
resultChannel chan result[ResultType]
}

// Type to store pending inputs in the input map
type pendingInput[InputType comparable, ResultType any] struct {
input InputType
resultChannels []chan result[ResultType]
}

type coalescer[InputType comparable, ResultType any] struct {
delay time.Duration
mergeFunction func(input InputType, existing InputType) (InputType, error)
executeFunction func(key string, input InputType) (ResultType, error)

inputChannel chan newInput[InputType, ResultType]
timerChannel chan string

pendingInputs map[string]pendingInput[InputType, ResultType]
}

func (c *coalescer[InputType, ResultType]) Coalesce(key string, input InputType) (ResultType, error) {
resultChannel := make(chan result[ResultType])

c.inputChannel <- newInput[InputType, ResultType]{
key: key,
input: input,
resultChannel: resultChannel,
}
result := <-resultChannel

if result.err != nil {
return *new(ResultType), result.err
} else {
return result.result, nil
}
}

func (c *coalescer[InputType, ResultType]) coalescerThread() {
for {
select {
case i := <-c.inputChannel:
klog.V(7).InfoS("coalescerThread: Input received", "key", i.key, "input", i.input)
if pending, ok := c.pendingInputs[i.key]; ok {
klog.V(7).InfoS("coalescerThread: Input matched existing input, attempting to merge", "key", i.key)
newInput, err := c.mergeFunction(i.input, pending.input)

if err == nil {
klog.V(7).InfoS("coalescerThread: Merged input into existing inputs", "key", i.key)
pending.input = newInput
pending.resultChannels = append(pending.resultChannels, i.resultChannel)
c.pendingInputs[i.key] = pending
} else {
klog.V(7).InfoS("coalescerThread: Failed to merge inputs into existing inputs", "key", i.key)
i.resultChannel <- result[ResultType]{
err: err,
}
}
} else {
klog.V(7).InfoS("coalescerThread: New input, setting up fresh coalesce operation", "key", i.key)
c.pendingInputs[i.key] = pendingInput[InputType, ResultType]{
input: i.input,
resultChannels: []chan result[ResultType]{
i.resultChannel,
},
}
time.AfterFunc(c.delay, func() {
c.timerChannel <- i.key
})
}

case k := <-c.timerChannel:
klog.V(7).InfoS("coalescerThread: Coalescing delay reached, spawning execution thread", "key", k)
pending := c.pendingInputs[k]
delete(c.pendingInputs, k)

go func() {
r, err := c.executeFunction(k, pending.input)
klog.V(7).InfoS("coalescerThread: Finished executing", "key", k, "result", r, "error", err)
result := result[ResultType]{
result: r,
err: err,
}
for _, c := range pending.resultChannels {
c <- result
}
}()
}
}
}
124 changes: 124 additions & 0 deletions pkg/coalescer/coalescer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package coalescer

import (
"errors"
"fmt"
"testing"
"time"
)

var (
errFailedToMerge = fmt.Errorf("Failed to merge")
errFailedToExecute = fmt.Errorf("Failed to execute")
)

// Merge function used to test the coalescer
// For testing purposes, positive numbers are added to the existing input,
// and negative numbers return an error ("fail to merge")
func mockMerge(input int, existing int) (int, error) {
if input < 0 {
return 0, errFailedToMerge
} else {
return input + existing, nil
}
}

// Execute function used to test the coalescer
// For testing purposes, small numbers (numbers less than 100) successfully execute,
// and large numbers (numbers 100 or greater) fail to execute
func mockExecute(_ string, input int) (string, error) {
if input < 100 {
return "success", nil
} else {
return "failure", errFailedToExecute
}
}

func TestCoalescer(t *testing.T) {
testCases := []struct {
name string
inputs []int
expectMergeFailure bool
expectExecuteFailure bool
}{
{
name: "one input",
inputs: []int{42},
},
{
name: "many inputs",
inputs: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
},
{
name: "failed merge",
inputs: []int{1, -2, 3, -4, 5, -6, 7, -8, 9, -10},
expectMergeFailure: true,
},
{
name: "failed execute",
inputs: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100},
expectExecuteFailure: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
c := New[int, string](50*time.Millisecond, mockMerge, mockExecute)
testChannel := make(chan error)

for _, i := range tc.inputs {
go func() {
_, err := c.Coalesce("testKey", i)
testChannel <- err
}()
}

mergeFailure := false
executeFailure := false
for range tc.inputs {
err := <-testChannel
if err != nil {
if errors.Is(err, errFailedToMerge) {
mergeFailure = true
} else if errors.Is(err, errFailedToExecute) {
executeFailure = true
} else {
t.Fatalf("Unexpected error %v", err)
}
}
}

if mergeFailure != tc.expectMergeFailure {
if tc.expectMergeFailure {
t.Fatalf("Expected to observe merge failure, did not")
} else {
t.Fatalf("Observed unexpected merge failure")
}
}
if executeFailure != tc.expectExecuteFailure {
if tc.expectExecuteFailure {
t.Fatalf("Expected to observe execute failure, did not")
} else {
t.Fatalf("Observed unexpected execute failure")
}
}
})
}
}

0 comments on commit c124663

Please sign in to comment.