diff --git a/go.mod b/go.mod index 315f5853..67faf97f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/canonical/k8s-dqlite -go 1.22 +go 1.22.0 + +toolchain go1.22.10 require ( github.com/canonical/go-dqlite/v2 v2.0.0 @@ -21,9 +23,11 @@ require ( go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 + golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d golang.org/x/sys v0.26.0 google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 + k8s.io/apimachinery v0.31.3 ) require ( @@ -71,7 +75,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sync v0.8.0 // indirect + golang.org/x/sync v0.10.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect @@ -80,5 +84,7 @@ require ( google.golang.org/protobuf v1.35.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index a2f10948..00424172 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,9 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -42,7 +43,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -63,8 +65,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af 
h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= +github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/renameio v1.0.1 h1:Lh/jXZmvZxb0BBeSY5VKEfidcbcbenKjZFzM/q0fSeU= github.com/google/renameio v1.0.1/go.mod h1:t/HQoYBZSsWSNK35C6CO/TpPLDVWvxOHboWUAweKUpk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -107,8 +109,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= -github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -116,8 +118,9 @@ github.com/peterh/liner v1.2.2/go.mod h1:xFwJyiKIXJZUKItq5dGHZSTBRAuG/CpeNpWLyiN github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib 
v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -208,6 +211,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d h1:0olWaB5pg3+oychR51GUVCEsGkeCU/2JxjBgIo4f3M0= +golang.org/x/exp v0.0.0-20241204233417-43b7b7cde48d/go.mod h1:qj5a5QZpwLU2NLQudwIN5koi3beDhSAlJwa67PuM98c= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -236,8 +241,9 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod 
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -270,8 +276,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -315,5 +321,11 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools 
v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/apimachinery v0.31.3 h1:6l0WhcYgasZ/wk9ktLq5vLaoXJJr5ts6lkaQzgeYPq4= +k8s.io/apimachinery v0.31.3/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= +k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/kine/server/types.go b/pkg/kine/server/types.go index a437f0e0..23e7d26e 100644 --- a/pkg/kine/server/types.go +++ b/pkg/kine/server/types.go @@ -19,8 +19,9 @@ type Backend interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, bool, error) - Watch(ctx context.Context, key string, revision int64) (<-chan []*Event, error) + Watch(ctx context.Context, key string, revision int64) (WatchResult, error) DbSize(ctx context.Context) (int64, error) + CurrentRevision(ctx context.Context) (int64, error) DoCompact(ctx context.Context) error Close() error } @@ -39,3 +40,9 @@ type Event struct { KV *KeyValue PrevKV *KeyValue } + +type WatchResult struct { + CurrentRevision int64 + CompactRevision int64 + Events <-chan []*Event +} diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index b78f9e8d..0e8189e5 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -4,49 +4,78 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/sirupsen/logrus" 
"go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/exp/rand" + "k8s.io/apimachinery/pkg/util/wait" ) var ( watchID int64 ) +// explicit interface check +var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) + +// getProgressReportInterval returns the configured progress report interval, with some jitter +func (s *KVServerBridge) getProgressReportInterval() time.Duration { + // add rand(1/10*notifyInterval) as jitter so that kine will not + // send progress notifications to watchers at the same time even when watchers + // are created at the same time. + jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) + return s.limited.notifyInterval + jitter +} + func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { w := watcher{ - server: ws, - backend: s.limited.backend, - watches: map[int64]func(){}, + server: ws, + backend: s.limited.backend, + watches: map[int64]func(){}, + progress: map[int64]chan<- int64{}, } defer w.Close() + go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) + for { msg, err := ws.Recv() if err != nil { return err } - if msg.GetCreateRequest() != nil { - w.Start(ws.Context(), msg.GetCreateRequest()) - } else if msg.GetCancelRequest() != nil { - logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId()) - w.Cancel(msg.GetCancelRequest().WatchId, nil) + if cr := msg.GetCreateRequest(); cr != nil { + w.Start(ws.Context(), cr) + } + if cr := msg.GetCancelRequest(); cr != nil { + logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) + w.Cancel(cr.WatchId, 0, 0, nil) + } + if pr := msg.GetProgressRequest(); pr != nil { + w.Progress(ws.Context()) } } } type watcher struct { - sync.Mutex + sync.RWMutex - wg sync.WaitGroup - backend Backend - server etcdserverpb.Watch_WatchServer - watches map[int64]func() + wg sync.WaitGroup + backend Backend + server etcdserverpb.Watch_WatchServer 
+ watches map[int64]func() + progress map[int64]chan<- int64 } func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) { + if r.WatchId != clientv3.AutoWatchID { + logrus.Warnf("WATCH START id=%d ignoring request with client-provided id", r.WatchId) + return + } + w.Lock() defer w.Unlock() @@ -57,8 +86,15 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) w.wg.Add(1) key := string(r.Key) + startRevision := r.StartRevision + + var progressCh chan int64 + if r.ProgressNotify { + progressCh = make(chan int64) + w.progress[id] = progressCh + } - logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision) + logrus.Tracef("WATCH START id=%d, key=%s, revision=%d, progressNotify=%v, watchCount=%d", id, key, startRevision, r.ProgressNotify, len(w.watches)) go func() { defer w.wg.Done() @@ -67,36 +103,74 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) return } - watchCh, err := w.backend.Watch(ctx, key, r.StartRevision) + wr, err := w.backend.Watch(ctx, key, startRevision) + // If the watch result has a non-zero CompactRevision, then the watch request failed due to + // the requested start revision having been compacted. Pass the current and compact + // revision to the client via the cancel response, along with the correct error message. 
if err != nil { - w.Cancel(id, err) + w.Cancel(id, 0, 0, err) //TODO: need to return currrev and compact rev return } - for events := range watchCh { - if len(events) == 0 { - continue + + trace := logrus.IsLevelEnabled(logrus.TraceLevel) + outer := true + for outer { + var reads int + var events []*Event + var revision int64 + + // Block on initial read from events or progress channel + select { + case events = <-wr.Events: + // got events; read additional queued events from the channel and add to batch + reads++ + inner := true + for inner { + select { + case e, ok := <-wr.Events: + reads++ + events = append(events, e...) + if !ok { + // channel was closed, break out of both loops + inner = false + outer = false + } + default: + inner = false + } + } + case revision = <-progressCh: + // have been requested to send progress with no events } - if logrus.IsLevelEnabled(logrus.DebugLevel) { - for _, event := range events { - logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + // get max revision from collected events + if len(events) > 0 { + revision = events[len(events)-1].KV.ModRevision + if trace { + for _, event := range events { + logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision) + } } } - if err := w.server.Send(&etcdserverpb.WatchResponse{ - Header: txnHeader(events[len(events)-1].KV.ModRevision), - WatchId: id, - Events: toEvents(events...), - }); err != nil { - w.Cancel(id, err) - continue + // send response. note that there are no events if this is a progress response. 
+ if revision >= startRevision { + wr := &etcdserverpb.WatchResponse{ + Header: txnHeader(revision), + WatchId: id, + Events: toEvents(events...), + } + logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) + if err := w.server.Send(wr); err != nil { + w.Cancel(id, 0, 0, err) + } } } - w.Cancel(id, nil) + w.Cancel(id, 0, 0, nil) logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key) }() } @@ -124,8 +198,12 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error) { +func (w *watcher) Cancel(watchID int64, revision, compactRev int64, err error) { w.Lock() + if progressCh, ok := w.progress[watchID]; ok { + close(progressCh) + delete(w.progress, watchID) + } if cancel, ok := w.watches[watchID]; ok { cancel() delete(w.watches, watchID) @@ -136,23 +214,85 @@ func (w *watcher) Cancel(watchID int64, err error) { if err != nil { reason = err.Error() } - logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason) + logrus.Tracef("WATCH CANCEL id=%d, reason=%s, compactRev=%d", watchID, reason, compactRev) + serr := w.server.Send(&etcdserverpb.WatchResponse{ - Header: &etcdserverpb.ResponseHeader{}, - Canceled: true, - CancelReason: "watch closed", - WatchId: watchID, + Header: txnHeader(revision), + Canceled: err != nil, + CancelReason: reason, + WatchId: watchID, + CompactRevision: compactRev, }) - if serr != nil && err != nil { + if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) } } func (w *watcher) Close() { + logrus.Tracef("WATCH SERVER CLOSE") w.Lock() - for _, v := range w.watches { - v() + for id, progressCh := range w.progress { + close(progressCh) + delete(w.progress, id) + } + for id, cancel := range w.watches { + cancel() + delete(w.watches, id) } w.Unlock() w.wg.Wait() } + +// Progress sends a progress report if all watchers are 
synced. +// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 +func (w *watcher) Progress(ctx context.Context) { + w.RLock() + defer w.RUnlock() + + logrus.Tracef("WATCH REQUEST PROGRESS") + + // All synced watchers will be blocked in the outer loop and able to receive on the progress channel. + // If any cannot be sent to, then it is not synced and has pending events to be sent. + // Send revision 0, as we don't actually want the watchers to send a progress response if they do receive. + for id, progressCh := range w.progress { + select { + case progressCh <- 0: + default: + logrus.Tracef("WATCH SEND PROGRESS FAILED NOT SYNCED id=%d ", id) + return + } + } + + // If all watchers are synced, send a broadcast progress notification with the latest revision. + id := int64(clientv3.InvalidWatchID) + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) +} + +// ProgressIfSynced sends a progress report on any channels that are synced and blocked on the outer loop +func (w *watcher) ProgressIfSynced(ctx context.Context) (bool, error) { + logrus.Tracef("WATCH PROGRESS TICK") + revision, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return false, nil + } + + w.RLock() + defer w.RUnlock() + + // Send revision to all synced channels + for _, progressCh := range w.progress { + select { + case progressCh <- revision: + default: + } + } + return false, nil +} diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 9cb9b8b7..7d2aa04a 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -22,6 +22,7 @@ const ( otelName = "sqllog" SupersededCount = 100 compactBatchSize 
= 1000 + pollBatchSize = 500 ) var ( @@ -71,6 +72,7 @@ type SQLLog struct { d Dialect broadcaster broadcaster.Broadcaster[[]*server.Event] notify chan int64 + currentRev int64 wg sync.WaitGroup } @@ -216,6 +218,13 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { return nil } +func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) { + if s.currentRev != 0 { + return s.currentRev, nil + } + return s.d.CurrentRevision(ctx) +} + func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) { var err error ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.After", otelName)) @@ -338,13 +347,13 @@ func (s *SQLLog) ttl(ctx context.Context) { go run(ctx, key, revision, time.Duration(lease)*time.Second) } - watchCh, err := s.Watch(ctx, "/", startRevision) + wr, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return } - for events := range watchCh { + for events := range wr.Events { for _, event := range events { if event.KV.Lease > 0 { go run(ctx, event.KV.Key, event.KV.ModRevision, time.Duration(event.KV.Lease)*time.Second) @@ -354,7 +363,7 @@ func (s *SQLLog) ttl(ctx context.Context) { }() } -func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<-chan []*server.Event, error) { +func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (server.WatchResult, error) { ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Watch", otelName)) defer span.End() span.SetAttributes( @@ -362,9 +371,12 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- attribute.Int64("startRevision", startRevision), ) + res := make(chan []*server.Event, 100) + wr := server.WatchResult{Events: res} + values, err := s.broadcaster.Subscribe(ctx) if err != nil { - return nil, err + return wr, err } if startRevision > 0 { @@ -374,10 +386,13 @@ func (s *SQLLog) Watch(ctx context.Context, 
key string, startRevision int64) (<- initialRevision, initialEvents, err := s.After(ctx, key, startRevision, 0) if err != nil { span.RecordError(err) - return nil, err + if err == server.ErrCompacted { + compact, _, _ := s.d.GetCompactRevision(ctx) + wr.CompactRevision = compact + wr.CurrentRevision = initialRevision + } } - res := make(chan []*server.Event, 100) if len(initialEvents) > 0 { res <- initialEvents } @@ -396,7 +411,8 @@ func (s *SQLLog) Watch(ctx context.Context, key string, startRevision int64) (<- } } }() - return res, nil + + return wr, nil } func filterEvents(events []*server.Event, key string, startRevision int64) []*server.Event { @@ -457,8 +473,8 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { } func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStart int64) { + s.currentRev = pollStart var ( - last = pollStart skip int64 skipTime time.Time waitForMore = true @@ -474,7 +490,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar case <-ctx.Done(): return case check := <-s.notify: - if check <= last { + if check <= s.currentRev { continue } case <-wait.C: @@ -484,7 +500,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) defer cancel() - rows, err := s.d.After(watchCtx, last, 500) + rows, err := s.d.After(watchCtx, s.currentRev, pollBatchSize) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { logrus.Errorf("fail to list latest changes: %v", err) @@ -504,7 +520,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar waitForMore = len(events) < 100 - rev := last + rev := s.currentRev var ( sequential []*server.Event saveLast bool @@ -515,6 +531,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar // Ensure that we are notifying events in a sequential fashion. 
For example if we find row 4 before 3 // we don't want to notify row 4 because 3 is essentially dropped forever. if event.KV.ModRevision != next { + logrus.Tracef("MODREVISION GAP: expected %v, got %v", next, event.KV.ModRevision) if canSkipRevision(next, skip, skipTime) { // This situation should never happen, but we have it here as a fallback just for unknown reasons // we don't want to pause all watches forever @@ -553,7 +570,7 @@ func (s *SQLLog) poll(ctx context.Context, result chan []*server.Event, pollStar } if saveLast { - last = rev + s.currentRev = rev if len(sequential) > 0 { result <- sequential } diff --git a/test/util_test.go b/test/util_test.go index 6d0d7c1c..985a4815 100644 --- a/test/util_test.go +++ b/test/util_test.go @@ -138,8 +138,9 @@ func startSqlite(_ context.Context, tb testing.TB, dir string) (*endpoint.Config } return &endpoint.Config{ - Listener: fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("sqlite://%s", dbPath), + NotifyInterval: 5 * time.Second, }, db } @@ -164,8 +165,9 @@ func startDqlite(ctx context.Context, tb testing.TB, dir string, listener *instr } return &endpoint.Config{ - Listener: fmt.Sprintf("unix://%s/kine.sock", dir), - Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + Listener: fmt.Sprintf("unix://%s/kine.sock", dir), + Endpoint: fmt.Sprintf("dqlite://k8s?driver-name=%s", app.Driver()), + NotifyInterval: 5 * time.Second, }, db }