Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: coordination session client #830

Merged
merged 7 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions coordination/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package coordination

import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/coordination/options"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
)

Expand All @@ -11,4 +16,195 @@ type Client interface {
AlterNode(ctx context.Context, path string, config NodeConfig) (err error)
DropNode(ctx context.Context, path string) (err error)
DescribeNode(ctx context.Context, path string) (_ *scheme.Entry, _ *NodeConfig, err error)

// OpenSession starts a new session. This method blocks until the server session is created. The context provided is
// used for the lifetime of the session.
//
// To ensure resources are not leaked, one of the following actions must be performed:
//
// - cancel the provided context,
// - call Close on the Session,
// - close the Client which the session was created with,
// - call any method of the Session until the ErrSessionClosed is returned.
OpenSession(ctx context.Context, path string, opts ...options.OpenSessionOption) (Session, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In the sdk and in other methods of the interface ctx is cancel token for current method call.
  2. Timeout for the request is unusual for go: by option.

What about to use ctx in the OpenSession same way - for set cancel context/timeout of OpenSession method.

And add option for set additional context for lifetime control.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And lets to return struct/struct pointer, not interface.

Any change in interface (including add any method) - is brokable changing.
It will difficult to extend API of the Session if it will the interface.

Copy link
Contributor Author

@arkhipov arkhipov Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In the sdk and in other methods of the interface ctx is cancel token for current method call.
  2. Timeout for the request is unusual for go: by option.

What about to use ctx in the OpenSession same way - for set cancel context/timeout of OpenSession method.

And add option for set additional context for lifetime control.

It looks a bit dangerous to me since it may cause session leaks. Though, if the whole SDK uses context that way, I think it's better to be consistent. I haven't found a way to do that for Driver or Table.Session, what are you suggestions on naming the option?

}

// ErrOperationStatusUnknown indicates that the request has been sent to the server but no reply has been received.
// The client usually automatically retries calls of that kind, but there are cases when it is not possible:
// - the request is not idempotent, non-idempotent requests are never retried,
// - the session was lost and its context is canceled.
var ErrOperationStatusUnknown = errors.New("operation status is unknown")

// ErrSessionClosed indicates that the Session object is closed.
var ErrSessionClosed = errors.New("session is closed")

// ErrAcquireTimeout indicates that the Session.AcquireSemaphore method could not acquire the semaphore before the
// operation timeout (see options.WithAcquireTimeout).
var ErrAcquireTimeout = errors.New("acquire semaphore timeout")

const (
// MaxSemaphoreLimit defines the maximum value of the limit parameter in the Session.CreateSemaphore method.
MaxSemaphoreLimit = math.MaxUint64

// Exclusive is just a shortcut for the maximum semaphore limit value. You can use this to acquire a semaphore in
// the exclusive mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for
// ephemeral semaphores.
Exclusive = math.MaxUint64

// Shared is just a shortcut for the minimum semaphore limit value (1). You can use this to acquire a semaphore in
// the shared mode if it was created with the limit value of MaxSemaphoreLimit, which is always true for ephemeral
// semaphores.
Shared = 1
)

// Session defines a coordination service backed session.
//
// In general, Session API is concurrency-friendly, you can safely access all of its methods concurrently.
//
// The client guarantees that sequential calls of the methods are sent to the server in the same order. However, the
// session client may reorder and suspend some of the requests without violating correctness of the execution. This also
// applies to the situations when the underlying gRPC stream has been recreated due to network or server issues.
//
// The client automatically keeps the underlying gRPC stream alive by sending keep-alive (ping-pong) requests. If the
// client can no longer consider the session alive, it immediately cancels the session context which also leads to
// cancellation of contexts of all semaphore leases created by this session.
type Session interface {
// Close closes the coordination service session. It cancels all active requests on the server and notifies every
// pending or waiting for response request on the client side. It also cancels the session context and tries to
// stop the session gracefully on the server. If the ctx is canceled, this will not wait for the server session to
// become stopped and returns immediately with an error. Once this function returns with no error, all subsequent
// calls will be noop.
Close(ctx context.Context) error

// Context returns the context of the session. It is canceled when the underlying server session is over or if the
// client could not get any successful response from the server before the session timeout (see
// options.WithSessionTimeout).
Context() context.Context

// CreateSemaphore creates a new semaphore. This method waits until the server successfully creates a new semaphore
// or returns an error.
//
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
// returns the ErrOperationStatusUnknown error.
CreateSemaphore(ctx context.Context, name string, limit uint64, opts ...options.CreateSemaphoreOption) error

// UpdateSemaphore changes semaphore data. This method waits until the server successfully updates the semaphore or
// returns an error.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it shouldn't be idempotent? Since Create and Delete are not idempotent, it looks like an odd one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is tricky. Although it's not idempotent in general, in most cases users synchronize access to this method in order to avoid data races so that it becomes idempotent. I can hardly imagine why someone might want to update semaphore data without proper synchronization. I suggest changing this comment to explain that. However, I think it is feasible to let the client automatically retry this method calls and have an option of turning them off. What do you think of that?

UpdateSemaphore(ctx context.Context, name string, opts ...options.UpdateSemaphoreOption) error

// DeleteSemaphore deletes an existing semaphore. This method waits until the server successfully deletes the
// semaphore or returns an error.
//
// This method is not idempotent. If the request has been sent to the server but no reply has been received, it
// returns the ErrOperationStatusUnknown error.
DeleteSemaphore(ctx context.Context, name string, opts ...options.DeleteSemaphoreOption) error

// DescribeSemaphore returns the state of the semaphore.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at least Describe with watches cannot be idempotent, but you don't seem to support them. Are you planning to add watches as a separate method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, there are no watches in the client, so it should be idempotent. I've been thinking about supporting this feature, however I'm not sure what the API will look like.

DescribeSemaphore(
ctx context.Context,
name string,
opts ...options.DescribeSemaphoreOption,
) (*SemaphoreDescription, error)

// AcquireSemaphore acquires the semaphore. If you acquire an ephemeral semaphore (see options.WithEphemeral), its
// limit will be set to MaxSemaphoreLimit. Later requests override previous operations with the same semaphore, e.g.
// to reduce acquired count, change timeout or attached data.
//
// This method blocks until the semaphore is acquired, an error is returned from the server or the session is
// closed. If the operation context was canceled but the server replied that the semaphore was actually acquired,
// the client will automatically release the semaphore.
//
// Semaphore waiting is fair: the semaphore guarantees that other sessions invoking the AcquireSemaphore method
// acquire permits in the order which they were called (FIFO). If a session invokes the AcquireSemaphore method
// multiple times while the first invocation is still in process, the position in the queue remains unchanged.
//
// This method is idempotent. The client will automatically retry in the case of network or server failure.
AcquireSemaphore(
ctx context.Context,
name string,
count uint64,
opts ...options.AcquireSemaphoreOption,
) (Lease, error)

// SessionID returns a server-generated identifier of the session. This value is permanent and unique within the
// coordination service node.
SessionID() uint64

// Reconnect forcibly shuts down the underlying gRPC stream and initiates a new one. This method is highly unlikely
// to be of use in a typical application but is extremely useful for testing an API implementation.
Reconnect()
}

// Lease is the object which defines the rights of the session to the acquired semaphore. Lease is alive until its
// context is not canceled. This may happen implicitly, when the associated session becomes lost or closed, or
// explicitly, if someone calls the Release method of the lease.
type Lease interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as Session - let return the type, not interface

// Context returns the context of the lease. It is canceled when the session it was created by was lost or closed,
// or if the lease was released by calling the Release method.
Context() context.Context

// Release releases the acquired lease to the semaphore. It also cancels the context of the lease. This method does
// not take a ctx argument, but you can cancel the execution of it by closing the session or canceling its context.
Release() error

// Session returns the session which this lease was created by.
Session() Session
}

