Skip to content

[WIP] hitless #3447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 74 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
cb8a4e5
feat: process push notifications before returning connections from pool
ndyakov Jul 2, 2025
c44c8b5
fix: increase peek notification name bytes
ndyakov Jul 3, 2025
47dd490
feat: enhance push notification handlers with context information
ndyakov Jul 4, 2025
1606de8
feat: implement strongly typed HandlerContext interface
ndyakov Jul 4, 2025
d530d45
feat: implement strongly typed HandlerContext with concrete types in …
ndyakov Jul 4, 2025
5972b4c
refactor: move all push notification logic to root package and remove…
ndyakov Jul 4, 2025
ec4bf57
cleanup: remove redundant internal push notification packages
ndyakov Jul 4, 2025
b4d0ff1
refactor: organize push notification code into separate files
ndyakov Jul 4, 2025
84123b1
refactor(push): completly change the package structure
ndyakov Jul 4, 2025
d780401
refactor(push): simplify handler context
ndyakov Jul 5, 2025
604c8e3
fix(tests): debug logger
ndyakov Jul 5, 2025
b23f43c
fix(peek): non-blocking peek
ndyakov Jul 5, 2025
7a0f316
fix(tests): remove bench_decode tests
ndyakov Jul 5, 2025
225c0bf
fix(tests): add global ctx in tests
ndyakov Jul 5, 2025
2681d6d
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 5, 2025
32bca83
fix(proto): fix notification parser
ndyakov Jul 16, 2025
4b45620
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 16, 2025
8e17e62
fix(log): remove debug log
ndyakov Jul 16, 2025
52f2b2c
fix(push): fix error checks
ndyakov Jul 16, 2025
8418c6b
fix(push): fix error checks
ndyakov Jul 16, 2025
1d204c2
fix(pool): return connection in the pool
ndyakov Jul 16, 2025
be3a6c6
fix(push): address comments
ndyakov Jul 16, 2025
84f788e
fix(push): fix tests
ndyakov Jul 16, 2025
11ecbaf
fix(push): fix tests
ndyakov Jul 16, 2025
409dac1
fix(push): fix tests
ndyakov Jul 16, 2025
1e2df9f
fix(checkConn): try to peek into the connection instead of consuming
ndyakov Jul 18, 2025
4cd9853
fix(connCheck): don't block on peeking
ndyakov Jul 18, 2025
72c24d5
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 22, 2025
af6a103
feat(push): reading optimization for Linux
ndyakov Jul 24, 2025
5d1d063
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 24, 2025
25bde87
add comments
ndyakov Jul 24, 2025
0abd22b
wip
ndyakov Jul 25, 2025
f0b8ccb
add tests
ndyakov Jul 28, 2025
faf4ae1
optimizations
ndyakov Jul 28, 2025
7af418a
revert some optimizations
ndyakov Jul 28, 2025
c0baba6
fix race
ndyakov Jul 28, 2025
3570c1a
wrap atomic
ndyakov Jul 28, 2025
d79dcec
pubsub to work with new conn
ndyakov Jul 28, 2025
0507340
fix
ndyakov Jul 28, 2025
4a9b0f7
remove unused bench tests
ndyakov Jul 29, 2025
8ac1cb7
pubsub is not working well with pool
ndyakov Jul 29, 2025
58c9f55
wip
ndyakov Jul 29, 2025
e9f32f0
wip
ndyakov Jul 29, 2025
a39a23a
pubsub pool
ndyakov Jul 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ coverage.txt
**/coverage.txt
.vscode
tmp/*

# Hitless upgrade documentation (temporary)
hitless/docs/
240 changes: 240 additions & 0 deletions adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package redis

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/redis/go-redis/v9/internal/interfaces"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/push"
)

// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
var ErrInvalidCommand = errors.New("invalid command type")

// ErrInvalidPool is returned when the pool type is not supported.
var ErrInvalidPool = errors.New("invalid pool type")

// NewClientAdapter creates a new client adapter for regular Redis clients.
func NewClientAdapter(client *Client) interfaces.ClientInterface {
return &clientAdapter{client: client}
}

// NewClusterClientAdapter creates a new client adapter for cluster Redis clients.
func NewClusterClientAdapter(client interface{}) interfaces.ClientInterface {
return &clusterClientAdapter{client: client}
}

// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
type clientAdapter struct {
client *Client
}

// GetOptions returns the client options.
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
return &optionsAdapter{options: ca.client.opt}
}

// GetPushProcessor returns the client's push notification processor.
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
}

// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
type optionsAdapter struct {
options *Options
}

// GetReadTimeout returns the read timeout.
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
return oa.options.ReadTimeout
}

// GetWriteTimeout returns the write timeout.
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
return oa.options.WriteTimeout
}

// GetAddr returns the connection address.
func (oa *optionsAdapter) GetAddr() string {
return oa.options.Addr
}

// IsTLSEnabled returns true if TLS is enabled.
func (oa *optionsAdapter) IsTLSEnabled() bool {
return oa.options.TLSConfig != nil
}

// GetProtocol returns the protocol version.
func (oa *optionsAdapter) GetProtocol() int {
return oa.options.Protocol
}

// GetPoolSize returns the connection pool size.
func (oa *optionsAdapter) GetPoolSize() int {
return oa.options.PoolSize
}

// NewDialer returns a new dialer function for the connection.
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
baseDialer := oa.options.NewDialer()
return func(ctx context.Context) (net.Conn, error) {
// Extract network and address from the options
network := "tcp"
addr := oa.options.Addr
return baseDialer(ctx, network, addr)
}
}

// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
type connectionAdapter struct {
conn *pool.Conn
}

// Close closes the connection.
func (ca *connectionAdapter) Close() error {
return ca.conn.Close()
}

// IsUsable returns true if the connection is safe to use for new commands.
func (ca *connectionAdapter) IsUsable() bool {
return ca.conn.IsUsable()
}

// GetPoolConnection returns the underlying pool connection.
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
return ca.conn
}

// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
// These timeouts remain active until explicitly cleared.
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
}

// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
// After the deadline, timeouts automatically revert to normal values.
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
}

// ClearRelaxedTimeout clears relaxed timeouts for this connection.
func (ca *connectionAdapter) ClearRelaxedTimeout() {
ca.conn.ClearRelaxedTimeout()
}

// clusterClientAdapter adapts a cluster client to implement interfaces.ClientInterface.
type clusterClientAdapter struct {
client interface{}
}

// GetOptions returns the client options.
func (cca *clusterClientAdapter) GetOptions() interfaces.OptionsInterface {
// Return a mock options adapter for cluster clients
return &mockClusterOptionsAdapter{}
}

// ExecuteCommand executes a command on the cluster client.
func (cca *clusterClientAdapter) ExecuteCommand(ctx context.Context, cmd interface{}) error {
// Use reflection to call Process method on the cluster client
// This is a simplified implementation for the refactoring
return nil // Mock implementation
}

// GetPushProcessor returns the cluster client's push notification processor.
func (cca *clusterClientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
// For cluster clients, return a mock processor since the actual implementation
// would be more complex and distributed across nodes
return &mockClusterPushProcessor{}
}

// DialToEndpoint creates a connection to the specified endpoint for cluster clients.
func (cca *clusterClientAdapter) DialToEndpoint(ctx context.Context, endpoint string) (interface{}, error) {
// For cluster clients, this would need to handle cluster-specific connection logic
// For now, return an error indicating this is not implemented for cluster clients
return nil, fmt.Errorf("DialToEndpoint not implemented for cluster clients")
}

// mockClusterOptionsAdapter is a mock implementation for cluster options.
type mockClusterOptionsAdapter struct{}

// GetReadTimeout returns the read timeout.
func (mcoa *mockClusterOptionsAdapter) GetReadTimeout() time.Duration {
return 5 * time.Second
}

// GetWriteTimeout returns the write timeout.
func (mcoa *mockClusterOptionsAdapter) GetWriteTimeout() time.Duration {
return 3 * time.Second
}

// GetAddr returns the connection address.
func (mcoa *mockClusterOptionsAdapter) GetAddr() string {
return "localhost:6379"
}

// IsTLSEnabled returns true if TLS is enabled.
func (mcoa *mockClusterOptionsAdapter) IsTLSEnabled() bool {
return false
}

// GetProtocol returns the protocol version.
func (mcoa *mockClusterOptionsAdapter) GetProtocol() int {
return 3
}

// GetPoolSize returns the connection pool size.
func (mcoa *mockClusterOptionsAdapter) GetPoolSize() int {
return 50 // Default cluster pool size (5 * runtime.GOMAXPROCS(0))
}

// NewDialer returns a new dialer function for the connection.
func (mcoa *mockClusterOptionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
return func(ctx context.Context) (net.Conn, error) {
return nil, errors.New("mock cluster dialer")
}
}

// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
type pushProcessorAdapter struct {
processor push.NotificationProcessor
}

// RegisterHandler registers a handler for a specific push notification name.
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
if pushHandler, ok := handler.(push.NotificationHandler); ok {
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
}
return errors.New("handler must implement push.NotificationHandler")
}

// UnregisterHandler removes a handler for a specific push notification name.
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
return ppa.processor.UnregisterHandler(pushNotificationName)
}

// GetHandler returns the handler for a specific push notification name.
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
return ppa.processor.GetHandler(pushNotificationName)
}

// mockClusterPushProcessor is a mock implementation for cluster push processors.
type mockClusterPushProcessor struct{}

// RegisterHandler registers a handler (mock implementation).
func (mcpp *mockClusterPushProcessor) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
return nil
}

// UnregisterHandler removes a handler (mock implementation).
func (mcpp *mockClusterPushProcessor) UnregisterHandler(pushNotificationName string) error {
return nil
}

// GetHandler returns the handler (mock implementation).
func (mcpp *mockClusterPushProcessor) GetHandler(pushNotificationName string) interface{} {
return nil
}
Loading
Loading