Skip to content

Commit

Permalink
Merge pull request #830 from arkhipov/master
Browse files Browse the repository at this point in the history
feat: coordination session client
  • Loading branch information
asmyasnikov authored Mar 25, 2024
2 parents 173e976 + 1948cf3 commit b19bdf4
Show file tree
Hide file tree
Showing 14 changed files with 3,326 additions and 7 deletions.
196 changes: 196 additions & 0 deletions coordination/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package coordination

import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
)

Expand All @@ -11,4 +16,195 @@ type Client interface {
AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
DropNode(ctx context.Context, path string) (err error)
DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)

// OpenSession starts a new session. This method blocks until the server session is created. The context provided is
// used for the lifetime of the session.
//
// To ensure resources are not leaked, one of the following actions must be performed:
//
// - cancel the provided context,
// - call Close on the Session,
// - close the Client which the session was created with,
// - call any method of the Session until the ErrSessionClosed is returned.
OpenSession(ctx context.Context, path string, opts ...options.OpenSessionOption) (Session, error)
}

// ErrOperationStatusUnknown indicates that the request has been sent to the server but no reply has been received.
// The client usually automatically retries calls of that kind, but there are cases when it is not possible:
// - the request is not idempotent, non-idempotent requests are never retried,
// - the session was lost and its context is canceled.
var ErrOperationStatusUnknown = errors.New("operation status is unknown")

// ErrSessionClosed indicates that the Session object is closed.
var ErrSessionClosed = errors.New("session is closed")

// ErrAcquireTimeout indicates that the Session.AcquireSemaphore method could not acquire the semaphore before the
// operation timeout (see options.WithAcquireTimeout).
var ErrAcquireTimeout = errors.New("acquire semaphore timeout")

const (
// MaxSemaphoreLimit defines the maximum value of the limit parameter in the Session.CreateSemaphore method.
MaxSemaphoreLimit = math.MaxUint64

// Exclusive is just a shortcut for the maximum semaphore limit value. You can use this to acquire a semaphore in
// the exclusive mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for
// ephemeral semaphores.
Exclusive = math.MaxUint64

// Shared is just a shortcut for the minimum semaphore limit value (1). You can use this to acquire a semaphore in
// the shared mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for ephemeral
// semaphores.
Shared = 1
)

// Session defines a coordination service backed session.
//
// In general, Session API is concurrency-friendly, you can safely access all of its methods concurrently.
//
// The client guarantees that sequential calls of the methods are sent to the server in the same order. However, the
// session client may reorder and suspend some of the requests without violating correctness of the execution. This also
// applies to the situations when the underlying gRPC stream has been recreated due to network or server issues.
//
// The client automatically keeps the underlying gRPC stream alive by sending keep-alive (ping-pong) requests. If the
// client can no longer consider the session alive, it immediately cancels the session context which also leads to
// cancellation of contexts of all semaphore leases created by this session.
type Session interface {
// Close closes the coordination service session. It cancels all active requests on the server and notifies every
// pending or waiting for response request on the client side. It also cancels the session context and tries to
// stop the session gracefully on the server. If the ctx is canceled, this will not wait for the server session to
// become stopped and returns immediately with an error. Once this function returns with no error, all subsequent
// calls will be noop.
Close(ctx context.Context) error

// Context returns the context of the session. It is canceled when the underlying server session is over or if the
// client could not get any successful response from the server before the session timeout (see
// options.WithSessionTimeout).
Context() context.Context

// CreateSemaphore creates a new semaphore. This method waits until the server successfully creates a new semaphore
// or returns an error.
//
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
// returns the ErrOperationStatusUnknown error.
CreateSemaphore(ctx context.Context, name string, limit uint64, opts ...options.CreateSemaphoreOption) error

// UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or
// returns an error.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error

// DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the
// semaphore or returns an error.
//
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
// returns the ErrOperationStatusUnknown error.
DeleteSemaphore(ctx context.Context, name string, opts ...options.DeleteSemaphoreOption) error

// DescribeSemaphore returns the state of the semaphore.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
DescribeSemaphore(
ctx context.Context,
name string,
opts ...options.DescribeSemaphoreOption,
) (*SemaphoreDescription, error)

// AcquireSemaphore acquires the semaphore. If you acquire an ephemeral semaphore (see options.WithEphemeral), its
// limit will be set to MaxSemaphoreLimit. Later requests override previous operations with the same semaphore, e.g.
// to reduce acquired count, change timeout or attached data.
//
// This method blocks until the semaphore is acquired, an error is returned from the server or the session is
// closed. If the operation context was canceled but the server replied that the semaphore was actually acquired,
// the client will automatically release the semaphore.
//
// Semaphore waiting is fair: the semaphore guarantees that other sessions invoking the AcquireSemaphore method
// acquire permits in the order which they were called (FIFO). If a session invokes the AcquireSemaphore method
// multiple times while the first invocation is still in process, the position in the queue remains unchanged.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
AcquireSemaphore(
ctx context.Context,
name string,
count uint64,
opts ...options.AcquireSemaphoreOption,
) (Lease, error)

// SessionID returns a server-generated identifier of the session. This value is permanent and unique within the
// coordination service node.
SessionID() uint64

// Reconnect forcibly shuts down the underlying gRPC stream and initiates a new one. This method is highly unlikely
// to be of use in a typical application but is extremely useful for testing an API implementation.
Reconnect()
}

