Skip to content

Commit

Permalink
Change modifyVolumeRequestHandlerTimeout to a configurable option (#1915
Browse files Browse the repository at this point in the history
)

* Change modifyVolumeRequestHandlerTimeout to a configurable option

* Fix zero timeout being passed to driver in non-controller modes

* Add docs about resizer and volumemodifier timeouts

* Add additional test for the controller options
  • Loading branch information
andrewcharlton authored Feb 1, 2024
1 parent bb2e6f3 commit 33fffa3
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 50 deletions.
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func main() {
driver.WithUserAgentExtra(options.ControllerOptions.UserAgentExtra),
driver.WithOtelTracing(options.ServerOptions.EnableOtelTracing),
driver.WithBatching(options.ControllerOptions.Batching),
driver.WithModifyVolumeRequestHandlerTimeout(options.ControllerOptions.ModifyVolumeRequestHandlerTimeout),
)
if err != nil {
klog.ErrorS(err, "failed to create driver")
Expand Down
7 changes: 7 additions & 0 deletions cmd/options/controller_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package options

import (
"time"

flag "github.com/spf13/pflag"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
cliflag "k8s.io/component-base/cli/flag"
)

Expand All @@ -41,6 +44,9 @@ type ControllerOptions struct {
UserAgentExtra string
// flag to enable batching of API calls
Batching bool
// flag to set the timeout for volume modification requests to be coalesced into a single
// volume modification call to AWS.
ModifyVolumeRequestHandlerTimeout time.Duration
}

func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
Expand All @@ -51,4 +57,5 @@ func (s *ControllerOptions) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.WarnOnInvalidTag, "warn-on-invalid-tag", false, "To warn on invalid tags, instead of returning an error")
fs.StringVar(&s.UserAgentExtra, "user-agent-extra", "", "Extra string appended to user agent.")
fs.BoolVar(&s.Batching, "batching", false, "To enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits.")
fs.DurationVar(&s.ModifyVolumeRequestHandlerTimeout, "modify-volume-request-handler-timeout", driver.DefaultModifyVolumeRequestHandlerTimeout, "Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. This must be lower than the csi-resizer and volumemodifier timeouts")
}
5 changes: 5 additions & 0 deletions cmd/options/controller_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func TestControllerOptions(t *testing.T) {
flag: "user-agent-extra",
found: true,
},
{
name: "lookup modify-volume-request-handler-timeout",
flag: "modify-volume-request-handler-timeout",
found: true,
},
{
name: "fail for non-desired flag",
flag: "some-other-flag",
Expand Down
1 change: 1 addition & 0 deletions docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ There are a couple of driver options that can be passed as arguments when starti
| user-agent-extra | csi-ebs | helm | Extra string appended to user agent|
| enable-otel-tracing | true | false | If set to true, the driver will enable opentelemetry tracing. Might need [additional env variables](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration) to export the traces to the right collector|
| batching | true | true | If set to true, the driver will enable batching of API calls. This is especially helpful for improving performance in workloads that are sensitive to EC2 rate limits at the cost of a small increase to worst-case latency|
| modify-volume-request-handler-timeout | 10s | 2s | Timeout for the window in which volume modification calls must be received in order for them to coalesce into a single volume modification call to AWS. If changing this, be aware that the ebs-csi-controller's csi-resizer and volumemodifier containers both have timeouts on the calls they make, if this value exceeds those timeouts it will cause them to always fail and fall into a retry loop, so adjust those values accordingly.
5 changes: 4 additions & 1 deletion pkg/driver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package driver

import "time"

// constants of keys in PublishContext
const (
// devicePathKey represents key for device path in PublishContext
Expand Down Expand Up @@ -156,7 +158,8 @@ const (

// constants for default command line flag values
const (
DefaultCSIEndpoint = "unix://tmp/csi.sock"
DefaultCSIEndpoint = "unix://tmp/csi.sock"
DefaultModifyVolumeRequestHandlerTimeout = 2 * time.Second
)

// constants for disk block size
Expand Down
4 changes: 1 addition & 3 deletions pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const (
ModificationKeyIOPS = "iops"

ModificationKeyThroughput = "throughput"

modifyVolumeRequestHandlerTimeout = 2 * time.Second
)

type modifyVolumeRequest struct {
Expand Down Expand Up @@ -118,7 +116,7 @@ func (d *controllerService) processModifyVolumeRequests(h *modifyVolumeRequestHa
select {
case req := <-h.requestChan:
process(req)
case <-time.After(modifyVolumeRequestHandlerTimeout):
case <-time.After(d.driverOptions.modifyVolumeRequestHandlerTimeout):
d.modifyVolumeManager.requestHandlerMap.Delete(h.volumeID)
// At this point, no new requests can come in on the request channel because it has been removed from the map
// However, the request channel may still have requests waiting on it
Expand Down
36 changes: 24 additions & 12 deletions pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
csi "github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -64,24 +65,26 @@ type Driver struct {
}

type DriverOptions struct {
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
endpoint string
extraTags map[string]string
mode Mode
volumeAttachLimit int64
kubernetesClusterID string
awsSdkDebugLog bool
batching bool
warnOnInvalidTag bool
userAgentExtra string
otelTracing bool
modifyVolumeRequestHandlerTimeout time.Duration
}

func NewDriver(options ...func(*DriverOptions)) (*Driver, error) {
klog.InfoS("Driver Information", "Driver", DriverName, "Version", driverVersion)

driverOptions := DriverOptions{
endpoint: DefaultCSIEndpoint,
mode: AllMode,
endpoint: DefaultCSIEndpoint,
mode: AllMode,
modifyVolumeRequestHandlerTimeout: DefaultModifyVolumeRequestHandlerTimeout,
}
for _, option := range options {
option(&driverOptions)
Expand Down Expand Up @@ -253,3 +256,12 @@ func WithOtelTracing(enableOtelTracing bool) func(*DriverOptions) {
o.otelTracing = enableOtelTracing
}
}

func WithModifyVolumeRequestHandlerTimeout(timeout time.Duration) func(*DriverOptions) {
return func(o *DriverOptions) {
if timeout == 0 {
return
}
o.modifyVolumeRequestHandlerTimeout = timeout
}
}
11 changes: 11 additions & 0 deletions pkg/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driver
import (
"reflect"
"testing"
"time"
)

func TestWithEndpoint(t *testing.T) {
Expand Down Expand Up @@ -121,3 +122,13 @@ func TestWithBatching(t *testing.T) {
t.Fatalf("expected batching option got set to %v but is set to %v", batching, options.batching)
}
}

func TestWithModifyVolumeRequestHandlerTimeout(t *testing.T) {
timeout := 15 * time.Second
options := &DriverOptions{}
WithModifyVolumeRequestHandlerTimeout(timeout)(options)
if options.modifyVolumeRequestHandlerTimeout != timeout {
t.Fatalf("expected modifyVolumeRequestHandlerTimeout option got set to %v but is set to %v",
timeout, options.modifyVolumeRequestHandlerTimeout)
}
}
57 changes: 36 additions & 21 deletions pkg/driver/request_coalescing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"context"
"fmt"

// "errors"
"sync"
"testing"
Expand Down Expand Up @@ -40,9 +41,11 @@ func TestBasicRequestCoalescingSuccess(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -95,9 +98,11 @@ func TestRequestFail(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -164,9 +169,11 @@ func TestPartialFail(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -249,9 +256,11 @@ func TestSequentialRequests(t *testing.T) {
}).Times(2)

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -307,9 +316,11 @@ func TestDuplicateRequest(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -372,9 +383,11 @@ func TestContextTimeout(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down Expand Up @@ -438,9 +451,11 @@ func TestResponseReturnTiming(t *testing.T) {
})

awsDriver := controllerService{
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{},
cloud: mockCloud,
inFlight: internal.NewInFlight(),
driverOptions: &DriverOptions{
modifyVolumeRequestHandlerTimeout: 2 * time.Second,
},
modifyVolumeManager: newModifyVolumeManager(),
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/driver/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package driver

import (
"errors"
"fmt"
"regexp"
"strings"
Expand All @@ -34,6 +35,10 @@ func ValidateDriverOptions(options *DriverOptions) error {
return fmt.Errorf("Invalid mode: %w", err)
}

if options.modifyVolumeRequestHandlerTimeout == 0 {
return errors.New("Invalid modifyVolumeRequestHandlerTimeout: Timeout cannot be zero")
}

return nil
}

Expand Down
39 changes: 26 additions & 13 deletions pkg/driver/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package driver

import (
"errors"
"fmt"
"math/rand"
"reflect"
"strconv"
"testing"
"time"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
)
Expand Down Expand Up @@ -156,36 +158,47 @@ func TestValidateMode(t *testing.T) {

func TestValidateDriverOptions(t *testing.T) {
testCases := []struct {
name string
mode Mode
extraVolumeTags map[string]string
expErr error
name string
mode Mode
extraVolumeTags map[string]string
modifyVolumeTimeout time.Duration
expErr error
}{
{
name: "success",
mode: AllMode,
expErr: nil,
name: "success",
mode: AllMode,
modifyVolumeTimeout: 5 * time.Second,
expErr: nil,
},
{
name: "fail because validateMode fails",
mode: Mode("unknown"),
expErr: fmt.Errorf("Invalid mode: %w", fmt.Errorf("Mode is not supported (actual: unknown, supported: %v)", []Mode{AllMode, ControllerMode, NodeMode})),
name: "fail because validateMode fails",
mode: Mode("unknown"),
modifyVolumeTimeout: 5 * time.Second,
expErr: fmt.Errorf("Invalid mode: %w", fmt.Errorf("Mode is not supported (actual: unknown, supported: %v)", []Mode{AllMode, ControllerMode, NodeMode})),
},
{
name: "fail because validateExtraVolumeTags fails",
mode: AllMode,
extraVolumeTags: map[string]string{
randomString(cloud.MaxTagKeyLength + 1): "extra-tag-value",
},
expErr: fmt.Errorf("Invalid extra tags: %w", fmt.Errorf("Tag key too long (actual: %d, limit: %d)", cloud.MaxTagKeyLength+1, cloud.MaxTagKeyLength)),
modifyVolumeTimeout: 5 * time.Second,
expErr: fmt.Errorf("Invalid extra tags: %w", fmt.Errorf("Tag key too long (actual: %d, limit: %d)", cloud.MaxTagKeyLength+1, cloud.MaxTagKeyLength)),
},
{
name: "fail because modifyVolumeRequestHandlerTimeout is zero",
mode: AllMode,
modifyVolumeTimeout: 0,
expErr: errors.New("Invalid modifyVolumeRequestHandlerTimeout: Timeout cannot be zero"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := ValidateDriverOptions(&DriverOptions{
extraTags: tc.extraVolumeTags,
mode: tc.mode,
extraTags: tc.extraVolumeTags,
mode: tc.mode,
modifyVolumeRequestHandlerTimeout: tc.modifyVolumeTimeout,
})
if !reflect.DeepEqual(err, tc.expErr) {
t.Fatalf("error not equal\ngot:\n%s\nexpected:\n%s", err, tc.expErr)
Expand Down

0 comments on commit 33fffa3

Please sign in to comment.