From ace1bb07541890b2b8835a678045cd83dfb316ec Mon Sep 17 00:00:00 2001 From: Vlad Arkhipov Date: Wed, 20 Sep 2023 07:49:39 +0200 Subject: [PATCH 1/3] feat: add coordination session client API Introduce Coordination Service Session synchronous API. For now, it supports operations that do not create subscriptions (though, the DescribeSemaphore method still can be used to observe owners and waiters of a semaphore). The Session object is a high-level abstraction of the session concept, which is introduced in the Coordination Service API. The client takes over and controls the network, connection and server issues. It also seamlessly integration with the Golang context concept using the Lease object which context is alive until the corresponding semaphore is considered acquired. See also #53 --- coordination/coordination.go | 196 ++++++++++++++++++++++++++++++++ coordination/options/options.go | 173 ++++++++++++++++++++++++++++ internal/coordination/client.go | 14 +++ 3 files changed, 383 insertions(+) create mode 100644 coordination/options/options.go diff --git a/coordination/coordination.go b/coordination/coordination.go index 7bd9f6dbf..b6340a326 100644 --- a/coordination/coordination.go +++ b/coordination/coordination.go @@ -2,7 +2,12 @@ package coordination import ( "context" + "errors" + "fmt" + "math" + "time" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" ) @@ -11,4 +16,195 @@ type Client interface { AlterNode(ctx context.Context, path string, config NodeConfig) (err error) DropNode(ctx context.Context, path string) (err error) DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error) + + // OpenSession starts a new session. This method blocks until the server session is created. The context provided is + // used for the lifetime of the session. 
+ // + // To ensure resources are not leaked, one of the following actions must be performed: + // + // - cancel the provided context, + // - call Close on the Session, + // - close the Client which the session was created with, + // - call any method of the Session until the ErrSessionClosed is returned. + OpenSession(ctx context.Context, path string, opts ...options.OpenSessionOption) (Session, error) +} + +// ErrOperationStatusUnknown indicates that the request has been sent to the server but no reply has been received. +// The client usually automatically retries calls of that kind, but there are cases when it is not possible: +// - the request is not idempotent, non-idempotent requests are never retried, +// - the session was lost and its context is canceled. +var ErrOperationStatusUnknown = errors.New("operation status is unknown") + +// ErrSessionClosed indicates that the Session object is closed. +var ErrSessionClosed = errors.New("session is closed") + +// ErrAcquireTimeout indicates that the Session.AcquireSemaphore method could not acquire the semaphore before the +// operation timeout (see options.WithAcquireTimeout). +var ErrAcquireTimeout = errors.New("acquire semaphore timeout") + +const ( + // MaxSemaphoreLimit defines the maximum value of the limit parameter in the Session.CreateSemaphore method. + MaxSemaphoreLimit = math.MaxUint64 + + // Exclusive is just a shortcut for the maximum semaphore limit value. You can use this to acquire a semaphore in + // the exclusive mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for + // ephemeral semaphores. + Exclusive = math.MaxUint64 + + // Shared is just a shortcut for the minimum semaphore limit value (1). You can use this to acquire a semaphore in + // the shared mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for ephemeral + // semaphores. + Shared = 1 +) + +// Session defines a coordination service backed session. 
+// +// In general, Session API is concurrency-friendly, you can safely access all of its methods concurrently. +// +// The client guarantees that sequential calls of the methods are sent to the server in the same order. However, the +// session client may reorder and suspend some of the requests without violating correctness of the execution. This also +// applies to the situations when the underlying gRPC stream has been recreated due to network or server issues. +// +// The client automatically keeps the underlying gRPC stream alive by sending keep-alive (ping-pong) requests. If the +// client can no longer consider the session alive, it immediately cancels the session context which also leads to +// cancellation of contexts of all semaphore leases created by this session. +type Session interface { + // Close closes the coordination service session. It cancels all active requests on the server and notifies every + // pending or waiting for response request on the client side. It also cancels the session context and tries to + // stop the session gracefully on the server. If the ctx is canceled, this will not wait for the server session to + // become stopped and returns immediately with an error. Once this function returns with no error, all subsequent + // calls will be noop. + Close(ctx context.Context) error + + // Context returns the context of the session. It is canceled when the underlying server session is over or if the + // client could not get any successful response from the server before the session timeout (see + // options.WithSessionTimeout). + Context() context.Context + + // CreateSemaphore creates a new semaphore. This method waits until the server successfully creates a new semaphore + // or returns an error. + // + // This method is not idempotent. If the request has been sent to the server but no reply has been received, it + // returns the ErrOperationStatusUnknown error. 
+ CreateSemaphore(ctx context.Context, name string, limit uint64, opts ...options.CreateSemaphoreOption) error + + // UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or + // returns an error. + // + // This method is idempotent. The client will automatically retry in the case of network or server failure. + UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error + + // DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the + // semaphore or returns an error. + // + // This method is not idempotent. If the request has been sent to the server but no reply has been received, it + // returns the ErrOperationStatusUnknown error. + DeleteSemaphore(ctx context.Context, name string, opts ...options.DeleteSemaphoreOption) error + + // DescribeSemaphore returns the state of the semaphore. + // + // This method is idempotent. The client will automatically retry in the case of network or server failure. + DescribeSemaphore( + ctx context.Context, + name string, + opts ...options.DescribeSemaphoreOption, + ) (*SemaphoreDescription, error) + + // AcquireSemaphore acquires the semaphore. If you acquire an ephemeral semaphore (see options.WithEphemeral), its + // limit will be set to MaxSemaphoreLimit. Later requests override previous operations with the same semaphore, e.g. + // to reduce acquired count, change timeout or attached data. + // + // This method blocks until the semaphore is acquired, an error is returned from the server or the session is + // closed. If the operation context was canceled but the server replied that the semaphore was actually acquired, + // the client will automatically release the semaphore. + // + // Semaphore waiting is fair: the semaphore guarantees that other sessions invoking the AcquireSemaphore method + // acquire permits in the order which they were called (FIFO). 
If a session invokes the AcquireSemaphore method + // multiple times while the first invocation is still in process, the position in the queue remains unchanged. + // + // This method is idempotent. The client will automatically retry in the case of network or server failure. + AcquireSemaphore( + ctx context.Context, + name string, + count uint64, + opts ...options.AcquireSemaphoreOption, + ) (Lease, error) + + // SessionID returns a server-generated identifier of the session. This value is permanent and unique within the + // coordination service node. + SessionID() uint64 + + // Reconnect forcibly shuts down the underlying gRPC stream and initiates a new one. This method is highly unlikely + // to be of use in a typical application but is extremely useful for testing an API implementation. + Reconnect() +} + +// Lease is the object which defines the rights of the session to the acquired semaphore. Lease is alive until its +// context is not canceled. This may happen implicitly, when the associated session becomes lost or closed, or +// explicitly, if someone calls the Release method of the lease. +type Lease interface { + // Context returns the context of the lease. It is canceled when the session it was created by was lost or closed, + // or if the lease was released by calling the Release method. + Context() context.Context + + // Release releases the acquired lease to the semaphore. It also cancels the context of the lease. This method does + // not take a ctx argument, but you can cancel the execution of it by closing the session or canceling its context. + Release() error + + // Session returns the session which this lease was created by. + Session() Session +} + +// SemaphoreDescription describes the state of a semaphore. +type SemaphoreDescription struct { + // Name is the name of the semaphore. + Name string + + // Limit is the maximum number of tokens that may be acquired. 
+	Limit uint64
+
+	// Count is the number of tokens currently acquired by its owners.
+	Count uint64
+
+	// Ephemeral semaphores are deleted when there are no owners and waiters left.
+	Ephemeral bool
+
+	// Data is user-defined data attached to the semaphore.
+	Data []byte
+
+	// Owners is the list of current owners of the semaphore.
+	Owners []*SemaphoreSession
+
+	// Waiters is the list of current waiters of the semaphore.
+	Waiters []*SemaphoreSession
+}
+
+// SemaphoreSession describes an owner or a waiter of this semaphore.
+type SemaphoreSession struct {
+	// SessionID is the id of the session which tried to acquire the semaphore.
+	SessionID uint64
+
+	// Count is the number of tokens for the acquire operation.
+	Count uint64
+
+	// OrderID is a monotonically increasing id which determines locking order.
+	OrderID uint64
+
+	// Data is user-defined data attached to the acquire operation.
+	Data []byte
+
+	// Timeout is the timeout for the operation in the waiter queue. If this is time.Duration(math.MaxInt64) the session
+	// will wait for the semaphore until the operation is canceled.
+	Timeout time.Duration
+}
+
+func (d *SemaphoreDescription) String() string {
+	return fmt.Sprintf(
+		"{Name: %q Limit: %d Count: %d Ephemeral: %t Data: %q Owners: %s Waiters: %s}",
+		d.Name, d.Limit, d.Count, d.Ephemeral, d.Data, d.Owners, d.Waiters)
+}
+
+func (s *SemaphoreSession) String() string {
+	return fmt.Sprintf("{SessionID: %d Count: %d OrderID: %d Data: %q Timeout: %v}",
+		s.SessionID, s.Count, s.OrderID, s.Data, s.Timeout)
 }