// SemaphoreDescription describes the state of a semaphore.
type SemaphoreDescription struct {
// Name is the name of the semaphore.
Name string

// Limit is the maximum number of tokens that may be acquired.
Limit uint64

// Count is the number of tokens currently acquired by its owners.
Count uint64

// Ephemeral semaphores are deleted when there are no owners and waiters left.
Ephemeral bool

// Data is user-defined data attached to the semaphore.
Data []byte

// Owner is the list of current owners of the semaphore.
Owners []*SemaphoreSession

// Waiter is the list of current waiters of the semaphore.
Waiters []*SemaphoreSession
}

// SemaphoreSession describes an owner or a waiter of this semaphore.
type SemaphoreSession struct {
// SessionID is the id of the session which tried to acquire the semaphore.
SessionID uint64

// Count is the number of tokens for the acquire operation.
Count uint64

// OrderId is a monotonically increasing id which determines locking order.
OrderID uint64

// Data is user-defined data attached to the acquire operation.
Data []byte

// Timeout is the timeout for the operation in the waiter queue. If this is time.Duration(math.MaxInt64) the session
// will wait for the semaphore until the operation is canceled.
Timeout time.Duration
}

func (d *SemaphoreDescription) String() string {
return fmt.Sprintf(
"{Name: %q Limit: %d Count: %d Ephemeral: %t Data: %q Owners: %s Waiters: %s}",
d.Name, d.Limit, d.Count, d.Ephemeral, d.Data, d.Owners, d.Waiters)
}

func (s *SemaphoreSession) String() string {
return fmt.Sprintf("{SessionID: %d Count: %d OrderID: %d Data: %q TimeoutMillis: %v}",
s.SessionID, s.Count, s.OrderID, s.Data, s.Timeout)
}
173 changes: 173 additions & 0 deletions coordination/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package options

import (
"math"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Coordination"
)

// WithDescription returns an OpenSessionOption that specifies a user-defined description that may be used to describe
// the client.
func WithDescription(description string) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.Description = description
}
}

// WithSessionTimeout returns an OpenSessionOption that specifies the timeout during which client may restore a
// detached session. The client is forced to terminate the session if the last successful session request occurred
// earlier than this time.
//
// If this is not set, the client uses the default 5 seconds.
func WithSessionTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionTimeout = timeout
}
}

// WithSessionStartTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
// response to the StartSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStartTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionStopTimeout returns an OpenSessionOption that specifies the time that the client should wait for a
// response to the StopSession request from the server before it terminates the gRPC stream and tries to reconnect.
//
// If this is not set, the client uses the default time 1 second.
func WithSessionStopTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionStartTimeout = timeout
}
}

// WithSessionKeepAliveTimeout returns an OpenSessionOption that specifies the time that the client will wait before it
// terminates the gRPC stream and tries to reconnect if no successful responses have been received from the server.
//
// If this is not set, the client uses the default time 10 seconds.
func WithSessionKeepAliveTimeout(timeout time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionKeepAliveTimeout = timeout
}
}

// WithSessionReconnectDelay returns an OpenSessionOption that specifies the time that the client will wait before it
// tries to reconnect the underlying gRPC stream in case of error.
//
// If this is not set, the client uses the default time 500 milliseconds.
func WithSessionReconnectDelay(delay time.Duration) OpenSessionOption {
return func(c *OpenSessionOptions) {
c.SessionReconnectDelay = delay
}
}

// OpenSessionOption configures how we open a new session.
type OpenSessionOption func(c *OpenSessionOptions)

// OpenSessionOptions configure an OpenSession call. OpenSessionOptions are set by the OpenSessionOption values passed
// to the OpenSession function.
type OpenSessionOptions struct {
Description string
SessionTimeout time.Duration
SessionStartTimeout time.Duration
SessionStopTimeout time.Duration
SessionKeepAliveTimeout time.Duration
SessionReconnectDelay time.Duration
}

// WithEphemeral returns an AcquireSemaphoreOption that causes to create an ephemeral semaphore.
//
// Ephemeral semaphores are created with the first acquire operation and automatically deleted with the last release
// operation. Ephemeral semaphore are always created with the limit of coordination.MaxSemaphoreLimit.
func WithEphemeral(ephemeral bool) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.Ephemeral = ephemeral
}
}

// WithAcquireTimeout returns an AcquireSemaphoreOption which sets the timeout after which the operation fails if it
// is still waiting in the queue. Use 0 to make the AcquireSemaphore method fail immediately if the semaphore is already
// acquired by another session.
//
// If this is not set, the client waits for the acquire operation result until the operation or session context is done.
// You can reset the default value of this timeout by calling the WithNoAcquireTimeout method.
func WithAcquireTimeout(timeout time.Duration) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.TimeoutMillis = uint64(timeout.Milliseconds())
}
}

// WithNoAcquireTimeout returns an AcquireSemaphoreOption which disables the timeout after which the operation fails if
// it is still waiting in the queue.
//
// This is the default behavior. You can set the specific timeout by calling the WithAcquireTimeout method.
func WithNoAcquireTimeout() AcquireSemaphoreOption {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NoTimeout can mean two things:

  1. Wait infinite time
  2. No wait and return immediately

What about rename the method to WithAquireInfiniteTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it not useful anymore since there is the MaxTimeout constant in the coordination package.

return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.TimeoutMillis = math.MaxUint64
}
}

// WithAcquireData returns an AcquireSemaphoreOption which attaches user-defined data to the operation.
func WithAcquireData(data []byte) AcquireSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore) {
c.Data = data
}
}

// AcquireSemaphoreOption configures how we acquire a semaphore.
type AcquireSemaphoreOption func(c *Ydb_Coordination.SessionRequest_AcquireSemaphore)

// WithForceDelete return a DeleteSemaphoreOption which allows to delete a semaphore even if it is currently acquired
// by other sessions.
func WithForceDelete(force bool) DeleteSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore) {
c.Force = force
}
}

// DeleteSemaphoreOption configures how we delete a semaphore.
type DeleteSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DeleteSemaphore)

// WithCreateData return a CreateSemaphoreOption which attaches user-defined data to the semaphore.
func WithCreateData(data []byte) CreateSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_CreateSemaphore) {
c.Data = data
}
}

// CreateSemaphoreOption configures how we create a semaphore.
type CreateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_CreateSemaphore)

// WithUpdateData return a UpdateSemaphoreOption which changes user-defined data in the semaphore.
func WithUpdateData(data []byte) UpdateSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore) {
c.Data = data
}
}

// UpdateSemaphoreOption configures how we update a semaphore.
type UpdateSemaphoreOption func(c *Ydb_Coordination.SessionRequest_UpdateSemaphore)

// WithDescribeOwners return a DescribeSemaphoreOption which causes server send the list of owners in the response
// to the DescribeSemaphore request.
func WithDescribeOwners(describeOwners bool) DescribeSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) {
c.IncludeOwners = describeOwners
}
}

// WithDescribeWaiters return a DescribeSemaphoreOption which causes server send the list of waiters in the response
// to the DescribeSemaphore request.
func WithDescribeWaiters(describeWaiters bool) DescribeSemaphoreOption {
return func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore) {
c.IncludeWaiters = describeWaiters
}
}

// DescribeSemaphoreOption configures how we update a semaphore.
type DescribeSemaphoreOption func(c *Ydb_Coordination.SessionRequest_DescribeSemaphore)
Loading