// Lease is the object which defines the rights of the session to the acquired semaphore. Lease is alive until its
// context is not canceled. This may happen implicitly, when the associated session becomes lost or closed, or
// explicitly, if someone calls the Release method of the lease.
type Lease interface {
// Context returns the context of the lease. It is canceled when the session it was created by was lost or closed,
// or if the lease was released by calling the Release method.
Context() context.Context

// Release releases the acquired lease to the semaphore. It also cancels the context of the lease. This method does
// not take a ctx argument, but you can cancel the execution of it by closing the session or canceling its context.
Release() error

// Session returns the session which this lease was created by.
Session() Session
}

// SemaphoreDescription describes the state of a semaphore.
type SemaphoreDescription struct {
// Name is the name of the semaphore.
Name string

// Limit is the maximum number of tokens that may be acquired.
Limit uint64

// Count is the number of tokens currently acquired by its owners.
Count uint64

// Ephemeral semaphores are deleted when there are no owners and waiters left.
Ephemeral bool

// Data is user-defined data attached to the semaphore.
Data []byte

// Owner is the list of current owners of the semaphore.
Owners []*SemaphoreSession

// Waiter is the list of current waiters of the semaphore.
Waiters []*SemaphoreSession
}

// SemaphoreSession describes an owner or a waiter of this semaphore.
type SemaphoreSession struct {
// SessionID is the id of the session which tried to acquire the semaphore.
SessionID uint64

// Count is the number of tokens for the acquire operation.
Count uint64

// OrderId is a monotonically increasing id which determines locking order.
OrderID uint64

// Data is user-defined data attached to the acquire operation.
Data []byte

// Timeout is the timeout for the operation in the waiter queue. If this is time.Duration(math.MaxInt64) the session
// will wait for the semaphore until the operation is canceled.
Timeout time.Duration
}

func (d *SemaphoreDescription) String() string {
return fmt.Sprintf(
"{Name: %q Limit: %d Count: %d Ephemeral: %t Data: %q Owners: %s Waiters: %s}",
d.Name, d.Limit, d.Count, d.Ephemeral, d.Data, d.Owners, d.Waiters)
}

func (s *SemaphoreSession) String() string {
return fmt.Sprintf("{SessionID: %d Count: %d OrderID: %d Data: %q TimeoutMillis: %v}",
s.SessionID, s.Count, s.OrderID, s.Data, s.Timeout)
}
173 changes: 173 additions & 0 deletions coordination/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package options

import (
"math"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
)

// WithDescription returns an OpenSessionOption that specifies a user-defined description that may be used to describe
// the client.
func WithDescription(description string) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.Description = description
}
}

// WithSessionTimeout returns an OpenSessionOption that specifies the timeout during which client may restore a
// detached session. The client is forced to terminate the session if the last successful session request occurred
// earlier than this time.
//
// If this is not set, the client uses the default 5 seconds.
func WithSessionTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionTimeout = timeout
}
}

// WithSessionStartTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
// response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStartTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionStopTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
// response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStopTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionKeepAliveTimeout returns an OpenSessionOption that specifies the time that the client will wait before it
// terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
//
// If this is not set, the client uses the default time 10 seconds.
func WithSessionKeepAliveTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionKeepAliveTimeout = timeout
}
}

// WithSessionReconnectDelay returns an OpenSessionOption that specifies the time that the client will wait before it
// tries to reconnect the underlying gRPC stream in case of error.
//
// If this is not set, the client uses the default time 500 milliseconds.
func WithSessionReconnectDelay(delay time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionReconnectDelay = delay
}
}

// OpenSessionOption configures how we open a new session.
type OpenSessionOption func(c *OpenSessionOptions)

// OpenSessionOptions configure an OpenSession call. OpenSessionOptions are set by the OpenSessionOption values passed
// to the OpenSession function.
type OpenSessionOptions struct {
Description string
SessionTimeout time.Duration
SessionStartTimeout time.Duration
SessionStopTimeout time.Duration
SessionKeepAliveTimeout time.Duration
SessionReconnectDelay time.Duration
}

// WithEphemeral returns an AcquireSemaphoreOption that causes to create an ephemeral semaphore.
//
// Ephemeral semaphores are created with the first acquire operation and automatically deleted with the last release
// operation. Ephemeral semaphore are always created with the limit of coordination.MaxSemaphoreLimit.
func WithEphemeral(ephemeral bool) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.Ephemeral = ephemeral
}
}

// WithAcquireTimeout returns an AcquireSemaphoreOption which sets the timeout after which the operation fails if it
// is still waiting in the queue. Use 0 to make the AcquireSemaphore method fail immediately if the semaphore is already
// acquired by another session.
//
// If this is not set, the client waits for the acquire operation result until the operation or session context is done.
// You can reset the default value of this timeout by calling the WithNoAcquireTimeout method.
func WithAcquireTimeout(timeout time.Duration) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.TimeoutMillis = uint64(timeout.Milliseconds())
}
}

// WithNoAcquireTimeout returns an AcquireSemaphoreOption which disables the timeout after which the operation fails if
// it is still waiting in the queue.
//
// This is the default behavior. You can set the specific timeout by calling the WithAcquireTimeout method.
func WithNoAcquireTimeout() AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.TimeoutMillis = math.MaxUint64
}
}

// WithAcquireData returns an AcquireSemaphoreOption which attaches user-defined data to the operation.
func WithAcquireData(data []byte) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.Data = data
}
}

// AcquireSemaphoreOption configures how we acquire a semaphore.
type AcquireSemaphoreOption func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore)

// WithForceDelete return a DeleteSemaphoreOption which allows to delete a semaphore even if it is currently acquired
// by other sessions.
func WithForceDelete(force bool) DeleteSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore) {
c.Force = force
}
}

// DeleteSemaphoreOption configures how we delete a semaphore.
type DeleteSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore)

// WithCreateData return a CreateSemaphoreOption which attaches user-defined data to the semaphore.
func WithCreateData(data []byte) CreateSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_CreateSemaphore) {
c.Data = data
}
}

// CreateSemaphoreOption configures how we create a semaphore.
type CreateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_CreateSemaphore)

// WithUpdateData return a UpdateSemaphoreOption which changes user-defined data in the semaphore.
func WithUpdateData(data []byte) UpdateSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore) {
c.Data = data
}
}

// UpdateSemaphoreOption configures how we update a semaphore.
type UpdateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore)

// WithDescribeOwners return a DescribeSemaphoreOption which causes server send the list of owners in the response
// to the DescribeSemaphore request.
func WithDescribeOwners(describeOwners bool) DescribeSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) {
c.IncludeOwners = describeOwners
}
}

// WithDescribeWaiters return a DescribeSemaphoreOption which causes server send the list of waiters in the response
// to the DescribeSemaphore request.
func WithDescribeWaiters(describeWaiters bool) DescribeSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) {
c.IncludeWaiters = describeWaiters
}
}

// DescribeSemaphoreOption configures how we update a semaphore.
type DescribeSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore)
Loading

0 comments on commit b19bdf4

Please sign in to comment.