diff --git a/cmd/root.go b/cmd/root.go index 0c220bef..107586e8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -39,8 +39,9 @@ var ( watchAvailableStorageMinBytes uint64 lowAvailableStorageAction string - etcdMode bool - watchQueryTimeout time.Duration + etcdMode bool + watchQueryTimeout time.Duration + watchProgressNotifyInterval time.Duration } rootCmd = &cobra.Command{ @@ -106,6 +107,7 @@ var ( rootCmdOpts.lowAvailableStorageAction, rootCmdOpts.connectionPoolConfig, rootCmdOpts.watchQueryTimeout, + rootCmdOpts.watchProgressNotifyInterval, ) if err != nil { logrus.WithError(err).Fatal("Failed to create server") @@ -181,6 +183,7 @@ func init() { rootCmd.Flags().Uint64Var(&rootCmdOpts.watchAvailableStorageMinBytes, "watch-storage-available-size-min-bytes", 10*1024*1024, "Minimum required available disk size (in bytes) to continue operation. If available disk space gets below this threshold, then the --low-available-storage-action is performed") rootCmd.Flags().StringVar(&rootCmdOpts.lowAvailableStorageAction, "low-available-storage-action", "none", "Action to perform in case the available storage is low. One of (none|handover|terminate). none means no action is performed. handover means the dqlite node will handover its leadership role, if any. terminate means this dqlite node will shutdown") rootCmd.Flags().DurationVar(&rootCmdOpts.watchQueryTimeout, "watch-query-timeout", 20*time.Second, "Timeout for querying events in the watch poll loop. If timeout is reached, the poll loop will be re-triggered. The minimum value is 5 seconds.") + rootCmd.Flags().DurationVar(&rootCmdOpts.watchProgressNotifyInterval, "watch-progress-notify-interval", 5*time.Second, "Interval between periodic watch progress notifications. 
Default is 5s to ensure support for watch progress notifications.") rootCmd.AddCommand(&cobra.Command{ Use: "version", diff --git a/docs/configuration.md b/docs/configuration.md index ee80413c..4b475415 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -32,6 +32,7 @@ The following configuration options are available listed in a table format: | ~~`--admission-control-policy-limit`~~ | `REMOVED` | - | | ~~`--admission-control-only-for-write-queries`~~ | `REMOVED` | - | | `--watch-query-timeout` | Timeout for querying events in the watch poll loop | `20s` | +| `--watch-progress-notify-interval` | Interval between periodic watch progress notifications. Default is 5s to ensure support for watch progress notifications. | `5s` | ## Observability diff --git a/go.mod b/go.mod index 315f5853..52ca0cc0 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,13 @@ module github.com/canonical/k8s-dqlite -go 1.22 +go 1.22.0 + +toolchain go1.22.10 require ( github.com/canonical/go-dqlite/v2 v2.0.0 github.com/mattn/go-sqlite3 v1.14.22 - github.com/onsi/gomega v1.27.10 + github.com/onsi/gomega v1.33.1 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 github.com/sirupsen/logrus v1.9.3 @@ -21,7 +23,7 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 - golang.org/x/sys v0.26.0 + golang.org/x/sys v0.28.0 google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -33,6 +35,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -41,6 +44,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.2 // indirect 
github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect github.com/google/renameio v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect @@ -53,6 +57,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/onsi/ginkgo/v2 v2.19.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.50.0 // indirect github.com/prometheus/procfs v0.13.0 // indirect @@ -69,11 +75,12 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.28.0 // indirect - golang.org/x/net v0.30.0 // indirect - golang.org/x/sync v0.8.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/crypto v0.30.0 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.5.0 // indirect + golang.org/x/tools v0.28.0 // indirect google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect diff --git a/go.sum b/go.sum index a2f10948..5a52c69f 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,9 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -41,8 +42,8 @@ github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -63,8 +64,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/renameio v1.0.1 h1:Lh/jXZmvZxb0BBeSY5VKEfidcbcbenKjZFzM/q0fSeU= github.com/google/renameio v1.0.1/go.mod h1:t/HQoYBZSsWSNK35C6CO/TpPLDVWvxOHboWUAweKUpk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -107,17 +108,18 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= -github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= -github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/opentracing/opentracing-go v1.1.0/go.mod 
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiNRNwI= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -205,8 +207,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint 
v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -226,8 +228,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -236,8 +238,9 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -251,14 +254,14 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -270,8 +273,8 @@ 
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/kine/endpoint/endpoint.go b/pkg/kine/endpoint/endpoint.go index 46880ca9..1048638a 100644 --- a/pkg/kine/endpoint/endpoint.go +++ b/pkg/kine/endpoint/endpoint.go @@ -6,6 +6,7 @@ import ( "net" "os" "strings" + "time" "github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite" "github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic" @@ -31,8 +32,8 @@ type Config struct { Listener string Endpoint string ConnectionPoolConfig generic.ConnectionPoolConfig - tls.Config + NotifyInterval time.Duration } type ETCDConfig struct { @@ -65,7 +66,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { listen = KineSocket } - b := server.New(backend) + b := server.New(backend, config.NotifyInterval) grpcServer := grpcServer(config) b.Register(grpcServer) @@ -130,7 +131,7 @@ func ListenAndReturnBackend(ctx context.Context, config Config) 
(ETCDConfig, ser listen = KineSocket } - b := server.New(backend) + b := server.New(backend, config.NotifyInterval) grpcServer := grpcServer(config) b.Register(grpcServer) diff --git a/pkg/kine/server/limited.go b/pkg/kine/server/limited.go index 83ea068b..8f8a6b15 100644 --- a/pkg/kine/server/limited.go +++ b/pkg/kine/server/limited.go @@ -3,12 +3,14 @@ package server import ( "context" "fmt" + "time" "go.etcd.io/etcd/api/v3/etcdserverpb" ) type LimitedServer struct { - backend Backend + backend Backend + notifyInterval time.Duration } func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) { diff --git a/pkg/kine/server/maintenance.go b/pkg/kine/server/maintenance.go index bc05aa83..cc6a26b5 100644 --- a/pkg/kine/server/maintenance.go +++ b/pkg/kine/server/maintenance.go @@ -9,6 +9,10 @@ import ( var _ etcdserverpb.MaintenanceServer = (*KVServerBridge)(nil) +// The emulated etcd version is returned on a call to the status endpoint. Version 3.5.13 indicates support for watch progress notifications. 
+// See: https://github.com/kubernetes/kubernetes/blob/beb696c2c9467dbc44cbaf35c5a4a3daf0321db3/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go#L157 +const emulatedEtcdVersion = "3.5.13" + func (s *KVServerBridge) Alarm(context.Context, *etcdserverpb.AlarmRequest) (*etcdserverpb.AlarmResponse, error) { return nil, fmt.Errorf("alarm is not supported") } @@ -19,8 +23,9 @@ func (s *KVServerBridge) Status(ctx context.Context, r *etcdserverpb.StatusReque return nil, err } return &etcdserverpb.StatusResponse{ - Header: &etcdserverpb.ResponseHeader{}, - DbSize: size, + Header: &etcdserverpb.ResponseHeader{}, + DbSize: size, + Version: emulatedEtcdVersion, }, nil } diff --git a/pkg/kine/server/server.go b/pkg/kine/server/server.go index e204e952..e2eed0a6 100644 --- a/pkg/kine/server/server.go +++ b/pkg/kine/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "time" "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -21,10 +22,11 @@ type KVServerBridge struct { limited *LimitedServer } -func New(backend Backend) *KVServerBridge { +func New(backend Backend, notifyInterval time.Duration) *KVServerBridge { return &KVServerBridge{ limited: &LimitedServer{ - backend: backend, + backend: backend, + notifyInterval: notifyInterval, }, } } diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index a437f0e0..dd9fc883 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -7,8 +7,9 @@ import ( ) var ( - ErrKeyExists = rpctypes.ErrGRPCDuplicateKey - ErrCompacted = rpctypes.ErrGRPCCompacted + ErrKeyExists = rpctypes.ErrGRPCDuplicateKey + ErrCompacted = rpctypes.ErrGRPCCompacted + ErrGRPCUnhealthy = rpctypes.ErrGRPCUnhealthy ) type Backend interface { @@ -21,6 +22,8 @@ type Backend interface { Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) Watch(ctx context.Context, key string, revision int64) (<-chan []*Event, error) DbSize(ctx 
context.Context) (int64, error) + CurrentRevision(ctx context.Context) (int64, error) + GetCompactRevision(ctx context.Context) (int64, int64, error) DoCompact(ctx context.Context) error Close() error } diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index b78f9e8d..e7074b70 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -4,49 +4,86 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" ) var ( watchID int64 ) +// explicit interface check +var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) + func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ - server: ws, - backend: s.limited.backend, - watches: map[int64]func(){}, + server: ws, + backend: s.limited.backend, + watches: map[int64]func(){}, + progress: map[int64]chan<- int64{}, } defer w.Close() + w.pollProgressNotify(ws.Context(), s.limited.notifyInterval) + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, nil) + } + if pr := msg.GetProgressRequest(); pr != nil { + w.Progress(ws.Context()) } } } +// pollProgressNotify periodically sends progress notifications to all watchers. 
+func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) { + go func() { + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + if err := w.ProgressIfSynced(ctx); err != nil { + logrus.Errorf("Failed to send progress notification: %v", err) + } + } + } + }() +} + type watcher struct { sync.Mutex - wg sync.WaitGroup - backend Backend - server etcdserverpb.Watch_WatchServer - watches map[int64]func() + wg sync.WaitGroup + backend Backend + server etcdserverpb.Watch_WatchServer + watches map[int64]func() + progress map[int64]chan<- int64 } func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { + if r.WatchId != clientv3.AutoWatchID { + logrus.Errorf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + return + } + w.Lock() defer w.Unlock() @@ -57,8 +94,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) w.wg.Add(1) key := string(r.Key) + startRevision := r.StartRevision + + var progressCh chan int64 + if r.ProgressNotify { + progressCh = make(chan int64) + w.progress[id] = progressCh + } - logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) + logrus.Tracef("WATCH START id=%d, key=%s, revision=%d, progressNotify=%v, watchCount=%d", id, key, startRevision, r.ProgressNotify, len(w.watches)) go func() { defer w.wg.Done() @@ -71,32 +115,69 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) return } - watchCh, err := w.backend.Watch(ctx, key, r.StartRevision) + watchCh, err := w.backend.Watch(ctx, key, startRevision) if err != nil { + logrus.Errorf("Failed to start watch: %v", err) w.Cancel(id, err) return } - for events := range watchCh { - if len(events) == 0 { - continue + + trace := logrus.IsLevelEnabled(logrus.TraceLevel) + outer := true + for outer { + var reads int + var events []*Event + var revision 
int64 + + // Wait for events or progress notifications + select { + case events = <-watchCh: + // We received events; batch any additional queued events + reads++ + inner := true + for inner { + select { + case e, ok := <-watchCh: + reads++ + events = append(events, e...) + if !ok { + // channel is closed, break out of both loops + inner = false + outer = false + } + default: + // No more events in the queue, we can exit the inner loop + inner = false + } + } + case revision = <-progressCh: + // Received progress update without events } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + // Determine the highest revision among the collected events + if len(events) > 0 { + revision = events[len(events)-1].KV.ModRevision + if trace { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // Send response, even if this is a progress-only response and no events occurred + if revision >= startRevision { + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(revision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, err) + } } } - w.Cancel(id, nil) + logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -126,33 +207,109 @@ func toEvent(event *Event) *mvccpb.Event { func (w *watcher) Cancel(watchID int64, err error) { w.Lock() + if progressCh, ok := w.progress[watchID]; ok { + close(progressCh) + delete(w.progress, watchID) + } if cancel, ok := 
w.watches[watchID]; ok { cancel() delete(w.watches, watchID) } w.Unlock() + revision := int64(0) + compactRev := int64(0) reason := "" if err != nil { reason = err.Error() + if err == ErrCompacted { + // the requested start revision is compacted. Pass the current and compact + // revision to the client via the cancel response, along with the correct error message. + compactRev, revision, err = w.backend.GetCompactRevision(w.server.Context()) + if err != nil { + logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err) + } + } } - logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: true, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) - if serr != nil && err != nil { + if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) } } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() - for _, v := range w.watches { - v() + for id, progressCh := range w.progress { + close(progressCh) + delete(w.progress, id) + } + for id, cancel := range w.watches { + cancel() + delete(w.watches, id) } w.Unlock() w.wg.Wait() } + +// Progress sends a progress report if all watchers are synced. +// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 +func (w *watcher) Progress(ctx context.Context) { + w.Lock() + defer w.Unlock() + + logrus.Tracef("WATCH REQUEST PROGRESS") + + // All synced watchers will be blocked in the outer loop and able to receive on the progress channel. + // If any cannot be sent to, then it is not synced and has pending events to be sent. 
+ // Send revision 0, as we don't actually want the watchers to send a progress response if they do receive. + for id, progressCh := range w.progress { + select { + case progressCh <- 0: + default: + logrus.Tracef("WATCH SEND PROGRESS FAILED NOT SYNCED id=%d ", id) + return + } + } + + // If all watchers are synced, send a broadcast progress notification with the latest revision. + id := int64(clientv3.InvalidWatchID) + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) +} + +// ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop +func (w *watcher) ProgressIfSynced(ctx context.Context) error { + logrus.Tracef("WATCH PROGRESS TICK") + revision, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return err + } + + w.Lock() + defer w.Unlock() + + // Send revision to all synced channels + for _, progressCh := range w.progress { + select { + case progressCh <- revision: + default: + } + } + return nil +} diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 9cb9b8b7..b4efa9ce 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -22,6 +22,7 @@ const ( otelName = "sqllog" SupersededCount = 100 compactBatchSize = 1000 + pollBatchSize = 500 ) var ( @@ -216,6 +217,14 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { return nil } +func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { + return s.d.CurrentRevision(ctx) +} + +func (s *SQLLog) GetCompactRevision(ctx context.Context) (int64, int64, error) { + return s.d.GetCompactRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit 
int64) (int64, []*server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) @@ -362,8 +371,11 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- attribute.Int64("startRevision", startRevision), ) + // start watching right away so we don't miss anything + ctx, cancel := context.WithCancel(ctx) values, err := s.broadcaster.Subscribe(ctx) if err != nil { + cancel() return nil, err } @@ -373,7 +385,16 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { - span.RecordError(err) + if !errors.Is(err, context.Canceled) { + span.RecordError(err) + logrus.Errorf("Failed to list %s for revision %d: %v", key, startRevision, err) + // We return an error message that the api-server understands: server.ErrGRPCUnhealthy + if err != server.ErrCompacted { + err = server.ErrGRPCUnhealthy + } + } + // Cancel the watcher by cancelling the context of its subscription to the broadcaster + cancel() return nil, err } @@ -387,8 +408,10 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- defer func() { close(res) s.wg.Done() + cancel() }() + // Filter for events that update/create/delete the given key for events := range values { filtered := filterEvents(events, key, initialRevision) if len(filtered) > 0 { @@ -396,6 +419,7 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- } } }() + return res, nil } @@ -484,7 +508,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) defer cancel() - rows, err := s.d.After(watchCtx, last, 500) + rows, err := s.d.After(watchCtx, last, pollBatchSize) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { logrus.Errorf("fail to list latest changes: %v", err) @@ -515,6 +539,7 @@ func 
(s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar // Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3 // we don't want to notify row 4 because 3 is essentially dropped forever. if event.KV.ModRevision != next { + logrus.Tracef("MODREVISION GAP: expected %v, got %v", next, event.KV.ModRevision) if canSkipRevision(next, skip, skipTime) { // This situation should never happen, but we have it here as a fallback just for unknown reasons // we don't want to pause all watches forever diff --git a/pkg/server/server.go b/pkg/server/server.go index 21c2f89f..79c58d81 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -69,6 +69,8 @@ func New( lowAvailableStorageAction string, connectionPoolConfig generic.ConnectionPoolConfig, watchQueryTimeout time.Duration, + watchProgressNotifyInterval time.Duration, + ) (*Server, error) { var ( options []app.Option @@ -222,6 +224,8 @@ func New( } // set datastore connection pool options kineConfig.ConnectionPoolConfig = connectionPoolConfig + // set watch progress notify interval + kineConfig.NotifyInterval = watchProgressNotifyInterval // handle tuning parameters if exists, err := fileExists(dir, "tuning.yaml"); err != nil { return nil, fmt.Errorf("failed to check for tuning.yaml: %w", err) diff --git a/test/util_test.go b/test/util_test.go index 6d0d7c1c..985a4815 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -138,8 +138,9 @@ func startSqlite(_ context.Context, tb testing.TB, dir string) (*endpoint.Config } return &endpoint.Config{ - Listener: fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + NotifyInterval: 5 * time.Second, }, db } @@ -164,8 +165,9 @@ func startDqlite(ctx context.Context, tb testing.TB, dir string, listener *instr } return &endpoint.Config{ - Listener: 
fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + NotifyInterval: 5 * time.Second, }, db } diff --git a/test/watch_test.go b/test/watch_test.go index 860ce37b..09981573 100644 --- a/test/watch_test.go +++ b/test/watch_test.go @@ -17,6 +17,9 @@ func TestWatch(t *testing.T) { // pollTimeout is the timeout for waiting to receive an event. pollTimeout = 50 * time.Millisecond + // progressNotifyTimeout is the timeout for waiting to receive a progress notify. + progressNotifyTimeout = 1 * time.Second + // idleTimeout is the amount of time to wait to ensure that no events // are received when they should not. idleTimeout = 100 * time.Millisecond @@ -113,6 +116,16 @@ func TestWatch(t *testing.T) { g.Consistently(watchCh, idleTimeout).ShouldNot(Receive()) }) + t.Run("ProgressNotify", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + g := NewWithT(t) + err := kine.client.RequestProgress(ctx) + g.Expect(err).NotTo(HaveOccurred()) + + g.Eventually(watchCh, progressNotifyTimeout).Should(ReceiveProgressNotify(g)) + }) + }) } } @@ -131,6 +144,12 @@ func ReceiveEvents(g Gomega, checks ...EventMatcher) types.GomegaMatcher { })) } +func ReceiveProgressNotify(g Gomega) types.GomegaMatcher { + return Receive(Satisfy(func(watch clientv3.WatchResponse) bool { + return watch.IsProgressNotify() + })) +} + func CreateEvent(g Gomega, key, value string, revision int64) EventMatcher { return func(event *clientv3.Event) bool { ok := g.Expect(event.Type).To(Equal(clientv3.EventTypePut))