diff --git a/coordination/options/options.go b/coordination/options/options.go
new file mode 100644
index 000000000..944e07b1d
--- /dev/null
+++ b/coordination/options/options.go
@@ -0,0 +1,173 @@
+package options
+
+import (
+	"math"
+	"time"
+
+	"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
+)
+
+// WithDescription returns an OpenSessionOption that specifies a user-defined description that may be used to describe
+// the client.
+func WithDescription(description string) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.Description = description
+	}
+}
+
+// WithSessionTimeout returns an OpenSessionOption that specifies the timeout during which the client may restore a
+// detached session. The client is forced to terminate the session if the last successful session request occurred
+// earlier than this time.
+//
+// If this is not set, the client uses the default 5 seconds.
+func WithSessionTimeout(timeout time.Duration) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.SessionTimeout = timeout
+	}
+}
+
+// WithSessionStartTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
+// response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect.
+//
+// If this is not set, the client uses the default time 1 second.
+func WithSessionStartTimeout(timeout time.Duration) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.SessionStartTimeout = timeout
+	}
+}
+
+// WithSessionStopTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
+// response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect.
+//
+// If this is not set, the client uses the default time 1 second.
+func WithSessionStopTimeout(timeout time.Duration) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.SessionStopTimeout = timeout
+	}
+}
+
+// WithSessionKeepAliveTimeout returns an OpenSessionOption that specifies the time that the client will wait before it
+// terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
+//
+// If this is not set, the client uses the default time 10 seconds.
+func WithSessionKeepAliveTimeout(timeout time.Duration) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.SessionKeepAliveTimeout = timeout
+	}
+}
+
+// WithSessionReconnectDelay returns an OpenSessionOption that specifies the time that the client will wait before it
+// tries to reconnect the underlying gRPC stream in case of error.
+//
+// If this is not set, the client uses the default time 500 milliseconds.
+func WithSessionReconnectDelay(delay time.Duration) OpenSessionOption {
+	return func(c *OpenSessionOptions) {
+		c.SessionReconnectDelay = delay
+	}
+}
+
+// OpenSessionOption configures how we open a new session.
+type OpenSessionOption func(c *OpenSessionOptions)
+
+// OpenSessionOptions configure an OpenSession call. OpenSessionOptions are set by the OpenSessionOption values passed
+// to the OpenSession function.
+type OpenSessionOptions struct {
+	Description             string
+	SessionTimeout          time.Duration
+	SessionStartTimeout     time.Duration
+	SessionStopTimeout      time.Duration
+	SessionKeepAliveTimeout time.Duration
+	SessionReconnectDelay   time.Duration
+}
+
+// WithEphemeral returns an AcquireSemaphoreOption that causes an ephemeral semaphore to be created.
+//
+// Ephemeral semaphores are created with the first acquire operation and automatically deleted with the last release
+// operation. Ephemeral semaphores are always created with the limit of coordination.MaxSemaphoreLimit.
+func WithEphemeral(ephemeral bool) AcquireSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
+		c.Ephemeral = ephemeral
+	}
+}
+
+// WithAcquireTimeout returns an AcquireSemaphoreOption which sets the timeout after which the operation fails if it
+// is still waiting in the queue. Use 0 to make the AcquireSemaphore method fail immediately if the semaphore is already
+// acquired by another session.
+//
+// If this is not set, the client waits for the acquire operation result until the operation or session context is done.
+// You can reset the default value of this timeout by calling the WithNoAcquireTimeout method.
+func WithAcquireTimeout(timeout time.Duration) AcquireSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
+		c.TimeoutMillis = uint64(timeout.Milliseconds())
+	}
+}
+
+// WithNoAcquireTimeout returns an AcquireSemaphoreOption which disables the timeout after which the operation fails if
+// it is still waiting in the queue.
+//
+// This is the default behavior. You can set the specific timeout by calling the WithAcquireTimeout method.
+func WithNoAcquireTimeout() AcquireSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
+		c.TimeoutMillis = math.MaxUint64
+	}
+}
+
+// WithAcquireData returns an AcquireSemaphoreOption which attaches user-defined data to the operation.
+func WithAcquireData(data []byte) AcquireSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
+		c.Data = data
+	}
+}
+
+// AcquireSemaphoreOption configures how we acquire a semaphore.
+type AcquireSemaphoreOption func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore)
+
+// WithForceDelete returns a DeleteSemaphoreOption which allows deleting a semaphore even if it is currently acquired
+// by other sessions.
+func WithForceDelete(force bool) DeleteSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore) {
+		c.Force = force
+	}
+}
+
+// DeleteSemaphoreOption configures how we delete a semaphore.
+type DeleteSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore)
+
+// WithCreateData returns a CreateSemaphoreOption which attaches user-defined data to the semaphore.
+func WithCreateData(data []byte) CreateSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_CreateSemaphore) {
+		c.Data = data
+	}
+}
+
+// CreateSemaphoreOption configures how we create a semaphore.
+type CreateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_CreateSemaphore)
+
+// WithUpdateData returns an UpdateSemaphoreOption which changes user-defined data in the semaphore.
+func WithUpdateData(data []byte) UpdateSemaphoreOption {
+	return func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore) {
+		c.Data = data
+	}
+}
+
+// UpdateSemaphoreOption configures how we update a semaphore.
+type UpdateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore)
+
+// WithDescribeOwners returns a DescribeSemaphoreOption which causes the server to send the list of owners in the
+// response to the DescribeSemaphore request.
+func WithDescribeOwners(describeOwners bool) DescribeSemaphoreOption { + return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) { + c.IncludeOwners = describeOwners + } +} + +// WithDescribeWaiters return a DescribeSemaphoreOption which causes server send the list of waiters in the response +// to the DescribeSemaphore request. +func WithDescribeWaiters(describeWaiters bool) DescribeSemaphoreOption { + return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) { + c.IncludeWaiters = describeWaiters + } +} + +// DescribeSemaphoreOption configures how we update a semaphore. +type DescribeSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) diff --git a/internal/coordination/client.go b/internal/coordination/client.go index a73a81d45..4d03da5d9 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -203,6 +204,19 @@ func (c *Client) describeNode( }, nil } +func (c *Client) OpenSession( + ctx context.Context, + path string, + opts ...options.OpenSessionOption, +) (s coordination.Session, err error) { + if c == nil { + err = xerrors.WithStackTrace(errNilClient) + return + } + err = errors.New("not implemented") + return +} + func (c *Client) Close(ctx context.Context) error { if c == nil { return xerrors.WithStackTrace(errNilClient) From 5356cbad1f09117240d18c2c2d778687d5f25a0b Mon Sep 17 00:00:00 2001 From: Vlad Arkhipov Date: Wed, 20 Sep 2023 07:41:50 +0200 Subject: [PATCH 2/3] feat: implement coordination session client This contains the implementation of the Session coordination client and related objects that may be used to implement a typical client 
for a conversation-like protocol based on a bidirectional gRPC stream. See also #53 --- internal/coordination/client.go | 66 +- internal/coordination/config/config.go | 4 +- .../coordination/conversation/conversation.go | 513 ++++++++++ internal/coordination/session.go | 891 ++++++++++++++++++ log/coordination.go | 191 ++++ options.go | 4 +- tests/integration/coordination_test.go | 104 ++ trace/coordination.go | 96 +- trace/coordination_gtrace.go | 564 +++++++++++ 9 files changed, 2421 insertions(+), 12 deletions(-) create mode 100644 internal/coordination/conversation/conversation.go create mode 100644 internal/coordination/session.go create mode 100644 tests/integration/coordination_test.go diff --git a/internal/coordination/client.go b/internal/coordination/client.go index 4d03da5d9..9425721f4 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -3,6 +3,8 @@ package coordination import ( "context" "errors" + "sync" + "time" "github.com/ydb-platform/ydb-go-genproto/Ydb_Coordination_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" @@ -26,12 +28,16 @@ var ( type Client struct { config config.Config service Ydb_Coordination_V1.CoordinationServiceClient + + mutex sync.Mutex // guards the fields below + sessions map[*session]struct{} } func New(cc grpc.ClientConnInterface, config config.Config) *Client { return &Client{ - config: config, - service: Ydb_Coordination_V1.NewCoordinationServiceClient(cc), + config: config, + service: Ydb_Coordination_V1.NewCoordinationServiceClient(cc), + sessions: make(map[*session]struct{}), } } @@ -204,23 +210,69 @@ func (c *Client) describeNode( }, nil } +func newOpenSessionConfig(opts ...options.OpenSessionOption) *options.OpenSessionOptions { + c := defaultOpenSessionConfig() + for _, o := range opts { + if o != nil { + o(c) + } + } + return c +} + +func (c *Client) sessionOpened(s *session) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.sessions[s] = struct{}{} +} + +func (c 
*Client) sessionClosed(s *session) { + c.mutex.Lock() + defer c.mutex.Unlock() + + delete(c.sessions, s) +} + +func (c *Client) closeSessions(ctx context.Context) { + c.mutex.Lock() + defer c.mutex.Unlock() + + for s := range c.sessions { + s.Close(ctx) + } +} + +func defaultOpenSessionConfig() *options.OpenSessionOptions { + return &options.OpenSessionOptions{ + Description: "YDB Go SDK", + SessionTimeout: time.Second * 5, + SessionStartTimeout: time.Second * 1, + SessionStopTimeout: time.Second * 1, + SessionKeepAliveTimeout: time.Second * 10, + SessionReconnectDelay: time.Millisecond * 500, + } +} + func (c *Client) OpenSession( ctx context.Context, path string, opts ...options.OpenSessionOption, -) (s coordination.Session, err error) { +) (coordination.Session, error) { if c == nil { - err = xerrors.WithStackTrace(errNilClient) - return + return nil, xerrors.WithStackTrace(errNilClient) } - err = errors.New("not implemented") - return + + return openSession(ctx, c, path, newOpenSessionConfig(opts...)) } func (c *Client) Close(ctx context.Context) error { if c == nil { return xerrors.WithStackTrace(errNilClient) } + + c.closeSessions(ctx) + return c.close(ctx) } diff --git a/internal/coordination/config/config.go b/internal/coordination/config/config.go index b10fc43d4..c3c21dddc 100644 --- a/internal/coordination/config/config.go +++ b/internal/coordination/config/config.go @@ -22,9 +22,9 @@ func (c Config) Trace() *trace.Coordination { type Option func(c *Config) // WithTrace appends coordination trace to early defined traces -func WithTrace(trace trace.Coordination, opts ...trace.CoordinationComposeOption) Option { +func WithTrace(trace *trace.Coordination, opts ...trace.CoordinationComposeOption) Option { return func(c *Config) { - c.trace = c.trace.Compose(&trace, opts...) + c.trace = c.trace.Compose(trace, opts...) 
} } diff --git a/internal/coordination/conversation/conversation.go b/internal/coordination/conversation/conversation.go new file mode 100644 index 000000000..7fa836c85 --- /dev/null +++ b/internal/coordination/conversation/conversation.go @@ -0,0 +1,513 @@ +// Package conversation contains coordination session internal code that helps implement a typical conversation-like +// session protocol based on a bidirectional gRPC stream. +package conversation + +import ( + "context" + "sync" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" + + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" +) + +// Controller provides a simple mechanism to work with a session protocol using a gRPC bidirectional stream. Creating a +// bidirectional stream client may be quite tricky because messages are usually being processed independently and in +// parallel. Moreover, the gRPC client library puts strict limitations on an implementation of the client, e.g. multiple +// calls of the Send or Recv methods of the stub client must not be performed from different goroutines. Also, there are +// no guarantees that a message successfully dispatched by the Send method will actually reach the server, neither does +// the server enjoy same guarantees when delivering messages to the client. This usually ends up having two goroutines +// (one for sending outgoing messages and another one for receiving incoming ones) and a queue where messages are +// published to be eventually delivered to the server. The Controller simplifies working with this model providing +// generic implementation of the message queue and related routines, handling retries of sent and pending operations +// when the underlying gRPC stream needs to be reconnected. +// +// A typical coordination session looks like this (we are going to skip for now how the gRPC stream is created, handled +// and kept alive, you can find the details on that in the Session, and focus on the protocol): +// +// 1. 
The client opens a new gRPC bidirectional stream. +// 2. The client sends the SessionStart request and wait until the Failure or the SessionStarted reply. +// 3. The server sends the SessionStarted response with the SessionID. At this point the session is started. If the +// client needs to reconnect the gRPC stream in the future, it will use that SessionID to attach to the previously +// created session in the SessionStart request. +// 4. The client sends the AcquireSemaphore request to acquire a permit to the semaphore in this session with count 5. +// 5. After a moment, the client decides to acquire another semaphore, it sends one more AcquireSemaphore request with +// count 4. +// 6. The server replies with the AcquireSemaphoreResult response to the second AcquireSemaphore request to inform the +// client that the semaphore was successfully acquired. +// 7. The server replies with the AcquireSemaphorePending response in order to inform the client that the semaphore +// from the first request has been acquired by another session. +// 8. After a while, the server sends the AcquireSemaphoreResult response which implies that the semaphore from the +// first request is acquired in the current session. +// 9. Then the client sends the ReleaseSemaphore request in order to release the acquired semaphore. +// 10. The server replies with the ReleaseSemaphoreResult. +// 11. The client terminates the session with the SessionStop request. +// 12. The server let the client know that the session is over sending the SessionStopped response and closing the gRPC +// stream. +// +// We can notice five independent conversations here: +// +// 1. StartSession, SessionStarted — points 2–3; +// 2. AcquireSemaphore, AcquireSemaphoreResult — points 4, 6; +// 3. AcquireSemaphore, AcquireSemaphorePending, AcquireSemaphoreResult — points 5, 7 and 8; +// 4. ReleaseSemaphore, ReleaseSemaphoreResult — points 9–10; +// 5. SessionStop, SessionStopped — points 11–12. 
+// +// If at any time the client encounters an unrecoverable error (for example, the underlying gRPC stream becomes +// disconnected), the client will have to replay every conversation from their very beginning. Let us see why it is +// actually the case. But before we go into that, let us look at the grpc.ClientStream SendMsg method: +// +// "…SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost +// messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg…" +// +// This is true for both, the client and the server. So when the server replies to the client it does not really know if +// the response is received by the client. And vice versa, when the client sends a request to the server it has no way +// to know if the request was delivered to the server unless the server sends another message to the client in reply. +// +// That is why conversation-like protocols typically use idempotent requests. Idempotent requests can be safely retried +// as long as you keep the original order of the conversations. For example, if the gRPC stream is terminated before +// the point 6, we cannot know if the server gets the requests. There may be one, two or none AcquireSemaphore requests +// successfully delivered to and handled by the server. Moreover, the server may have already sent to the client the +// corresponding responses. Nevertheless, if the requests are idempotent, we can safely retry them all in the newly +// created gRPC stream and get the same results as we would have got if we had sent them without stream termination. +// Note that if the stream is terminated before the point 8, we still need to replay the first AcquireSemaphore +// conversation because we have no knowledge if the server replied with the AcquireSemaphoreResult in the terminated +// stream. +// +// However, sometimes even idempotent requests cannot be safely retried. 
Consider the case wherein the point 5 from the +// original list is: +// +// 5. After a moment, the client decides to modify the AcquireSemaphore request and sends another one with the same +// semaphore but with count 4. +// +// If then the gRPC stream terminates, there are two likely outcomes: +// +// 1. The server received the first request but the second one was not delivered. The current semaphore count is 5. +// 2. The server received and processed the both requests. The current semaphore permit count is 4. +// +// If we retry the both requests, the observed result will be different depending on which outcome occurs: +// +// 1. The first retry will be a noop, the second one will decrease the semaphore count to 4. This is expected behavior. +// 2. The first retry will try to increase the semaphore count to 5, it causes an error. This is unexpected. +// +// In order to avoid that we could postpone a conversation if there is another one for the same semaphore which has been +// sent but has not been yet delivered to the server. For more details, see the WithConflictKey option. +type Controller struct { + mutex sync.Mutex // guards access to the fields below + + queue []*Conversation // the message queue, the front is in the end of the slice + conflicts map[string]struct{} + + notifyChan chan struct{} + closed bool +} + +// ResponseFilter defines the filter function called by the controller to know if a received message relates to the +// conversation. If a ResponseFilter returns true, the message is considered to be part of the conversation. +type ResponseFilter func(request *Ydb_Coordination.SessionRequest, response *Ydb_Coordination.SessionResponse) bool + +// Conversation is a core concept of the conversation package. It is an ordered sequence of connected request/reply +// messages. For example, the acquiring semaphore conversation may look like this: +// +// 1. The client sends the AcquireSemaphore request. +// 2. 
The server replies with the AcquireSemaphorePending response. +// 3. After a while, the server replies with the AcquireSemaphoreResult response. The conversation is ended. +// +// There may be many different conversations carried out simultaneously in one session, so the exact order of all the +// messages in the session is unspecified. In the example above, there may be other messages (from different +// conversations) between points 1 and 2, or 2 and 3. +type Conversation struct { + message func() *Ydb_Coordination.SessionRequest + responseFilter ResponseFilter + acknowledgeFilter ResponseFilter + cancelMessage func(req *Ydb_Coordination.SessionRequest) *Ydb_Coordination.SessionRequest + cancelFilter ResponseFilter + conflictKey string + idempotent bool + requestSent *Ydb_Coordination.SessionRequest + cancelRequestSent *Ydb_Coordination.SessionRequest + response *Ydb_Coordination.SessionResponse + responseErr error + canceled bool + done chan struct{} +} + +// NewController creates a new conversation controller. You usually have one controller per one session. +func NewController() *Controller { + return &Controller{ + notifyChan: make(chan struct{}, 1), + conflicts: make(map[string]struct{}), + } +} + +// WithResponseFilter returns an Option that specifies the filter function that is used to detect the last response +// message in the conversation. If such a message was found, the conversation is immediately ended and the response +// becomes available by the Conversation.Await method. +func WithResponseFilter(filter ResponseFilter) Option { + return func(c *Conversation) { + c.responseFilter = filter + } +} + +// WithAcknowledgeFilter returns an Option that specifies the filter function that is used to detect an intermediate +// response message in the conversation. If such a message was found, the conversation continues, but it lets the client +// know that the server successfully consumed the first request of the conversation. 
+func WithAcknowledgeFilter(filter ResponseFilter) Option {
+	return func(c *Conversation) {
+		c.acknowledgeFilter = filter
+	}
+}
+
+// WithCancelMessage returns an Option that specifies the message and filter functions that are used to cancel the
+// conversation which has been already sent. This message is sent in the background when the caller cancels the context
+// of the Controller.Await function. The response is never received by the caller and is only used to end the
+// conversation and remove it from the queue.
+func WithCancelMessage(
+	message func(req *Ydb_Coordination.SessionRequest) *Ydb_Coordination.SessionRequest,
+	filter ResponseFilter,
+) Option {
+	return func(c *Conversation) {
+		c.cancelMessage = message
+		c.cancelFilter = filter
+	}
+}
+
+// WithConflictKey returns an Option that specifies the key that is used to find out messages that cannot be delivered
+// to the server until the server acknowledged the request. If there is a conversation with the same conflict key in the
+// queue that has not yet been delivered to the server, the controller will temporarily suspend other conversations with
+// the same conflict key until the first one is acknowledged.
+func WithConflictKey(key string) Option {
+	return func(c *Conversation) {
+		c.conflictKey = key
+	}
+}
+
+// WithIdempotence returns an Option that enables retries for this conversation when the underlying gRPC stream
+// reconnects. The controller will replay the whole conversation from scratch unless it has already ended.
+func WithIdempotence(idempotent bool) Option {
+	return func(c *Conversation) {
+		c.idempotent = idempotent
+	}
+}
+
+// Option configures how we create a new conversation.
+type Option func(c *Conversation)
+
+// NewConversation creates a new conversation that starts with a specified message. 
+func NewConversation(request func() *Ydb_Coordination.SessionRequest, opts ...Option) *Conversation { + conversation := Conversation{message: request} + for _, o := range opts { + if o != nil { + o(&conversation) + } + } + return &conversation +} + +func (c *Controller) notify() { + select { + case c.notifyChan <- struct{}{}: + default: + } +} + +// PushBack puts a new conversation at the end of the queue. +func (c *Controller) PushBack(conversation *Conversation) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.closed { + return coordination.ErrSessionClosed + } + + conversation.enqueue() + c.queue = append([]*Conversation{conversation}, c.queue...) + c.notify() + + return nil +} + +// PushFront puts a new conversation at the beginning of the queue. +func (c *Controller) PushFront(conversation *Conversation) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.closed { + return coordination.ErrSessionClosed + } + + conversation.enqueue() + c.queue = append(c.queue, conversation) + c.notify() + + return nil +} + +func (c *Controller) sendFront() *Ydb_Coordination.SessionRequest { + c.mutex.Lock() + defer c.mutex.Unlock() + + // We are notified but there are no conversations in the queue. Return nil to make the loop in OnSend wait. + if len(c.queue) == 0 { + return nil + } + + for i := len(c.queue) - 1; i >= 0; i-- { + req := c.queue[i] + + if req.canceled && req.cancelRequestSent == nil { + req.sendCancel() + c.notify() + return req.cancelRequestSent + } + + if req.requestSent != nil { + continue + } + + if _, ok := c.conflicts[req.conflictKey]; ok { + continue + } + + req.send() + + if req.conflictKey != "" { + c.conflicts[req.conflictKey] = struct{}{} + } + if req.responseFilter == nil && req.acknowledgeFilter == nil { + c.queue = append(c.queue[:i], c.queue[i+1:]...) + } + c.notify() + return req.requestSent + } + + return nil +} + +// OnSend blocks until a new conversation request becomes available at the end of the queue. 
You should call this method
+// in the goroutine that handles gRPC stream Send method. ctx can be used to cancel the call.
+func (c *Controller) OnSend(ctx context.Context) (*Ydb_Coordination.SessionRequest, error) {
+	var req *Ydb_Coordination.SessionRequest
+	for {
+		select {
+		case <-ctx.Done():
+		case <-c.notifyChan:
+			req = c.sendFront()
+		}
+
+		// Process ctx.Done() first to make sure we cancel the call if conversations are too chatty.
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		// We were notified but there were no messages in the queue. Just wait for more messages to become available.
+		if req != nil {
+			break
+		}
+	}
+
+	return req, nil
+}
+
+// OnRecv consumes a new conversation response and processes it with the corresponding conversation if any exists for
+// it. The returned value indicates if any conversation considers the incoming message part of it or the controller is
+// closed. You should call this method in the goroutine that handles gRPC stream Recv method.
+func (c *Controller) OnRecv(resp *Ydb_Coordination.SessionResponse) bool {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	notify := false
+	handled := false
+	for i := len(c.queue) - 1; i >= 0; i-- {
+		req := c.queue[i]
+		if req.requestSent == nil {
+			continue
+		}
+
+		switch {
+		case req.responseFilter != nil && req.responseFilter(req.requestSent, resp):
+			if !req.canceled {
+				req.succeed(resp)
+
+				if req.conflictKey != "" {
+					delete(c.conflicts, req.conflictKey)
+					notify = true
+				}
+
+				c.queue = append(c.queue[:i], c.queue[i+1:]...) 
+ } + + handled = true + case req.acknowledgeFilter != nil && req.acknowledgeFilter(req.requestSent, resp): + if !req.canceled { + if req.conflictKey != "" { + delete(c.conflicts, req.conflictKey) + notify = true + } + } + + handled = true + case req.cancelRequestSent != nil && req.cancelFilter(req.cancelRequestSent, resp): + if req.conflictKey != "" { + delete(c.conflicts, req.conflictKey) + notify = true + } + c.queue = append(c.queue[:i], c.queue[i+1:]...) + handled = true + } + } + + if notify { + c.notify() + } + + return c.closed || handled +} + +// OnDetach fails all non-idempotent conversations if there are any in the queue. You should call this method when the +// underlying gRPC stream of the session is closed. +func (c *Controller) OnDetach() { + c.mutex.Lock() + defer c.mutex.Unlock() + + for i := len(c.queue) - 1; i >= 0; i-- { + req := c.queue[i] + if !req.idempotent { + req.fail(coordination.ErrOperationStatusUnknown) + + if req.requestSent != nil && req.conflictKey != "" { + delete(c.conflicts, req.conflictKey) + } + + c.queue = append(c.queue[:i], c.queue[i+1:]...) + } + } +} + +// Close fails all conversations if there are any in the queue. It also does not allow pushing more conversations to the +// queue anymore. You may optionally specify the final conversation if needed. +func (c *Controller) Close(byeConversation *Conversation) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.closed = true + + for i := len(c.queue) - 1; i >= 0; i-- { + req := c.queue[i] + if !req.canceled { + req.fail(coordination.ErrSessionClosed) + } + } + + if byeConversation != nil { + byeConversation.enqueue() + c.queue = []*Conversation{byeConversation} + } + + c.notify() +} + +// OnAttach retries all idempotent conversations if there are any in the queue. You should call this method when the +// underlying gRPC stream of the session is connected. 
+func (c *Controller) OnAttach() {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	notify := false
+	for i := len(c.queue) - 1; i >= 0; i-- {
+		req := c.queue[i]
+		if req.idempotent && req.requestSent != nil {
+			if req.conflictKey != "" {
+				delete(c.conflicts, req.conflictKey)
+			}
+
+			req.requestSent = nil
+			notify = true
+		}
+	}
+
+	if notify {
+		c.notify()
+	}
+}
+
+// Cancel the conversation if it has been sent and there is no response ready. This returns false if the response is
+// ready and the caller may safely return it instead of canceling the conversation.
+func (c *Controller) cancel(conversation *Conversation) bool {
+	if conversation.cancelMessage == nil {
+		return true
+	}
+
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	// The context is canceled but the response is ready, return it anyway.
+	if conversation.response != nil || conversation.responseErr != nil {
+		return false
+	}
+
+	if conversation.requestSent != nil {
+		conversation.cancel()
+		c.notify()
+	} else {
+		// If the request has not been sent, just remove the conversation from the queue.
+		for i := len(c.queue) - 1; i >= 0; i-- {
+			req := c.queue[i]
+			if req == conversation {
+				c.queue = append(c.queue[:i], c.queue[i+1:]...)
+				break
+			}
+		}
+	}
+
+	return true
+}
+
+// Await waits until the conversation ends. ctx can be used to cancel the call. 
+func (c *Controller) Await( + ctx context.Context, + conversation *Conversation, +) (*Ydb_Coordination.SessionResponse, error) { + select { + case <-conversation.done: + case <-ctx.Done(): + } + + if ctx.Err() != nil && c.cancel(conversation) { + return nil, ctx.Err() + } + + if conversation.responseErr != nil { + return nil, conversation.responseErr + } + + return conversation.response, nil +} + +func (c *Conversation) enqueue() { + c.requestSent = nil + c.done = make(chan struct{}) +} + +func (c *Conversation) send() { + c.requestSent = c.message() +} + +func (c *Conversation) sendCancel() { + c.cancelRequestSent = c.cancelMessage(c.requestSent) +} + +func (c *Conversation) succeed(response *Ydb_Coordination.SessionResponse) { + c.response = response + close(c.done) +} + +func (c *Conversation) fail(err error) { + c.responseErr = err + close(c.done) +} + +func (c *Conversation) cancel() { + c.canceled = true + close(c.done) +} diff --git a/internal/coordination/session.go b/internal/coordination/session.go new file mode 100644 index 000000000..e7b3cb747 --- /dev/null +++ b/internal/coordination/session.go @@ -0,0 +1,891 @@ +package coordination + +import ( + "context" + "encoding/binary" + "math" + "math/rand" + "sync" + "time" + + "github.com/ydb-platform/ydb-go-genproto/Ydb_Coordination_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" + + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/conversation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +type session struct { + options *options.OpenSessionOptions + client *Client + + ctx context.Context + cancel context.CancelFunc + sessionClosedChan chan struct{} + controller 
*conversation.Controller + sessionID uint64 + + mutex sync.Mutex // guards the field below + lastGoodResponseTime time.Time + cancelStream context.CancelFunc +} + +type lease struct { + session *session + name string + ctx context.Context + cancel context.CancelFunc +} + +func openSession( + ctx context.Context, + client *Client, + path string, + opts *options.OpenSessionOptions, +) (*session, error) { + sessionCtx, cancel := context.WithCancel(ctx) + s := session{ + options: opts, + client: client, + ctx: sessionCtx, + cancel: cancel, + sessionClosedChan: make(chan struct{}), + controller: conversation.NewController(), + } + client.sessionOpened(&s) + + sessionStartedChan := make(chan struct{}) + go s.mainLoop(path, sessionStartedChan) + + select { + case <-sessionStartedChan: + case <-sessionCtx.Done(): + return nil, ctx.Err() + } + + return &s, nil +} + +func newProtectionKey() []byte { + key := make([]byte, 8) + binary.LittleEndian.PutUint64(key, rand.Uint64()) //nolint:gosec + return key +} + +func newReqID() uint64 { + return rand.Uint64() //nolint:gosec +} + +func (s *session) updateLastGoodResponseTime() { + s.mutex.Lock() + defer s.mutex.Unlock() + + now := time.Now() + if now.After(s.lastGoodResponseTime) { + s.lastGoodResponseTime = now + } +} + +func (s *session) getLastGoodResponseTime() time.Time { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.lastGoodResponseTime +} + +func (s *session) updateCancelStream(cancel context.CancelFunc) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.cancelStream = cancel +} + +// Create a new gRPC stream using an independent context. +func (s *session) newStream( + streamCtx context.Context, + cancelStream context.CancelFunc, +) (Ydb_Coordination_V1.CoordinationService_SessionClient, error) { + // This deadline if final. If we have not got a session before it, the session is either expired or has never been + // opened. 
+ var deadline time.Time + if s.sessionID != 0 { + deadline = s.getLastGoodResponseTime().Add(s.options.SessionTimeout) + } else { + // Large enough to make the loop infinite, small enough to allow the maximum duration value (~290 years). + deadline = time.Now().Add(time.Hour * 24 * 365 * 100) + } + + lastChance := false + for { + result := make(chan Ydb_Coordination_V1.CoordinationService_SessionClient, 1) + go func() { + var err error + onDone := trace.CoordinationOnStreamNew(s.client.config.Trace()) + defer func() { + onDone(err) + }() + + client, err := s.client.service.Session(streamCtx) + result <- client + }() + + var client Ydb_Coordination_V1.CoordinationService_SessionClient + if lastChance { + select { + case <-time.After(s.options.SessionKeepAliveTimeout): + case client = <-result: + } + + if client != nil { + return client, nil + } + + cancelStream() + return nil, s.ctx.Err() + } + + // Since the deadline is probably large enough, avoid the timer leak with time.After. + timer := time.NewTimer(time.Until(deadline)) + select { + case <-s.ctx.Done(): + case client = <-result: + case <-timer.C: + trace.CoordinationOnSessionClientTimeout( + s.client.config.Trace(), + s.getLastGoodResponseTime(), + s.options.SessionTimeout, + ) + cancelStream() + return nil, coordination.ErrSessionClosed + } + timer.Stop() + + if client != nil { + return client, nil + } + + // Waiting for some time before trying to reconnect. + select { + case <-time.After(s.options.SessionReconnectDelay): + case <-s.ctx.Done(): + } + + if s.ctx.Err() != nil { + // Give this session the last chance to stop gracefully if the session is canceled in the reconnect cycle. 
+ if s.sessionID != 0 { + lastChance = true + } else { + cancelStream() + return nil, s.ctx.Err() + } + } + } +} + +func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) { + defer s.client.sessionClosed(s) + defer close(s.sessionClosedChan) + defer s.cancel() + + var seqNo uint64 + + protectionKey := newProtectionKey() + closing := false + + for { + // Open a new grpc stream and start the receiver and sender loops. + // + // We use the stream context as a way to inform the main loop that the session must be reconnected if an + // unrecoverable error occurs in the receiver or sender loop. This also helps stop the other loop if an error + // is caught on only one of them. + // + // We intentionally place a stream context outside the scope of any existing contexts to make an attempt to + // close the session gracefully at the end of the main loop. + + streamCtx, cancelStream := context.WithCancel(context.Background()) + sessionClient, err := s.newStream(streamCtx, cancelStream) + if err != nil { + // Giving up, we can do nothing without a stream. + s.controller.Close(nil) + return + } + + s.updateCancelStream(cancelStream) + + // Start the loops. + wg := sync.WaitGroup{} + wg.Add(2) + sessionStarted := make(chan *Ydb_Coordination.SessionResponse_SessionStarted, 1) + sessionStopped := make(chan *Ydb_Coordination.SessionResponse_SessionStopped, 1) + startSending := make(chan struct{}) + s.controller.OnAttach() + + go s.receiveLoop(&wg, sessionClient, cancelStream, sessionStarted, sessionStopped) + go s.sendLoop( + &wg, + sessionClient, + streamCtx, + cancelStream, + startSending, + path, + protectionKey, + s.sessionID, + seqNo, + ) + + // Wait for the session started response unless the stream context is done. We intentionally do not take into + // account stream context cancellation in order to proceed with the graceful shutdown if it requires reconnect. 
+ select { + case start := <-sessionStarted: + trace.CoordinationOnSessionStarted(s.client.config.Trace(), start.GetSessionId(), s.sessionID) + if s.sessionID == 0 { + s.sessionID = start.GetSessionId() + close(sessionStartedChan) + } else if start.GetSessionId() != s.sessionID { + // Reconnect if the server response is invalid. + cancelStream() + } + close(startSending) + case <-time.After(s.options.SessionStartTimeout): + // Reconnect if no response was received before the timeout occurred. + trace.CoordinationOnSessionStartTimeout(s.client.config.Trace(), s.options.SessionStartTimeout) + cancelStream() + case <-streamCtx.Done(): + } + + for { + // Respect the failure reason priority: if the session context is done, we must stop the session, even + // though the stream context may also be canceled. + if s.ctx.Err() != nil { + closing = true + break + } + if streamCtx.Err() != nil { + // Reconnect if an error occurred during the start session conversation. + break + } + + keepAliveTime := time.Until(s.getLastGoodResponseTime().Add(s.options.SessionKeepAliveTimeout)) + select { + case <-time.After(keepAliveTime): + last := s.getLastGoodResponseTime() + if time.Since(last) > s.options.SessionKeepAliveTimeout { + // Reconnect if the underlying stream is likely to be dead. + trace.CoordinationOnSessionKeepAliveTimeout( + s.client.config.Trace(), + last, + s.options.SessionKeepAliveTimeout, + ) + cancelStream() + } + case <-streamCtx.Done(): + case <-s.ctx.Done(): + } + } + + if closing { + // No need to stop the session if it was not started. 
+ if s.sessionID == 0 { + s.controller.Close(nil) + cancelStream() + return + } + + trace.CoordinationOnSessionStop(s.client.config.Trace(), s.sessionID) + s.controller.Close(conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_SessionStop_{ + SessionStop: &Ydb_Coordination.SessionRequest_SessionStop{}, + }, + } + }), + ) + + // Wait for the session stopped response unless the stream context is done. + select { + case stop := <-sessionStopped: + trace.CoordinationOnSessionStopped(s.client.config.Trace(), stop.GetSessionId(), s.sessionID) + if stop.GetSessionId() == s.sessionID { + cancelStream() + return + } + + // Reconnect if the server response is invalid. + cancelStream() + case <-time.After(s.options.SessionStopTimeout): + // Reconnect if no response was received before the timeout occurred. + trace.CoordinationOnSessionStopTimeout(s.client.config.Trace(), s.options.SessionStopTimeout) + cancelStream() + case <-streamCtx.Done(): + } + } + + // Make sure no one is processing the stream anymore. + wg.Wait() + + s.controller.OnDetach() + seqNo++ + } +} + +func (s *session) receiveLoop( + wg *sync.WaitGroup, + sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient, + cancelStream context.CancelFunc, + sessionStarted chan *Ydb_Coordination.SessionResponse_SessionStarted, + sessionStopped chan *Ydb_Coordination.SessionResponse_SessionStopped, +) { + // If the sendLoop is done, make sure the stream is also canceled to make the receiveLoop finish its work and cause + // reconnect. + defer wg.Done() + defer cancelStream() + + for { + onDone := trace.CoordinationOnSessionReceive(s.client.config.Trace()) + message, err := sessionClient.Recv() + if err != nil { + // Any stream error is unrecoverable, try to reconnect. 
+ onDone(nil, err) + return + } + onDone(message, nil) + + switch message.GetResponse().(type) { + case *Ydb_Coordination.SessionResponse_Failure_: + if message.GetFailure().GetStatus() == Ydb.StatusIds_SESSION_EXPIRED || + message.GetFailure().GetStatus() == Ydb.StatusIds_UNAUTHORIZED || + message.GetFailure().GetStatus() == Ydb.StatusIds_NOT_FOUND { + // Consider the session expired if we got an unrecoverable status. + trace.CoordinationOnSessionServerExpire(s.client.config.Trace(), message.GetFailure()) + return + } + + trace.CoordinationOnSessionServerError(s.client.config.Trace(), message.GetFailure()) + return + case *Ydb_Coordination.SessionResponse_SessionStarted_: + sessionStarted <- message.GetSessionStarted() + s.updateLastGoodResponseTime() + case *Ydb_Coordination.SessionResponse_SessionStopped_: + sessionStopped <- message.GetSessionStopped() + s.cancel() + return + case *Ydb_Coordination.SessionResponse_Ping: + opaque := message.GetPing().GetOpaque() + err := s.controller.PushFront(conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_Pong{ + Pong: &Ydb_Coordination.SessionRequest_PingPong{ + Opaque: opaque, + }, + }, + } + }), + ) + if err != nil { + // The session is closed if we cannot send the pong request back, so just exit the loop. + return + } + s.updateLastGoodResponseTime() + case *Ydb_Coordination.SessionResponse_Pong: + // Ignore pongs since we do not ping the server. + default: + if !s.controller.OnRecv(message) { + // Reconnect if the message is not from any known conversation. 
+ trace.CoordinationOnSessionReceiveUnexpected(s.client.config.Trace(), message) + return + } + + s.updateLastGoodResponseTime() + } + } +} + +//nolint:revive +func (s *session) sendLoop( + wg *sync.WaitGroup, + sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient, + streamCtx context.Context, + cancelStream context.CancelFunc, + startSending chan struct{}, + path string, + protectionKey []byte, + sessionID uint64, + seqNo uint64, +) { + // If the sendLoop is done, make sure the stream is also canceled to make the receiveLoop finish its work and cause + // reconnect. + defer wg.Done() + defer cancelStream() + + // Start a new session. + onDone := trace.CoordinationOnSessionStart(s.client.config.Trace()) + startSession := Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_SessionStart_{ + SessionStart: &Ydb_Coordination.SessionRequest_SessionStart{ + Path: path, + SessionId: sessionID, + TimeoutMillis: uint64(s.options.SessionTimeout.Milliseconds()), + ProtectionKey: protectionKey, + SeqNo: seqNo, + Description: s.options.Description, + }, + }, + } + err := sessionClient.Send(&startSession) + if err != nil { + // Reconnect if a session cannot be started in this stream. + onDone(err) + return + } + onDone(nil) + + // Wait for a response to the session start request in order to carry over the accumulated conversations until the + // server confirms that the session is running. This is not absolutely necessary but helps the client to not fail + // non-idempotent requests in case of the session handshake errors. + select { + case <-streamCtx.Done(): + case <-startSending: + } + + for { + message, err := s.controller.OnSend(streamCtx) + if err != nil { + return + } + + onSendDone := trace.CoordinationOnSessionSend(s.client.config.Trace(), message) + err = sessionClient.Send(message) + if err != nil { + // Any stream error is unrecoverable, try to reconnect. 
+ onSendDone(err) + return + } + onSendDone(nil) + } +} + +func (s *session) Context() context.Context { + return s.ctx +} + +func (s *session) Close(ctx context.Context) error { + s.cancel() + + select { + case <-s.sessionClosedChan: + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +func (s *session) Reconnect() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.cancelStream != nil { + s.cancelStream() + } +} + +func (s *session) SessionID() uint64 { + return s.sessionID +} + +func errorFromResponse(code Ydb.StatusIds_StatusCode, issues []*Ydb_Issue.IssueMessage) error { + if code == Ydb.StatusIds_SUCCESS { + return nil + } + + return xerrors.Operation(xerrors.WithStatusCode(code), xerrors.WithIssues(issues)) +} + +func (s *session) CreateSemaphore( + ctx context.Context, + name string, + limit uint64, + opts ...options.CreateSemaphoreOption, +) error { + req := conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + createSemaphore := Ydb_Coordination.SessionRequest_CreateSemaphore{ + ReqId: newReqID(), + Name: name, + Limit: limit, + } + for _, o := range opts { + if o != nil { + o(&createSemaphore) + } + } + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_CreateSemaphore_{ + CreateSemaphore: &createSemaphore, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetCreateSemaphoreResult().GetReqId() == request.GetCreateSemaphore().GetReqId() + }), + ) + if err := s.controller.PushBack(req); err != nil { + return err + } + + resp, err := s.controller.Await(ctx, req) + if err != nil { + return err + } + + return errorFromResponse( + resp.GetCreateSemaphoreResult().GetStatus(), + resp.GetCreateSemaphoreResult().GetIssues(), + ) +} + +func (s *session) UpdateSemaphore( + ctx context.Context, + name string, + opts ...options.UpdateSemaphoreOption, +) error { + req := 
conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + updateSemaphore := Ydb_Coordination.SessionRequest_UpdateSemaphore{ + ReqId: newReqID(), + Name: name, + } + for _, o := range opts { + if o != nil { + o(&updateSemaphore) + } + } + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_UpdateSemaphore_{ + UpdateSemaphore: &updateSemaphore, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetUpdateSemaphoreResult().GetReqId() == request.GetUpdateSemaphore().GetReqId() + }), + conversation.WithConflictKey(name), + conversation.WithIdempotence(true), + ) + if err := s.controller.PushBack(req); err != nil { + return err + } + + resp, err := s.controller.Await(ctx, req) + if err != nil { + return err + } + + return errorFromResponse( + resp.GetUpdateSemaphoreResult().GetStatus(), + resp.GetUpdateSemaphoreResult().GetIssues(), + ) +} + +func (s *session) DeleteSemaphore( + ctx context.Context, + name string, + opts ...options.DeleteSemaphoreOption, +) error { + req := conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + deleteSemaphore := Ydb_Coordination.SessionRequest_DeleteSemaphore{ + ReqId: newReqID(), + Name: name, + } + for _, o := range opts { + if o != nil { + o(&deleteSemaphore) + } + } + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_DeleteSemaphore_{ + DeleteSemaphore: &deleteSemaphore, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetDeleteSemaphoreResult().GetReqId() == request.GetDeleteSemaphore().GetReqId() + }), + conversation.WithConflictKey(name), + ) + if err := s.controller.PushBack(req); err != nil { + return err + } + + resp, err := s.controller.Await(ctx, req) + if err != nil { + return 
err + } + + return errorFromResponse( + resp.GetDeleteSemaphoreResult().GetStatus(), + resp.GetDeleteSemaphoreResult().GetIssues(), + ) +} + +func (s *session) DescribeSemaphore( + ctx context.Context, + name string, + opts ...options.DescribeSemaphoreOption, +) (*coordination.SemaphoreDescription, error) { + req := conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + describeSemaphore := Ydb_Coordination.SessionRequest_DescribeSemaphore{ + ReqId: newReqID(), + Name: name, + } + for _, o := range opts { + if o != nil { + o(&describeSemaphore) + } + } + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_DescribeSemaphore_{ + DescribeSemaphore: &describeSemaphore, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetDescribeSemaphoreResult().GetReqId() == request.GetDescribeSemaphore().GetReqId() + }), + conversation.WithConflictKey(name), + conversation.WithIdempotence(true), + ) + if err := s.controller.PushBack(req); err != nil { + return nil, err + } + + resp, err := s.controller.Await(ctx, req) + if err != nil { + return nil, err + } + + err = errorFromResponse( + resp.GetDescribeSemaphoreResult().GetStatus(), + resp.GetDescribeSemaphoreResult().GetIssues(), + ) + if err != nil { + return nil, err + } + + return convertSemaphoreDescription(resp.GetDescribeSemaphoreResult().GetSemaphoreDescription()), nil +} + +func convertSemaphoreDescription( + desc *Ydb_Coordination.SemaphoreDescription, +) *coordination.SemaphoreDescription { + var result coordination.SemaphoreDescription + + if desc != nil { + result.Name = desc.GetName() + result.Limit = desc.GetLimit() + result.Ephemeral = desc.GetEphemeral() + result.Count = desc.GetCount() + result.Data = desc.GetData() + result.Owners = convertSemaphoreSessions(desc.GetOwners()) + result.Waiters = convertSemaphoreSessions(desc.GetWaiters()) + 
} + + return &result +} + +func convertSemaphoreSessions( + sessions []*Ydb_Coordination.SemaphoreSession, +) []*coordination.SemaphoreSession { + if sessions == nil { + return nil + } + + result := make([]*coordination.SemaphoreSession, len(sessions)) + for i, s := range sessions { + result[i] = convertSemaphoreSession(s) + } + + return result +} + +func convertSemaphoreSession( + session *Ydb_Coordination.SemaphoreSession, +) *coordination.SemaphoreSession { + var result coordination.SemaphoreSession + + if session != nil { + result.SessionID = session.GetSessionId() + result.Count = session.GetCount() + result.OrderID = session.GetOrderId() + result.Data = session.GetData() + if session.GetTimeoutMillis() == math.MaxUint64 { + result.Timeout = time.Duration(math.MaxInt64) + } else { + // The service does not allow big timeout values, so the conversion seems to be safe. + result.Timeout = time.Duration(session.GetTimeoutMillis()) * time.Millisecond + } + } + + return &result +} + +func (s *session) AcquireSemaphore( + ctx context.Context, + name string, + count uint64, + opts ...options.AcquireSemaphoreOption, +) (coordination.Lease, error) { + req := conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + acquireSemaphore := Ydb_Coordination.SessionRequest_AcquireSemaphore{ + ReqId: newReqID(), + Name: name, + Count: count, + TimeoutMillis: math.MaxUint64, + } + for _, o := range opts { + if o != nil { + o(&acquireSemaphore) + } + } + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_AcquireSemaphore_{ + AcquireSemaphore: &acquireSemaphore, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetAcquireSemaphoreResult().GetReqId() == request.GetAcquireSemaphore().GetReqId() + }), + conversation.WithAcknowledgeFilter(func( + request *Ydb_Coordination.SessionRequest, + response 
*Ydb_Coordination.SessionResponse, + ) bool { + return response.GetAcquireSemaphorePending().GetReqId() == request.GetAcquireSemaphore().GetReqId() + }), + conversation.WithCancelMessage( + func(request *Ydb_Coordination.SessionRequest) *Ydb_Coordination.SessionRequest { + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_ReleaseSemaphore_{ + ReleaseSemaphore: &Ydb_Coordination.SessionRequest_ReleaseSemaphore{ + Name: name, + ReqId: newReqID(), + }, + }, + } + }, + func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetReleaseSemaphoreResult().GetReqId() == request.GetReleaseSemaphore().GetReqId() + }, + ), + conversation.WithConflictKey(name), + conversation.WithIdempotence(true), + ) + if err := s.controller.PushBack(req); err != nil { + return nil, err + } + + resp, err := s.controller.Await(ctx, req) + if err != nil { + return nil, err + } + + err = errorFromResponse( + resp.GetAcquireSemaphoreResult().GetStatus(), + resp.GetAcquireSemaphoreResult().GetIssues(), + ) + if err != nil { + return nil, err + } + + if !resp.GetAcquireSemaphoreResult().Acquired { + return nil, coordination.ErrAcquireTimeout + } + + ctx, cancel := context.WithCancel(s.ctx) + + return &lease{ + session: s, + name: name, + ctx: ctx, + cancel: cancel, + }, nil +} + +func (l *lease) Context() context.Context { + return l.ctx +} + +func (l *lease) Release() error { + req := conversation.NewConversation( + func() *Ydb_Coordination.SessionRequest { + return &Ydb_Coordination.SessionRequest{ + Request: &Ydb_Coordination.SessionRequest_ReleaseSemaphore_{ + ReleaseSemaphore: &Ydb_Coordination.SessionRequest_ReleaseSemaphore{ + ReqId: newReqID(), + Name: l.name, + }, + }, + } + }, + conversation.WithResponseFilter(func( + request *Ydb_Coordination.SessionRequest, + response *Ydb_Coordination.SessionResponse, + ) bool { + return response.GetReleaseSemaphoreResult().GetReqId() == 
request.GetReleaseSemaphore().GetReqId() + }), + conversation.WithConflictKey(l.name), + conversation.WithIdempotence(true), + ) + if err := l.session.controller.PushBack(req); err != nil { + return err + } + + resp, err := l.session.controller.Await(l.session.ctx, req) + if err != nil { + return err + } + + err = errorFromResponse( + resp.GetReleaseSemaphoreResult().GetStatus(), + resp.GetReleaseSemaphoreResult().GetIssues(), + ) + if err != nil { + return err + } + + l.cancel() + + return nil +} + +func (l *lease) Session() coordination.Session { + return l.session +} diff --git a/log/coordination.go b/log/coordination.go index 1485118de..45e053dad 100644 --- a/log/coordination.go +++ b/log/coordination.go @@ -1,10 +1,201 @@ package log import ( + "context" + "strconv" + "time" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) // Coordination makes trace.Coordination with logging events from details func Coordination(l Logger, d trace.Detailer, opts ...Option) (t trace.Coordination) { + return internalCoordination(wrapLogger(l, opts...), d) +} + +func internalCoordination(l *wrapper, d trace.Detailer) (t trace.Coordination) { + t.OnStreamNew = func( + info trace.CoordinationStreamNewStartInfo, + ) func( + info trace.CoordinationStreamNewDoneInfo, + ) { + if d.Details()&trace.CoordinationEvents == 0 { + return nil + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stream", "new") + l.Log(ctx, "stream") + start := time.Now() + return func(info trace.CoordinationStreamNewDoneInfo) { + l.Log(ctx, "done", + latencyField(start), + Error(info.Error), + versionField()) + } + } + + t.OnSessionStarted = func(info trace.CoordinationSessionStartedInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "started") + l.Log(ctx, "", + String("sessionID", strconv.FormatUint(info.SessionID, 10)), + String("expectedSessionID", strconv.FormatUint(info.ExpectedSessionID, 
10)), + ) + } + + t.OnSessionStartTimeout = func(info trace.CoordinationSessionStartTimeoutInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "start", "timeout") + l.Log(ctx, "", + Stringer("timeout", info.Timeout), + ) + } + + t.OnSessionKeepAliveTimeout = func(info trace.CoordinationSessionKeepAliveTimeoutInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "keepAlive", "timeout") + l.Log(ctx, "", + Stringer("timeout", info.Timeout), + Stringer("lastGoodResponseTime", info.LastGoodResponseTime), + ) + } + + t.OnSessionStopped = func(info trace.CoordinationSessionStoppedInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stopped") + l.Log(ctx, "", + String("sessionID", strconv.FormatUint(info.SessionID, 10)), + String("expectedSessionID", strconv.FormatUint(info.ExpectedSessionID, 10)), + ) + } + + t.OnSessionStopTimeout = func(info trace.CoordinationSessionStopTimeoutInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stop", "timeout") + l.Log(ctx, "", + Stringer("timeout", info.Timeout), + ) + } + + t.OnSessionClientTimeout = func(info trace.CoordinationSessionClientTimeoutInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "client", "timeout") + l.Log(ctx, "", + Stringer("timeout", info.Timeout), + Stringer("lastGoodResponseTime", info.LastGoodResponseTime), + ) + } + + t.OnSessionServerExpire = func(info trace.CoordinationSessionServerExpireInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "server", 
"expire") + l.Log(ctx, "", + Stringer("failure", info.Failure), + ) + } + + t.OnSessionServerError = func(info trace.CoordinationSessionServerErrorInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "server", "error") + l.Log(ctx, "", + Stringer("failure", info.Failure), + ) + } + + t.OnSessionReceive = func( + info trace.CoordinationSessionReceiveStartInfo, + ) func( + info trace.CoordinationSessionReceiveDoneInfo, + ) { + if d.Details()&trace.CoordinationEvents == 0 { + return nil + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "receive") + l.Log(ctx, "receive") + start := time.Now() + return func(info trace.CoordinationSessionReceiveDoneInfo) { + l.Log(ctx, "done", + latencyField(start), + Error(info.Error), + Stringer("response", info.Response), + versionField()) + } + } + + t.OnSessionReceiveUnexpected = func(info trace.CoordinationSessionReceiveUnexpectedInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "receive", "unexpected") + l.Log(ctx, "", + Stringer("response", info.Response), + ) + } + + t.OnSessionStop = func(info trace.CoordinationSessionStopInfo) { + if d.Details()&trace.CoordinationEvents == 0 { + return + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "stop") + l.Log(ctx, "", + String("sessionID", strconv.FormatUint(info.SessionID, 10)), + ) + } + + t.OnSessionStart = func( + info trace.CoordinationSessionStartStartInfo, + ) func( + info trace.CoordinationSessionStartDoneInfo, + ) { + if d.Details()&trace.CoordinationEvents == 0 { + return nil + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "start") + l.Log(ctx, "start") + start := time.Now() + return func(info trace.CoordinationSessionStartDoneInfo) { + l.Log(ctx, "done", + latencyField(start), + 
Error(info.Error), + versionField()) + } + } + + t.OnSessionSend = func( + info trace.CoordinationSessionSendStartInfo, + ) func( + info trace.CoordinationSessionSendDoneInfo, + ) { + if d.Details()&trace.CoordinationEvents == 0 { + return nil + } + ctx := with(context.Background(), TRACE, "ydb", "coordination", "session", "send") + l.Log(ctx, "start", + Stringer("request", info.Request), + ) + start := time.Now() + return func(info trace.CoordinationSessionSendDoneInfo) { + l.Log(ctx, "done", + latencyField(start), + Error(info.Error), + versionField()) + } + } + return t } diff --git a/options.go b/options.go index 1f92d4ce0..2329beeed 100644 --- a/options.go +++ b/options.go @@ -452,12 +452,12 @@ func WithTraceScheme(t trace.Scheme, opts ...trace.SchemeComposeOption) Option { } // WithTraceCoordination returns coordination trace option -func WithTraceCoordination(t trace.Coordination, opts ...trace.CoordinationComposeOption) Option { +func WithTraceCoordination(t trace.Coordination, opts ...trace.CoordinationComposeOption) Option { //nolint:gocritic return func(ctx context.Context, c *Driver) error { c.coordinationOptions = append( c.coordinationOptions, coordinationConfig.WithTrace( - t, + &t, append( []trace.CoordinationComposeOption{ trace.WithCoordinationPanicCallback(c.panicCallback), diff --git a/tests/integration/coordination_test.go b/tests/integration/coordination_test.go new file mode 100644 index 000000000..706876b8a --- /dev/null +++ b/tests/integration/coordination_test.go @@ -0,0 +1,104 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" + "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +//nolint:errcheck +func TestExample(t *testing.T) { + ctx := context.TODO() + db, 
err := ydb.Open(ctx, "grpc://localhost:2136/local", ydb.WithLogger( + log.Default(os.Stderr, + log.WithMinLevel(log.TRACE), + ), + trace.MatchDetails(`ydb\.(coordination).*`))) + if err != nil { + fmt.Printf("failed to connect: %v", err) + return + } + defer db.Close(ctx) // cleanup resources + // create node + err = db.Coordination().CreateNode(ctx, "/local/test", coordination.NodeConfig{ + Path: "", + SelfCheckPeriodMillis: 1000, + SessionGracePeriodMillis: 1000, + ReadConsistencyMode: coordination.ConsistencyModeStrict, + AttachConsistencyMode: coordination.ConsistencyModeStrict, + RatelimiterCountersMode: coordination.RatelimiterCountersModeDetailed, + }) + if err != nil { + fmt.Printf("failed to create node: %v", err) + return + } + defer db.Coordination().DropNode(ctx, "/local/test") + e, c, err := db.Coordination().DescribeNode(ctx, "/local/test") + if err != nil { + fmt.Printf("failed to describe node: %v\n", err) + return + } + fmt.Printf("node description: %+v\nnode config: %+v\n", e, c) + + s, err := db.Coordination().OpenSession(ctx, "/local/test") + if err != nil { + fmt.Printf("failed to open session: %v\n", err) + return + } + defer s.Close(ctx) + fmt.Printf("session 1 opened, id: %d\n", s.SessionID()) + + err = s.CreateSemaphore(ctx, "my-semaphore", 20, options.WithCreateData([]byte{1, 2, 3})) + if err != nil { + fmt.Printf("failed to create semaphore: %v", err) + return + } + fmt.Printf("semaphore my-semaphore created\n") + + lease, err := s.AcquireSemaphore(ctx, "my-semaphore", 10) + if err != nil { + fmt.Printf("failed to acquire semaphore: %v", err) + return + } + defer lease.Release() + fmt.Printf("session 1 acquired semaphore 10\n") + + s.Reconnect() + fmt.Printf("session 1 reconnected\n") + + desc, err := s.DescribeSemaphore( + ctx, + "my-semaphore", + options.WithDescribeOwners(true), + options.WithDescribeWaiters(true), + ) + if err != nil { + fmt.Printf("failed to describe semaphore: %v", err) + return + } + fmt.Printf("session 1 described 
semaphore %v\n", desc) + + err = lease.Release() + if err != nil { + fmt.Printf("failed to release semaphore: %v", err) + return + } + fmt.Printf("session 1 released semaphore my-semaphore\n") + + err = s.DeleteSemaphore(ctx, "my-semaphore", options.WithForceDelete(true)) + if err != nil { + fmt.Printf("failed to delete semaphore: %v", err) + return + } + fmt.Printf("deleted semaphore my-semaphore\n") +} diff --git a/trace/coordination.go b/trace/coordination.go index 551064671..7c049c4fa 100644 --- a/trace/coordination.go +++ b/trace/coordination.go @@ -1,5 +1,11 @@ package trace +import ( + "time" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" +) + // tool gtrace used from ./internal/cmd/gtrace //go:generate gtrace @@ -7,5 +13,93 @@ package trace type ( // Coordination specified trace of coordination client activity. // gtrace:gen - Coordination struct{} + Coordination struct { + OnStreamNew func(CoordinationStreamNewStartInfo) func(CoordinationStreamNewDoneInfo) + OnSessionStarted func(CoordinationSessionStartedInfo) + OnSessionStartTimeout func(CoordinationSessionStartTimeoutInfo) + OnSessionKeepAliveTimeout func(CoordinationSessionKeepAliveTimeoutInfo) + OnSessionStopped func(CoordinationSessionStoppedInfo) + OnSessionStopTimeout func(CoordinationSessionStopTimeoutInfo) + OnSessionClientTimeout func(CoordinationSessionClientTimeoutInfo) + OnSessionServerExpire func(CoordinationSessionServerExpireInfo) + OnSessionServerError func(CoordinationSessionServerErrorInfo) + + OnSessionReceive func(CoordinationSessionReceiveStartInfo) func(CoordinationSessionReceiveDoneInfo) + OnSessionReceiveUnexpected func(CoordinationSessionReceiveUnexpectedInfo) + + OnSessionStop func(CoordinationSessionStopInfo) + OnSessionStart func(CoordinationSessionStartStartInfo) func(CoordinationSessionStartDoneInfo) + OnSessionSend func(CoordinationSessionSendStartInfo) func(CoordinationSessionSendDoneInfo) + } + + CoordinationStreamNewStartInfo struct{} + + 
CoordinationStreamNewDoneInfo struct { + Error error + } + + CoordinationSessionStartedInfo struct { + SessionID uint64 + ExpectedSessionID uint64 + } + + CoordinationSessionStartTimeoutInfo struct { + Timeout time.Duration + } + + CoordinationSessionKeepAliveTimeoutInfo struct { + LastGoodResponseTime time.Time + Timeout time.Duration + } + + CoordinationSessionStoppedInfo struct { + SessionID uint64 + ExpectedSessionID uint64 + } + + CoordinationSessionStopTimeoutInfo struct { + Timeout time.Duration + } + + CoordinationSessionClientTimeoutInfo struct { + LastGoodResponseTime time.Time + Timeout time.Duration + } + + CoordinationSessionServerExpireInfo struct { + Failure *Ydb_Coordination.SessionResponse_Failure + } + + CoordinationSessionServerErrorInfo struct { + Failure *Ydb_Coordination.SessionResponse_Failure + } + + CoordinationSessionReceiveStartInfo struct{} + + CoordinationSessionReceiveDoneInfo struct { + Response *Ydb_Coordination.SessionResponse + Error error + } + + CoordinationSessionReceiveUnexpectedInfo struct { + Response *Ydb_Coordination.SessionResponse + } + + CoordinationSessionStartStartInfo struct{} + + CoordinationSessionStartDoneInfo struct { + Error error + } + + CoordinationSessionStopInfo struct { + SessionID uint64 + } + + CoordinationSessionSendStartInfo struct { + Request *Ydb_Coordination.SessionRequest + } + + CoordinationSessionSendDoneInfo struct { + Error error + } ) diff --git a/trace/coordination_gtrace.go b/trace/coordination_gtrace.go index 609198059..17075da0c 100644 --- a/trace/coordination_gtrace.go +++ b/trace/coordination_gtrace.go @@ -2,6 +2,12 @@ package trace +import ( + "time" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination" +) + // coordinationComposeOptions is a holder of options type coordinationComposeOptions struct { panicCallback func(e interface{}) @@ -20,5 +26,563 @@ func WithCoordinationPanicCallback(cb func(e interface{})) CoordinationComposeOp // Compose returns a new Coordination 
which has functional fields composed both from t and x. func (t *Coordination) Compose(x *Coordination, opts ...CoordinationComposeOption) *Coordination { var ret Coordination + options := coordinationComposeOptions{} + for _, opt := range opts { + if opt != nil { + opt(&options) + } + } + { + h1 := t.OnStreamNew + h2 := x.OnStreamNew + ret.OnStreamNew = func(c CoordinationStreamNewStartInfo) func(CoordinationStreamNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(CoordinationStreamNewDoneInfo) + if h1 != nil { + r = h1(c) + } + if h2 != nil { + r1 = h2(c) + } + return func(c CoordinationStreamNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(c) + } + if r1 != nil { + r1(c) + } + } + } + } + { + h1 := t.OnSessionStarted + h2 := x.OnSessionStarted + ret.OnSessionStarted = func(c CoordinationSessionStartedInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionStartTimeout + h2 := x.OnSessionStartTimeout + ret.OnSessionStartTimeout = func(c CoordinationSessionStartTimeoutInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionKeepAliveTimeout + h2 := x.OnSessionKeepAliveTimeout + ret.OnSessionKeepAliveTimeout = func(c CoordinationSessionKeepAliveTimeoutInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionStopped + h2 := x.OnSessionStopped + 
ret.OnSessionStopped = func(c CoordinationSessionStoppedInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionStopTimeout + h2 := x.OnSessionStopTimeout + ret.OnSessionStopTimeout = func(c CoordinationSessionStopTimeoutInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionClientTimeout + h2 := x.OnSessionClientTimeout + ret.OnSessionClientTimeout = func(c CoordinationSessionClientTimeoutInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionServerExpire + h2 := x.OnSessionServerExpire + ret.OnSessionServerExpire = func(c CoordinationSessionServerExpireInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionServerError + h2 := x.OnSessionServerError + ret.OnSessionServerError = func(c CoordinationSessionServerErrorInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionReceive + h2 := x.OnSessionReceive + ret.OnSessionReceive = func(c CoordinationSessionReceiveStartInfo) func(CoordinationSessionReceiveDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(CoordinationSessionReceiveDoneInfo) + if h1 != nil { + r = h1(c) + } + if h2 != nil { + r1 
= h2(c) + } + return func(c CoordinationSessionReceiveDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(c) + } + if r1 != nil { + r1(c) + } + } + } + } + { + h1 := t.OnSessionReceiveUnexpected + h2 := x.OnSessionReceiveUnexpected + ret.OnSessionReceiveUnexpected = func(c CoordinationSessionReceiveUnexpectedInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionStop + h2 := x.OnSessionStop + ret.OnSessionStop = func(c CoordinationSessionStopInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(c) + } + if h2 != nil { + h2(c) + } + } + } + { + h1 := t.OnSessionStart + h2 := x.OnSessionStart + ret.OnSessionStart = func(c CoordinationSessionStartStartInfo) func(CoordinationSessionStartDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(CoordinationSessionStartDoneInfo) + if h1 != nil { + r = h1(c) + } + if h2 != nil { + r1 = h2(c) + } + return func(c CoordinationSessionStartDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(c) + } + if r1 != nil { + r1(c) + } + } + } + } + { + h1 := t.OnSessionSend + h2 := x.OnSessionSend + ret.OnSessionSend = func(c CoordinationSessionSendStartInfo) func(CoordinationSessionSendDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(CoordinationSessionSendDoneInfo) + if h1 != nil { + r = h1(c) + } + if h2 != nil { + r1 = h2(c) + } + return func(c 
CoordinationSessionSendDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(c) + } + if r1 != nil { + r1(c) + } + } + } + } return &ret } +func (t *Coordination) onStreamNew(c CoordinationStreamNewStartInfo) func(CoordinationStreamNewDoneInfo) { + fn := t.OnStreamNew + if fn == nil { + return func(CoordinationStreamNewDoneInfo) { + return + } + } + res := fn(c) + if res == nil { + return func(CoordinationStreamNewDoneInfo) { + return + } + } + return res +} +func (t *Coordination) onSessionStarted(c CoordinationSessionStartedInfo) { + fn := t.OnSessionStarted + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionStartTimeout(c CoordinationSessionStartTimeoutInfo) { + fn := t.OnSessionStartTimeout + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionKeepAliveTimeout(c CoordinationSessionKeepAliveTimeoutInfo) { + fn := t.OnSessionKeepAliveTimeout + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionStopped(c CoordinationSessionStoppedInfo) { + fn := t.OnSessionStopped + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionStopTimeout(c CoordinationSessionStopTimeoutInfo) { + fn := t.OnSessionStopTimeout + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionClientTimeout(c CoordinationSessionClientTimeoutInfo) { + fn := t.OnSessionClientTimeout + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionServerExpire(c CoordinationSessionServerExpireInfo) { + fn := t.OnSessionServerExpire + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionServerError(c CoordinationSessionServerErrorInfo) { + fn := t.OnSessionServerError + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionReceive(c CoordinationSessionReceiveStartInfo) func(CoordinationSessionReceiveDoneInfo) { + fn := t.OnSessionReceive + if fn == nil { + 
return func(CoordinationSessionReceiveDoneInfo) { + return + } + } + res := fn(c) + if res == nil { + return func(CoordinationSessionReceiveDoneInfo) { + return + } + } + return res +} +func (t *Coordination) onSessionReceiveUnexpected(c CoordinationSessionReceiveUnexpectedInfo) { + fn := t.OnSessionReceiveUnexpected + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionStop(c CoordinationSessionStopInfo) { + fn := t.OnSessionStop + if fn == nil { + return + } + fn(c) +} +func (t *Coordination) onSessionStart(c CoordinationSessionStartStartInfo) func(CoordinationSessionStartDoneInfo) { + fn := t.OnSessionStart + if fn == nil { + return func(CoordinationSessionStartDoneInfo) { + return + } + } + res := fn(c) + if res == nil { + return func(CoordinationSessionStartDoneInfo) { + return + } + } + return res +} +func (t *Coordination) onSessionSend(c CoordinationSessionSendStartInfo) func(CoordinationSessionSendDoneInfo) { + fn := t.OnSessionSend + if fn == nil { + return func(CoordinationSessionSendDoneInfo) { + return + } + } + res := fn(c) + if res == nil { + return func(CoordinationSessionSendDoneInfo) { + return + } + } + return res +} +func CoordinationOnStreamNew(t *Coordination) func(error) { + var p CoordinationStreamNewStartInfo + res := t.onStreamNew(p) + return func(e error) { + var p CoordinationStreamNewDoneInfo + p.Error = e + res(p) + } +} +func CoordinationOnSessionStarted(t *Coordination, sessionID uint64, expectedSessionID uint64) { + var p CoordinationSessionStartedInfo + p.SessionID = sessionID + p.ExpectedSessionID = expectedSessionID + t.onSessionStarted(p) +} +func CoordinationOnSessionStartTimeout(t *Coordination, timeout time.Duration) { + var p CoordinationSessionStartTimeoutInfo + p.Timeout = timeout + t.onSessionStartTimeout(p) +} +func CoordinationOnSessionKeepAliveTimeout(t *Coordination, lastGoodResponseTime time.Time, timeout time.Duration) { + var p CoordinationSessionKeepAliveTimeoutInfo + p.LastGoodResponseTime = 
lastGoodResponseTime + p.Timeout = timeout + t.onSessionKeepAliveTimeout(p) +} +func CoordinationOnSessionStopped(t *Coordination, sessionID uint64, expectedSessionID uint64) { + var p CoordinationSessionStoppedInfo + p.SessionID = sessionID + p.ExpectedSessionID = expectedSessionID + t.onSessionStopped(p) +} +func CoordinationOnSessionStopTimeout(t *Coordination, timeout time.Duration) { + var p CoordinationSessionStopTimeoutInfo + p.Timeout = timeout + t.onSessionStopTimeout(p) +} +func CoordinationOnSessionClientTimeout(t *Coordination, lastGoodResponseTime time.Time, timeout time.Duration) { + var p CoordinationSessionClientTimeoutInfo + p.LastGoodResponseTime = lastGoodResponseTime + p.Timeout = timeout + t.onSessionClientTimeout(p) +} +func CoordinationOnSessionServerExpire(t *Coordination, failure *Ydb_Coordination.SessionResponse_Failure) { + var p CoordinationSessionServerExpireInfo + p.Failure = failure + t.onSessionServerExpire(p) +} +func CoordinationOnSessionServerError(t *Coordination, failure *Ydb_Coordination.SessionResponse_Failure) { + var p CoordinationSessionServerErrorInfo + p.Failure = failure + t.onSessionServerError(p) +} +func CoordinationOnSessionReceive(t *Coordination) func(response *Ydb_Coordination.SessionResponse, _ error) { + var p CoordinationSessionReceiveStartInfo + res := t.onSessionReceive(p) + return func(response *Ydb_Coordination.SessionResponse, e error) { + var p CoordinationSessionReceiveDoneInfo + p.Response = response + p.Error = e + res(p) + } +} +func CoordinationOnSessionReceiveUnexpected(t *Coordination, response *Ydb_Coordination.SessionResponse) { + var p CoordinationSessionReceiveUnexpectedInfo + p.Response = response + t.onSessionReceiveUnexpected(p) +} +func CoordinationOnSessionStop(t *Coordination, sessionID uint64) { + var p CoordinationSessionStopInfo + p.SessionID = sessionID + t.onSessionStop(p) +} +func CoordinationOnSessionStart(t *Coordination) func(error) { + var p CoordinationSessionStartStartInfo + 
res := t.onSessionStart(p) + return func(e error) { + var p CoordinationSessionStartDoneInfo + p.Error = e + res(p) + } +} +func CoordinationOnSessionSend(t *Coordination, request *Ydb_Coordination.SessionRequest) func(error) { + var p CoordinationSessionSendStartInfo + p.Request = request + res := t.onSessionSend(p) + return func(e error) { + var p CoordinationSessionSendDoneInfo + p.Error = e + res(p) + } +} From 0eff713a9543436fc5648af8e81b26f070b997d8 Mon Sep 17 00:00:00 2001 From: Vlad Arkhipov Date: Fri, 22 Sep 2023 19:25:02 +0200 Subject: [PATCH 3/3] feat: add coordination service client examples - Locks: shows how to use the Lease object and its context to make code execute at one node in a cluster at a time. - Workers: shows how use the Coordination Service to distribute independent tasks among multiple workers. See also #53 --- examples/coordination/README.md | 156 ++++++++++++++++++ examples/coordination/lock/main.go | 145 ++++++++++++++++ examples/coordination/workers/main.go | 227 ++++++++++++++++++++++++++ 3 files changed, 528 insertions(+) create mode 100644 examples/coordination/README.md create mode 100644 examples/coordination/lock/main.go create mode 100644 examples/coordination/workers/main.go diff --git a/examples/coordination/README.md b/examples/coordination/README.md new file mode 100644 index 000000000..d5248aa99 --- /dev/null +++ b/examples/coordination/README.md @@ -0,0 +1,156 @@ +# Coordination Examples + +Coordination Examples demonstrate how to implement various distributed consistency primitives using the +`coordination.Client` API. + +## Locks + +Strictly speaking, this is an implementation of a [lease](https://en.wikipedia.org/wiki/Lease_(computer_science)), not a +lock in the traditional sense. + +Consider an application which is running a number of different instances. You want to ensure that some shared resource +is accessed by only one instance at a time. + +The `lock` application is an example of that instance. 
When it starts, it waits until the distributed lock is acquired. +When the application cannot consider the lock acquired anymore (for example, in the case of network issues) it stops +working and tries to acquire the lock again. + +You may start any number of applications, only one of them will consider itself the holder of the lock at a time. + +Although we call it a lock, this mechanism **cannot be used to implement mutual exclusion by itself**. Session relies on +physical time clocks. The server and the client clocks may be, and actually always are, out of sync. This results in a +situation where for example the server releases the lease but the client still assumes it owns the lease. However, +leases can be used as optimization in order to significantly reduce the possibility of resource contention. + +To start the application with the [YDB Docker database instance](https://ydb.tech/en/docs/getting_started/self_hosted/ydb_docker) run + +```bash +$ go build +$ YDB_ANONYMOUS_CREDENTIALS=1 ./lock -ydb grpc://localhost:2136/local --path /local/test --semaphore lock +``` + +When you stop the application which is currently holding the semaphore, another one should immediately acquire the lock +and start doing its work. + +This example uses an ephemeral semaphore which is always acquired exclusive. The following pseudocode shows how it is +used. + +```go +for { + session, err := db.Coordination().OpenSession(ctx, path) + if err != nil { + // The context is canceled. + break + } + + lease, err := session.AcquireSemaphore(ctx, semaphore, coordination.Exclusive, options.WithEphemeral(true)) + if err != nil { + session.Close(ctx) + continue + } + + // The lock is acquired. + go doWork(lease.Context()) + + // Wait until the lock is released. + <-lease.Context().Done() + + // The lock is not acquired anymore. + cancelWork() +} +``` + +## Workers + +This is an example of distributing long-running tasks among multiple workers. 
Consider there is a number of tasks that +need to be processed simultaneously by a pool of workers. Each task is processed independently and the order of +processing does not matter. A worker has a fixed capacity that defines how many tasks it is ready to process at a time. + +Any single task is associated with a Coordination Service semaphore. When the application starts, it grabs all task +semaphores in order to acquire at least `capacity` of them. When it happens, it releases excessive ones and waits until +the application finishes. Until then, it starts a new task for every acquired semaphore and waits until the number of +running tasks becomes equal to the capacity of the worker. + +To start the application with the [YDB Docker database instance](https://ydb.tech/en/docs/getting_started/self_hosted/ydb_docker) run + +```bash +$ go build +$ YDB_ANONYMOUS_CREDENTIALS=1 ./workers -ydb grpc://localhost:2136/local -path /local/test --semaphore-prefix job- --tasks 10 --capacity 4 +``` + +This example uses ephemeral semaphores which are always acquired exclusive. However, in a real application you may +want to use persistent semaphores in order to store the state of workers in attached data. The following pseudocode +shows how it is used. + +```go +for { + session, err := db.Coordination().OpenSession(ctx, path) + if err != nil { + // The context is canceled. + break + } + + semaphoreCtx, semaphoreCancel := context.WithCancel(ctx) + capacitySemaphore := semaphore.NewWeighted(capacity) + leaseChan := make(chan *coordination.Lease) + for _, name := range tasks { + go awaitSemaphore(semaphoreCtx, semaphoreCancel, session, name, capacitySemaphore, leaseChan) + } + + tasksStarted := 0 +loop: + for { + lease := <-leaseChan + + // Run a new task every time we acquire a semaphore. + go doWork(lease.Context()) + + tasksStarted++ + if tasksStarted == capacity { + break + } + } + + // The capacity is full, cancel all Acquire operations. 
+ semaphoreCancel() + + // Wait until the session is no longer alive. + <-session.Context().Done() + + // The tasks must be stopped since we do not own the semaphores anymore. + cancelTasks() +} + +func awaitSemaphore( + ctx context.Context, + cancel context.CancelFunc, + session coordination.Session, + semaphoreName string, + capacitySemaphore *semaphore.Weighted, + leaseChan chan *coordination.Lease, +) { + lease, err := session.AcquireSemaphore( + ctx, + semaphoreName, + coordination.Exclusive, + options.WithEphemeral(true), + ) + if err != nil { + // Let the main loop know that something is wrong. + // ... + return + } + + // If there is a need in tasks for the current worker, provide it with a new lease. + if capacitySemaphore.TryAcquire(1) { + leaseChan <- lease + } else { + // This may happen since we are waiting for all existing semaphores trying to grab the first available to us. + err := lease.Release() + if err != nil { + // Let the main loop know that something is wrong. + // ... + } + } +} +``` diff --git a/examples/coordination/lock/main.go b/examples/coordination/lock/main.go new file mode 100644 index 000000000..22fb03d52 --- /dev/null +++ b/examples/coordination/lock/main.go @@ -0,0 +1,145 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "sync" + "time" + + environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" + + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" +) + +var ( + dsn string + path string + semaphore string +) + +func init() { + required := []string{"ydb", "path", "semaphore"} + flagSet := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + flagSet.Usage = func() { + out := flagSet.Output() + _, _ = fmt.Fprintf(out, "Usage:\n%s [options]\n", os.Args[0]) + _, _ = fmt.Fprintf(out, "\nOptions:\n") + flagSet.PrintDefaults() + } + flagSet.StringVar(&dsn, + "ydb", "", + "YDB connection string", + ) + 
flagSet.StringVar(&path, + "path", "", + "coordination node path", + ) + flagSet.StringVar(&semaphore, + "semaphore", "", + "semaphore name", + ) + if err := flagSet.Parse(os.Args[1:]); err != nil { + flagSet.Usage() + os.Exit(1) + } + flagSet.Visit(func(f *flag.Flag) { + for i, arg := range required { + if arg == f.Name { + required = append(required[:i], required[i+1:]...) + } + } + }) + if len(required) > 0 { + fmt.Printf("\nSome required options not defined: %v\n\n", required) + flagSet.Usage() + os.Exit(1) + } +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + defer func() { + signal.Stop(c) + cancel() + }() + + db, err := ydb.Open(ctx, dsn, + environ.WithEnvironCredentials(ctx), + ) + if err != nil { + panic(fmt.Errorf("connect error: %w", err)) + } + defer func() { _ = db.Close(ctx) }() + + err = db.Coordination().CreateNode(ctx, path, coordination.NodeConfig{ + Path: "", + SelfCheckPeriodMillis: 1000, + SessionGracePeriodMillis: 1000, + ReadConsistencyMode: coordination.ConsistencyModeStrict, + AttachConsistencyMode: coordination.ConsistencyModeStrict, + RatelimiterCountersMode: coordination.RatelimiterCountersModeDetailed, + }) + if err != nil { + fmt.Printf("failed to create coordination node: %v\n", err) + return + } + + for { + fmt.Println("waiting for a lock...") + + session, err := db.Coordination().OpenSession(ctx, path) + if err != nil { + fmt.Println("failed to open session", err) + return + } + + lease, err := session.AcquireSemaphore(ctx, semaphore, coordination.Exclusive, options.WithEphemeral(true)) + if err != nil { + fmt.Printf("failed to acquire semaphore: %v\n", err) + _ = session.Close(ctx) + continue + } + + fmt.Println("the lock is acquired") + + wg := sync.WaitGroup{} + wg.Add(1) + go doWork(lease.Context(), &wg) + + select { + case <-c: + fmt.Println("exiting") + return + case <-lease.Context().Done(): + } + + fmt.Println("the lock is 
released") + wg.Wait() + } +} + +func doWork(ctx context.Context, wg *sync.WaitGroup) { + fmt.Println("starting work") + +loop: + for { + fmt.Println("work is in progress...") + + select { + case <-ctx.Done(): + break loop + case <-time.After(time.Second): + } + } + + fmt.Println("suspending work") + + wg.Done() +} diff --git a/examples/coordination/workers/main.go b/examples/coordination/workers/main.go new file mode 100644 index 000000000..fc9c6f328 --- /dev/null +++ b/examples/coordination/workers/main.go @@ -0,0 +1,227 @@ +package main + +import ( + "context" + "flag" + "fmt" + "math/rand" + "os" + "os/signal" + "sync" + "time" + + environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" + "golang.org/x/sync/semaphore" + + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination" + "github.com/ydb-platform/ydb-go-sdk/v3/coordination/options" +) + +var ( + dsn string + path string + semaphorePrefix string + taskCount int + capacity int +) + +func init() { + required := []string{"ydb", "path"} + flagSet := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + flagSet.Usage = func() { + out := flagSet.Output() + _, _ = fmt.Fprintf(out, "Usage:\n%s [options]\n", os.Args[0]) + _, _ = fmt.Fprintf(out, "\nOptions:\n") + flagSet.PrintDefaults() + } + flagSet.StringVar(&dsn, + "ydb", "", + "YDB connection string", + ) + flagSet.StringVar(&path, + "path", "", + "coordination node path", + ) + flagSet.StringVar(&semaphorePrefix, + "semaphore-prefix", "job-", + "semaphore prefix", + ) + flagSet.IntVar(&taskCount, + "tasks", 10, + "the number of tasks", + ) + flagSet.IntVar(&capacity, + "capacity", 4, + "the maximum number of tasks a worker can run", + ) + if err := flagSet.Parse(os.Args[1:]); err != nil { + flagSet.Usage() + os.Exit(1) + } + flagSet.Visit(func(f *flag.Flag) { + for i, arg := range required { + if arg == f.Name { + required = append(required[:i], required[i+1:]...) 
+ } + } + }) + if len(required) > 0 { + fmt.Printf("\nSome required options not defined: %v\n\n", required) + flagSet.Usage() + os.Exit(1) + } +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + defer func() { + signal.Stop(c) + cancel() + }() + + db, err := ydb.Open(ctx, dsn, + environ.WithEnvironCredentials(ctx), + ) + if err != nil { + panic(fmt.Errorf("connect error: %w", err)) + } + defer func() { _ = db.Close(ctx) }() + + err = db.Coordination().CreateNode(ctx, path, coordination.NodeConfig{ + Path: "", + SelfCheckPeriodMillis: 1000, + SessionGracePeriodMillis: 1000, + ReadConsistencyMode: coordination.ConsistencyModeStrict, + AttachConsistencyMode: coordination.ConsistencyModeStrict, + RatelimiterCountersMode: coordination.RatelimiterCountersModeDetailed, + }) + if err != nil { + fmt.Printf("failed to create coordination node: %v\n", err) + return + } + + tasks := make([]string, taskCount) + for i := 0; i < taskCount; i++ { + tasks[i] = fmt.Sprintf("%s%d", semaphorePrefix, i) + } + rand.Shuffle(taskCount, func(i int, j int) { + tasks[i], tasks[j] = tasks[j], tasks[i] + }) + + fmt.Println("starting tasks") + for { + session, err := db.Coordination().OpenSession(ctx, path) + if err != nil { + fmt.Println("failed to open session", err) + return + } + + semaphoreCtx, semaphoreCancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + wg.Add(taskCount) + leaseChan := make(chan *LeaseInfo) + sem := semaphore.NewWeighted(int64(capacity)) + + for _, name := range tasks { + go awaitSemaphore(semaphoreCtx, &wg, session, name, leaseChan, sem, semaphoreCancel) + } + + tasksStarted := 0 + loop: + for { + select { + case <-semaphoreCtx.Done(): + break loop + case <-c: + fmt.Println("exiting") + return + case lease := <-leaseChan: + go doWork(lease.lease, lease.semaphoreName) + tasksStarted++ + if tasksStarted == capacity { + break loop + } + } + } + + fmt.Println("all workers are 
started") + + semaphoreCancel() + wg.Wait() + + select { + case <-session.Context().Done(): + case <-c: + fmt.Println("exiting") + return + } + } +} + +type LeaseInfo struct { + lease coordination.Lease + semaphoreName string +} + +func awaitSemaphore( + ctx context.Context, + done *sync.WaitGroup, + session coordination.Session, + semaphoreName string, + leaseChan chan *LeaseInfo, + sem *semaphore.Weighted, + cancel context.CancelFunc, +) { + defer done.Done() + + lease, err := session.AcquireSemaphore( + ctx, + semaphoreName, + coordination.Exclusive, + options.WithEphemeral(true), + ) + if err != nil { + if ctx.Err() != nil { + return + } + + fmt.Println("failed to acquire semaphore", err) + cancel() + return + } + + if sem.TryAcquire(1) { + leaseChan <- &LeaseInfo{lease: lease, semaphoreName: semaphoreName} + } else { + err := lease.Release() + if err != nil { + fmt.Println("failed to release semaphore", err) + cancel() + } + } +} + +func doWork( + lease coordination.Lease, + name string, +) { + fmt.Printf("worker %s: starting\n", name) + + for { + select { + case <-lease.Context().Done(): + fmt.Printf("worker %s: done\n", name) + err := lease.Release() + if err != nil { + fmt.Println("failed to release semaphore", err) + } + return + case <-time.After(time.Second): + } + + fmt.Printf("worker %s: in progress\n", name) + } +}