From 2eae427cd855e1b58ed0602f581b9fb97f39a95f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 9 Oct 2024 23:32:45 +0000 Subject: [PATCH] xdsclient: ADS stream implementation --- xds/internal/xdsclient/internal/internal.go | 11 +- .../xdsclient/transport/ads/ads_stream.go | 780 ++++++++++++++++++ .../transport/grpctransport/grpctransport.go | 138 ++++ .../grpctransport/grpctransport_ext_test.go | 93 +++ .../transport/transport_interface.go | 67 ++ 5 files changed, 1088 insertions(+), 1 deletion(-) create mode 100644 xds/internal/xdsclient/transport/ads/ads_stream.go create mode 100644 xds/internal/xdsclient/transport/grpctransport/grpctransport.go create mode 100644 xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go create mode 100644 xds/internal/xdsclient/transport/transport_interface.go diff --git a/xds/internal/xdsclient/internal/internal.go b/xds/internal/xdsclient/internal/internal.go index e12610744109..6301b2b2be47 100644 --- a/xds/internal/xdsclient/internal/internal.go +++ b/xds/internal/xdsclient/internal/internal.go @@ -20,6 +20,15 @@ package internal // The following vars can be overridden by tests. var ( - // NewADSStream is a function that returns a new ADS stream. + // GRPCNewClient returns a new gRPC Client. + GRPCNewClient any // func(string, ...grpc.DialOption) (*grpc.ClientConn, error) + + // NewADSStream returns a new ADS stream. NewADSStream any // func(context.Context, *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) + + // ResourceWatchStateForTesting gets the watch state for the resource + // identified by the given resource type and resource name. Returns a + // non-nil error if there is no such resource being watched. + ResourceWatchStateForTesting any // func(xdsclient.XDSClient, xdsresource.Type, string) error + ) diff --git a/xds/internal/xdsclient/transport/ads/ads_stream.go b/xds/internal/xdsclient/transport/ads/ads_stream.go new file mode 100644 index 000000000000..f79c3b39cda8 --- /dev/null +++ b/xds/internal/xdsclient/transport/ads/ads_stream.go @@ -0,0 +1,780 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package ads provides the implementation of an ADS (Aggregated Discovery +// Service) stream for the xDS client. +package ads + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/internal/buffer" + igrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/xds/internal/xdsclient/transport" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/protobuf/types/known/anypb" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + statuspb "google.golang.org/genproto/googleapis/rpc/status" +) + +// Any per-RPC level logs which print complete request or response messages +// should be gated at this verbosity level. Other per-RPC level logs which print +// terse output should be at `INFO` and verbosity 2. +const perRPCVerbosityLevel = 9 + +// Response represents a response received on the ADS stream. It contains the +// type URL, version, and resources for the response. +type Response struct { + TypeURL string + Version string + Resources []*anypb.Any +} + +// StreamEventHandler is an interface that defines the callbacks for events that +// occur on the ADS stream. Methods on this interface may be invoked +// concurrently and implementations need to handle them in a thread-safe manner. +type StreamEventHandler interface { + OnADSStreamError(error) // Called when the ADS stream breaks. + OnADSWatchExpiry(xdsresource.Type, string) // Called when the watch timer expires for a resource. + OnADSResponse(Response, func()) ([]string, error) // Called when a response is received on the ADS stream. +} + +// WatchState is a enum that describes the watch state of a particular +// resource. +type WatchState int + +const ( + // ResourceWatchStateStarted is the state where a watch for a resource was + // started, but a request asking for that resource is yet to be sent to the + // management server. + ResourceWatchStateStarted WatchState = iota + // ResourceWatchStateRequested is the state when a request has been sent for + // the resource being watched. + ResourceWatchStateRequested + // ResourceWatchStateReceived is the state when a sesponse has been received + // for the resource being watched. + ResourceWatchStateReceived + // ResourceWatchStateTimeout is the state when the watch timer associated + // with the resource expired because no response was received. + ResourceWatchStateTimeout +) + +// ResourceWatchState is the state corresponding to a resource being watched. +type ResourceWatchState struct { + State WatchState // Watch state of the resource. + ExpiryTimer *time.Timer // Timer for the expiry of the watch. +} + +// State corresponding to a resource type. +type resourceTypeState struct { + version string // Last acked version. Should not be reset when the stream breaks. + nonce string // Last received nonce. Should be reset when the stream breaks. + bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. + subscribedResources map[string]*ResourceWatchState // Map of subscribed resource names to their state. +} + +// Stream provides the functionality associated with an ADS (Aggregated +// Discovery Service) stream on the client side. +type Stream struct { + // The following fields are initialized from arguments passed to the + // constructor and are read-only afterwards, and hence can be accessed + // without a mutex. + transport transport.Interface // Transport to use for ADS stream. + eventHandler StreamEventHandler // Callbacks into the xdsChannel. + backoff func(int) time.Duration // Backoff after stream failures. + nodeProto *v3corepb.Node // Identifies the gRPC application. + watchExpiryTimeout time.Duration // Resource watch expiry timeout + logger *igrpclog.PrefixLogger + + // The following fields are initialized in the constructor and are not + // written to afterwards, and hence can be accessed without a mutex. + streamCh chan transport.StreamingCall // New ADS streams are pushed here. + requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here. + runnerDoneCh chan struct{} // To notify exit of runner goroutine. + cancel context.CancelFunc // To cancel the context passed to the runner goroutine. + + // Guards access to the below fields. + mu sync.Mutex + resourceTypeState map[xdsresource.Type]*resourceTypeState // Map of resource types to their state. + fc *adsFlowControl // Flow control for ADS stream. + firstRequest bool // False after the first request is sent out. +} + +// StreamOpts contains the options for creating a new ADS Stream. +type StreamOpts struct { + Transport transport.Interface // xDS transport to create the stream on. + EventHandler StreamEventHandler // Callbacks for stream events. + Backoff func(int) time.Duration // Backoff after stream failures. + NodeProto *v3corepb.Node // Node proto to identify the gRPC application. + WatchExpiryTimeout time.Duration // Resource watch expiry timeout. + LogPrefix string // Prefix to be used for log messages. +} + +// NewStream initializes a new ADS Stream instance using the given parameters. +// It also launches goroutines responsible for managing reads and writes of +// messages with the underlying stream. +func NewStream(opts StreamOpts) *Stream { + s := &Stream{ + transport: opts.Transport, + eventHandler: opts.EventHandler, + backoff: opts.Backoff, + nodeProto: opts.NodeProto, + watchExpiryTimeout: opts.WatchExpiryTimeout, + + streamCh: make(chan transport.StreamingCall, 1), + requestCh: buffer.NewUnbounded(), + runnerDoneCh: make(chan struct{}), + resourceTypeState: make(map[xdsresource.Type]*resourceTypeState), + } + + l := grpclog.Component("xds") + s.logger = igrpclog.NewPrefixLogger(l, opts.LogPrefix+fmt.Sprintf("[ads-stream %p] ", s)) + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + go s.runner(ctx) + return s +} + +// Stop blocks until the stream is closed and all spawned goroutines exit. +func (s *Stream) Stop() { + s.cancel() + s.requestCh.Close() + <-s.runnerDoneCh + s.logger.Infof("Stopping ADS stream") +} + +// Subscribe subscribes to the given resource. It is assumed that multiple +// subscriptions for the same resource is deduped at the caller. A discovery +// request is sent out on the underlying stream for the resource type when there +// is sufficient flow control quota. +func (s *Stream) Subscribe(typ xdsresource.Type, name string) { + if s.logger.V(2) { + s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName()) + } + + s.mu.Lock() + defer s.mu.Unlock() + + state, ok := s.resourceTypeState[typ] + if !ok { + // An entry in the type state map is created as part of the first + // subscription request for this type. + state = &resourceTypeState{ + subscribedResources: make(map[string]*ResourceWatchState), + bufferedRequests: make(chan struct{}, 1), + } + s.resourceTypeState[typ] = state + } + + // Create state for the newly subscribed resource. The watch timer will + // be started when a request for this resource is actually sent out. + state.subscribedResources[name] = &ResourceWatchState{State: ResourceWatchStateStarted} + + // Send a request for the resource type with updated subscriptions. + s.requestCh.Put(typ) +} + +// Unsubscribe cancels the subscription to the given resource. It is a no-op if +// the given resource does not exist. The watch expiry timer associated with the +// resource is stopped if one is active. A discovery request is sent out on the +// stream for the resource type when there is sufficient flow control quota. +func (s *Stream) Unsubscribe(typ xdsresource.Type, name string) { + if s.logger.V(2) { + s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName()) + } + + s.mu.Lock() + defer s.mu.Unlock() + + state, ok := s.resourceTypeState[typ] + if !ok { + s.logger.Infof("easwars: returning early while Unsubscribing to resource %q of type %q", name, typ.TypeName()) + return + } + + rs, ok := state.subscribedResources[name] + if !ok { + s.logger.Infof("easwars: returning early while Unsubscribing to resource %q of type %q", name, typ.TypeName()) + return + } + if rs.ExpiryTimer != nil { + rs.ExpiryTimer.Stop() + } + delete(state.subscribedResources, name) + + // Send a request for the resource type with updated subscriptions. + s.requestCh.Put(typ) +} + +// runner is a long-running goroutine that handles the lifecycle of the ADS +// stream. It creates a new stream when the previous one fails, and sends +// discovery requests on the stream. It backs off before creating new streams +// following stream failures. +func (s *Stream) runner(ctx context.Context) { + defer close(s.runnerDoneCh) + + go s.send(ctx) + + runStreamWithBackoff := func() error { + stream, err := s.transport.CreateStreamingCall(ctx, "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources") + if err != nil { + s.logger.Warningf("Creating new ADS stream failed: %v", err) + s.onError(err, false) + return nil + } + if s.logger.V(2) { + s.logger.Infof("ADS stream created") + } + + s.mu.Lock() + // Flow control is a property of the underlying stream and needs to be + // initialized everytime a new stream is created. + s.fc = newADSFlowControl() + s.firstRequest = true + s.mu.Unlock() + + select { + case <-s.streamCh: + default: + } + s.streamCh <- stream + + // Backoff state is reset upon successful receipt at least one + // message from the server. + msgReceived := s.recv(ctx, stream) + if msgReceived { + return backoff.ErrResetBackoff + } + return nil + } + backoff.RunF(ctx, runStreamWithBackoff, s.backoff) +} + +// send is a long running goroutine that handles sending discovery requests for +// two scenarios: +// - a new subscription or unsubscription request is received +// - a new stream is created after the previous one failed +func (s *Stream) send(ctx context.Context) { + var stream transport.StreamingCall + for { + select { + case <-ctx.Done(): + return + case stream = <-s.streamCh: + if err := s.sendExisting(stream); err != nil { + // Send failed, clear the current stream. Attempt to resend will + // only be made after a new stream is created. + stream = nil + continue + } + case req, ok := <-s.requestCh.Get(): + s.logger.Infof("easwars: handling request in send goroutine") + if !ok { + return + } + s.requestCh.Load() + + typ := req.(xdsresource.Type) + if err := s.sendNew(stream, typ); err != nil { + stream = nil + continue + } + } + } +} + +// sendNew attempts to send a discovery request based on a new subscription or +// unsubscription. If there is no flow control quota, the request is buffered +// and will be sent later. This method also starts the watch expiry timer for +// resources that were sent in the request for the first time, i.e. their watch +// state is `watchStateStarted`. +func (s *Stream) sendNew(stream transport.StreamingCall, typ xdsresource.Type) error { + s.logger.Infof("easwars: sendNew for resource type %q", typ.TypeName()) + s.mu.Lock() + defer s.mu.Unlock() + + // Ig there's no stream yet, skip the request. This request will be resent + // when a new stream is created. If no stream is created, the watcher will + // timeout (same as server not sending response back). + if stream == nil { + return nil + } + + // If local processing of the most recently received response is not yet + // complete, i.e. fc.pending == true, queue this write and return early. + // This allows us to batch writes for requests which are generated as part + // of local processing of a received response. + state := s.resourceTypeState[typ] + if s.fc.pending.Load() { + select { + case state.bufferedRequests <- struct{}{}: + default: + } + s.logger.Infof("easwars: buffered type %q", typ.TypeName()) + return nil + } + + names := resourceNames(state.subscribedResources) + s.logger.Infof("easwars: sendNew for resource type %q, names: %v", typ.TypeName(), names) + if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + return err + + } + select { + case <-state.bufferedRequests: + default: + } + s.startWatchTimersLocked(typ, names) + return nil +} + +// sendExisting sends out discovery requests for existing resources when +// recovering from a broken stream. +// +// The stream argument is guaranteed to be non-nil. +func (s *Stream) sendExisting(stream transport.StreamingCall) error { + s.mu.Lock() + defer s.mu.Unlock() + + for typ, state := range s.resourceTypeState { + // Reset only the nonces map when the stream restarts. + // + // xDS spec says the following. See section: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-resource-type-instance-version + // + // Note that the version for a resource type is not a property of an + // individual xDS stream but rather a property of the resources + // themselves. If the stream becomes broken and the client creates a new + // stream, the client’s initial request on the new stream should + // indicate the most recent version seen by the client on the previous + // stream + state.nonce = "" + + if len(state.subscribedResources) == 0 { + continue + } + + names := resourceNames(state.subscribedResources) + if s.logger.V(2) { + s.logger.Infof("Re-requesting resources %v of type %q, as the stream has been recreated", names, typ.TypeURL()) + } + if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + return err + } + select { + case <-state.bufferedRequests: + default: + } + s.startWatchTimersLocked(typ, names) + } + return nil +} + +// sendBuffered sends out discovery requests for resources that were buffered +// (when they were subscribed to) because of lack of flow control quota. +// +// The stream argument is guaranteed to be non-nil. +func (s *Stream) sendBuffered(stream transport.StreamingCall) error { + s.mu.Lock() + defer s.mu.Unlock() + + for typ, state := range s.resourceTypeState { + /* + if len(state.subscribedResources) == 0 { + s.logger.Infof("easwars: sendBuffered returning early for resource type %q", typ.TypeName()) + continue + } + */ + + select { + case <-state.bufferedRequests: + names := resourceNames(state.subscribedResources) + if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + return err + } + s.startWatchTimersLocked(typ, names) + default: + // No buffered request. + continue + } + } + return nil +} + +// sendMessageLocked sends a discovery request to the server, populating the +// different fields of the message with the given parameters. Returns a non-nil +// error if the request could not be sent. +// +// Caller needs to hold c.mu. +func (s *Stream) sendMessageLocked(stream transport.StreamingCall, names []string, url, version, nonce string, nackErr error) error { + req := &v3discoverypb.DiscoveryRequest{ + ResourceNames: names, + TypeUrl: url, + VersionInfo: version, + ResponseNonce: nonce, + } + + // The xDS protocol only requires that we send the node proto in the first + // discovery request on every stream. Sending the node proto in every + // request wastes CPU resources on the client and the server. + if s.firstRequest { + req.Node = s.nodeProto + } + + // If this is a NACK, populate the error_detail field. + if nackErr != nil { + req.ErrorDetail = &statuspb.Status{ + Code: int32(codes.InvalidArgument), Message: nackErr.Error(), + } + } + + if err := stream.Send(req); err != nil { + s.logger.Warningf("Sending ADS request for type %q, resources: %v, version: %q, nonce: %q failed: %v", url, names, version, nonce, err) + return err + } + s.firstRequest = false + + if s.logger.V(perRPCVerbosityLevel) { + s.logger.Infof("ADS request sent: %v", pretty.ToJSON(req)) + } else { + if s.logger.V(2) { + s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce) + } + } + return nil +} + +// Return value indicates if at least one message was received from the server. +func (s *Stream) recv(ctx context.Context, stream transport.StreamingCall) bool { + msgReceived := false + for { + // Wait for ADS stream level flow control to be available. + if !s.fc.wait(ctx) { + if s.logger.V(2) { + s.logger.Infof("ADS stream context canceled") + } + return msgReceived + } + + // Send out a request if anything was buffered while we were waiting for + // local processing of the previous response to complete. + s.sendBuffered(stream) + + resources, url, version, nonce, err := s.recvMessage(stream) + if err != nil { + // Note that we do not consider it an error if the ADS stream was closed + // after having received a response on the stream. This is because there + // are legitimate reasons why the server may need to close the stream during + // normal operations, such as needing to rebalance load or the underlying + // connection hitting its max connection age limit. + // (see [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)). + if msgReceived { + err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error()) + } + s.onError(err, msgReceived) + s.logger.Warningf("ADS stream closed: %v", err) + return msgReceived + } + msgReceived = true + + // Invoke the onResponse event handler to parse the incoming message and + // decide whether to send an ACK or NACK. + resp := Response{ + Resources: resources, + TypeURL: url, + Version: version, + } + var resourceNames []string + var nackErr error + s.fc.setPending() + resourceNames, nackErr = s.eventHandler.OnADSResponse(resp, s.fc.onDone) + if xdsresource.ErrType(nackErr) == xdsresource.ErrorTypeResourceTypeUnsupported { + s.logger.Warningf("%v", nackErr) + continue + } + + s.onRecv(stream, resourceNames, url, version, nonce, nackErr) + } +} + +func (s *Stream) recvMessage(stream transport.StreamingCall) (resources []*anypb.Any, url, version, nonce string, err error) { + r, err := stream.Recv() + if err != nil { + return nil, "", "", "", err + } + resp, ok := r.(*v3discoverypb.DiscoveryResponse) + if !ok { + s.logger.Infof("Message received on ADS stream of unexpected type: %T", r) + return nil, "", "", "", fmt.Errorf("unexpected message type %T", r) + } + + if s.logger.V(perRPCVerbosityLevel) { + s.logger.Infof("ADS response received: %v", pretty.ToJSON(resp)) + } else if s.logger.V(2) { + s.logger.Infof("ADS response received for type %q, version %q, nonce %q", resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce()) + } + return resp.GetResources(), resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), nil +} + +// onRecv is invoked when a response is received from the server. The arguments +// passed to this method correspond to the most recently received response. +// +// It performs the following actions: +// - updates resource type specific state +// - updates resource specific state for resources in the response +// - sends an ACK or NACK to the server based on the response +func (s *Stream) onRecv(stream transport.StreamingCall, names []string, url, version, nonce string, nackErr error) { + s.mu.Lock() + defer s.mu.Unlock() + + // Lookup the resource type specific state based on the type URL. + var typ xdsresource.Type + for t := range s.resourceTypeState { + if t.TypeURL() == url { + typ = t + break + } + } + typeState, ok := s.resourceTypeState[typ] + if !ok { + s.logger.Warningf("ADS stream received a response for type %q, but no state exists for it", url) + return + } + + // Update the resource type specific state. This includes: + // - updating the nonce unconditionally + // - updating the version only if the response is to be ACKed + previousVersion := typeState.version + typeState.nonce = nonce + if nackErr == nil { + typeState.version = version + } + + // Update the resource specific state. For all resources received as + // part of this response that are in state `started` or `requested`, + // this includes: + // - setting the watch state to watchstateReceived + // - stopping the expiry timer, if one exists + for _, name := range names { + rs, ok := typeState.subscribedResources[name] + if !ok { + s.logger.Warningf("ADS stream received a response for resource %q, but no state exists for it", name) + continue + } + if ws := rs.State; ws == ResourceWatchStateStarted || ws == ResourceWatchStateRequested { + rs.State = ResourceWatchStateReceived + if rs.ExpiryTimer != nil { + rs.ExpiryTimer.Stop() + rs.ExpiryTimer = nil + } + } + } + + // Send an ACK or NACK. + subscribedResourceNames := resourceNames(typeState.subscribedResources) + if nackErr != nil { + s.logger.Warningf("Sending NACK for resource type: %q, version: %q, nonce: %q, reason: %v", url, version, nonce, nackErr) + s.sendMessageLocked(stream, subscribedResourceNames, url, previousVersion, nonce, nackErr) + } else { + if s.logger.V(2) { + s.logger.Infof("Sending ACK for resource type: %q, version: %q, nonce: %q", url, version, nonce) + } + s.sendMessageLocked(stream, subscribedResourceNames, url, version, nonce, nil) + } +} + +func (s *Stream) onError(err error, msgReceived bool) { + s.mu.Lock() + // For resources that been requested but not yet responded to by the + // management server, stop the resource timers and reset the watch state to + // watchStateStarted. + for _, state := range s.resourceTypeState { + for _, rs := range state.subscribedResources { + if rs.State != ResourceWatchStateRequested { + continue + } + if rs.ExpiryTimer != nil { + rs.ExpiryTimer.Stop() + rs.ExpiryTimer = nil + } + rs.State = ResourceWatchStateStarted + } + } + + // Forward the error to the channel. + if msgReceived { + err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error()) + } + s.mu.Unlock() + + s.eventHandler.OnADSStreamError(err) +} + +// Caller must hold s.mu. +func (s *Stream) startWatchTimersLocked(typ xdsresource.Type, names []string) { + typeState := s.resourceTypeState[typ] + for _, name := range names { + resourceState, ok := typeState.subscribedResources[name] + if !ok { + continue + } + if resourceState.State != ResourceWatchStateStarted { + continue + } + resourceState.State = ResourceWatchStateRequested + + rs := resourceState + resourceState.ExpiryTimer = time.AfterFunc(s.watchExpiryTimeout, func() { + s.mu.Lock() + rs.State = ResourceWatchStateTimeout + rs.ExpiryTimer = nil + s.mu.Unlock() + s.eventHandler.OnADSWatchExpiry(typ, name) + }) + } +} + +func resourceNames(m map[string]*ResourceWatchState) []string { + ret := make([]string, 0, len(m)) + for i := range m { + ret = append(ret, i) + } + return ret +} + +// TriggerResourceNotFoundForTesting triggers a resource not found event for the +// given resource type and name. This is intended for testing purposes only, to +// simulate a resource not found scenario. +func (s *Stream) TriggerResourceNotFoundForTesting(typ xdsresource.Type, resourceName string) { + s.mu.Lock() + + state, ok := s.resourceTypeState[typ] + if !ok { + s.mu.Unlock() + return + } + resourceState, ok := state.subscribedResources[resourceName] + if !ok { + s.mu.Unlock() + return + } + + if s.logger.V(2) { + s.logger.Infof("Triggering resource not found for type: %s, resource name: %s", typ.TypeName(), resourceName) + } + resourceState.State = ResourceWatchStateTimeout + if resourceState.ExpiryTimer != nil { + resourceState.ExpiryTimer.Stop() + resourceState.ExpiryTimer = nil + } + s.mu.Unlock() + go s.eventHandler.OnADSWatchExpiry(typ, resourceName) +} + +// ResourceWatchStateForTesting returns the ResourceWatchState for the given +// resource type and name. This is intended for testing purposes only, to +// inspect the internal state of the ADS stream. +func (s *Stream) ResourceWatchStateForTesting(typ xdsresource.Type, resourceName string) (ResourceWatchState, error) { + s.mu.Lock() + defer s.mu.Unlock() + + state, ok := s.resourceTypeState[typ] + if !ok { + return ResourceWatchState{}, fmt.Errorf("unknown resource type: %v", typ) + } + resourceState, ok := state.subscribedResources[resourceName] + if !ok { + return ResourceWatchState{}, fmt.Errorf("unknown resource name: %v", resourceName) + } + return *resourceState, nil +} + +// adsFlowControl implements ADS stream level flow control that enables the +// transport to block the reading of the next message off of the stream until +// the previous update is consumed by all watchers. +// +// The lifetime of the flow control is tied to the lifetime of the stream. +type adsFlowControl struct { + logger *igrpclog.PrefixLogger + + // Whether the most recent update is pending consumption by all watchers. + pending atomic.Bool + // Channel used to notify when all the watchers have consumed the most + // recent update. Wait() blocks on reading a value from this channel. + readyCh chan struct{} +} + +// newADSFlowControl returns a new adsFlowControl. +func newADSFlowControl() *adsFlowControl { + return &adsFlowControl{readyCh: make(chan struct{}, 1)} +} + +// setPending changes the internal state to indicate that there is an update +// pending consumption by all watchers. +func (fc *adsFlowControl) setPending() { + fc.pending.Store(true) +} + +// wait blocks until all the watchers have consumed the most recent update and +// returns true. If the context expires before that, it returns false. +func (fc *adsFlowControl) wait(ctx context.Context) bool { + // If there is no pending update, there is no need to block. + if !fc.pending.Load() { + // If all watchers finished processing the most recent update before the + // `recv` goroutine made the next call to `Wait()`, there would be an + // entry in the readyCh channel that needs to be drained to ensure that + // the next call to `Wait()` doesn't unblock before it actually should. + select { + case <-fc.readyCh: + default: + } + return true + } + + select { + case <-ctx.Done(): + return false + case <-fc.readyCh: + return true + } +} + +// onDone indicates that all watchers have consumed the most recent update. +func (fc *adsFlowControl) onDone() { + fc.pending.Store(false) + + select { + // Writes to the readyCh channel should not block ideally. The default + // branch here is to appease the paranoid mind. + case fc.readyCh <- struct{}{}: + default: + if fc.logger.V(2) { + fc.logger.Infof("ADS stream flow control readyCh is full") + } + } +} diff --git a/xds/internal/xdsclient/transport/grpctransport/grpctransport.go b/xds/internal/xdsclient/transport/grpctransport/grpctransport.go new file mode 100644 index 000000000000..1b39f14122b9 --- /dev/null +++ b/xds/internal/xdsclient/transport/grpctransport/grpctransport.go @@ -0,0 +1,138 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package grpctransport provides an implementation of the transport interface +// using gRPC. +package grpctransport + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/xds/internal/xdsclient/internal" + "google.golang.org/grpc/xds/internal/xdsclient/transport" + + v3adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + v3adspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + v3lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" + v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" +) + +func init() { + internal.GRPCNewClient = grpc.NewClient + internal.NewADSStream = func(ctx context.Context, cc *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { + return v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx) + } +} + +// Builder provides a way to build a gRPC-based transport to an XDS server. +type Builder struct{} + +// Build creates a new gRPC-based transport to an XDS server using the provided +// options. It establishes a gRPC client connection using the server URI and +// credentials specified in the provided options. +func (b *Builder) Build(opts transport.BuildOptions) (transport.Interface, error) { + if opts.ServerConfig == nil { + return nil, fmt.Errorf("ServerConfig field in opts cannot be nil") + } + + // NOTE: The bootstrap package ensures that the server_uri and credentials + // inside the server config are always populated. If we end up using a + // different type in BuildOptions to specify the server configuration, we + // must ensure that those fields are not empty before proceeding. + + // Dial the xDS management server with dial options specified by the server + // configuration and a static keepalive configuration that is common across + // gRPC language implementations. + kpCfg := grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 5 * time.Minute, + Timeout: 20 * time.Second, + }) + dopts := append(opts.ServerConfig.DialOptions(), kpCfg) + dialer := internal.GRPCNewClient.(func(string, ...grpc.DialOption) (*grpc.ClientConn, error)) + cc, err := dialer(opts.ServerConfig.ServerURI(), dopts...) + if err != nil { + // An error from a non-blocking dial indicates something serious. + return nil, fmt.Errorf("failed to create a grpc transport to the management server %q: %v", opts.ServerConfig.ServerURI(), err) + } + cc.Connect() + + return &grpcTransport{cc: cc}, nil +} + +type grpcTransport struct { + cc *grpc.ClientConn +} + +func (g *grpcTransport) CreateStreamingCall(ctx context.Context, method string) (transport.StreamingCall, error) { + switch method { + case v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResources_FullMethodName: + return g.newADSStreamingCall(ctx) + case v3lrsgrpc.LoadReportingService_StreamLoadStats_FullMethodName: + return g.newLRSStreamingCall(ctx) + default: + return nil, fmt.Errorf("unsupported method: %v", method) + } +} + +func (g *grpcTransport) newADSStreamingCall(ctx context.Context) (transport.StreamingCall, error) { + newStream := internal.NewADSStream.(func(context.Context, *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error)) + stream, err := newStream(ctx, g.cc) + if err != nil { + return nil, fmt.Errorf("failed to create an ADS stream: %v", err) + } + return &adsStream{stream: stream}, nil +} + +func (g *grpcTransport) newLRSStreamingCall(ctx context.Context) (transport.StreamingCall, error) { + stream, err := v3lrsgrpc.NewLoadReportingServiceClient(g.cc).StreamLoadStats(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create an LRS stream: %v", err) + } + return &lrsStream{stream: stream}, nil +} + +func (g *grpcTransport) Close() error { + return g.cc.Close() +} + +type adsStream struct { + stream v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient +} + +func (a *adsStream) Send(msg any) error { + return a.stream.Send(msg.(*v3adspb.DiscoveryRequest)) +} + +func (a *adsStream) Recv() (any, error) { + return a.stream.Recv() +} + +type lrsStream struct { + stream v3lrsgrpc.LoadReportingService_StreamLoadStatsClient +} + +func (l *lrsStream) Send(msg any) error { + return l.stream.Send(msg.(*v3lrspb.LoadStatsRequest)) +} + +func (l *lrsStream) Recv() (any, error) { + return l.stream.Recv() +} diff --git a/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go b/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go new file mode 100644 index 000000000000..23e5aae6c481 --- /dev/null +++ b/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go @@ -0,0 +1,93 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpctransport_test + +import ( + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/internal/grpctest" + internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap" + "google.golang.org/grpc/xds/internal/xdsclient/internal" + "google.golang.org/grpc/xds/internal/xdsclient/transport" + "google.golang.org/grpc/xds/internal/xdsclient/transport/grpctransport" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// Tests that the grpctransport.Builder creates a new grpc.ClientConn every time +// Build() is called. +func (s) TestBuild_CustomDialer(t *testing.T) { + // Override the dialer with a custom one. + customDialerCalled := false + origDialer := internal.GRPCNewClient + internal.GRPCNewClient = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + customDialerCalled = true + return grpc.NewClient(target, opts...) + } + defer func() { internal.GRPCNewClient = origDialer }() + + serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{URI: "server-address"}) + if err != nil { + t.Fatalf("Failed to create server config for testing: %v", err) + } + + // Create a new transport and ensure that the custom dialer was called. + opts := transport.BuildOptions{ServerConfig: serverCfg} + builder := &grpctransport.Builder{} + tr, err := builder.Build(opts) + if err != nil { + t.Fatalf("Builder.Build(%+v) failed: %v", opts, err) + } + defer tr.Close() + + if !customDialerCalled { + t.Fatalf("Builder.Build(%+v): custom dialer called = false, want true", opts) + } + customDialerCalled = false + + // Reset the dialer, create a new transport and ensure that our custom + // dialer is no longer called. + internal.GRPCNewClient = origDialer + tr, err = builder.Build(opts) + if err != nil { + t.Fatalf("Builder.Build(%+v) failed: %v", opts, err) + } + defer tr.Close() + + if customDialerCalled { + t.Fatalf("Builder.Build(%+v): custom dialer called = true, want false", opts) + } +} + +// Tests that the grpctransport.Builder fails to build a transport when the +// provided BuildOptions do not contain a ServerConfig. +func (s) TestBuild_EmptyServerConfig(t *testing.T) { + builder := &grpctransport.Builder{} + tr, err := builder.Build(transport.BuildOptions{}) + if err == nil { + tr.Close() + t.Fatalf("Builder.Build(%+v) succeeded when expected to fail", transport.BuildOptions{}) + } +} diff --git a/xds/internal/xdsclient/transport/transport_interface.go b/xds/internal/xdsclient/transport/transport_interface.go new file mode 100644 index 000000000000..9cde4452cf51 --- /dev/null +++ b/xds/internal/xdsclient/transport/transport_interface.go @@ -0,0 +1,67 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package transport defines the interface that describe the functionality +// required to communicate with an xDS server using streaming calls. +package transport + +import ( + "context" + + "google.golang.org/grpc/internal/xds/bootstrap" +) + +// Builder is an interface for building a new xDS transport. +type Builder interface { + // Build creates a new xDS transport with the provided options. + Build(opts BuildOptions) (Interface, error) +} + +// BuildOptions contains the options for building a new xDS transport. +type BuildOptions struct { + // ServerConfig contains the configuration that controls how the transport + // interacts with the XDS server. This includes the server URI and the + // credentials to use to connect to the server, among other things. + ServerConfig *bootstrap.ServerConfig +} + +// Interface provides the functionality to communicate with an XDS server using +// streaming calls. +// +// TODO(easwars): Rename this to Transport once the existing Transport type is +// removed. +type Interface interface { + // CreateStreamingCall creates a new streaming call to the XDS server for the + // specified method name. The returned StreamingCall interface can be used to + // send and receive messages on the stream. + CreateStreamingCall(context.Context, string) (StreamingCall, error) + + // Close closes the underlying connection and cleans up any resources used by the + // Transport. + Close() error +} + +// StreamingCall is an interface that provides a way to send and receive +// messages on a stream. The methods accept or return any.Any messages instead +// of concrete types to allow this interface to be used for both ADS and LRS. +type StreamingCall interface { + // Send sends the provided message on the stream. + Send(any) error + + // Recv block until the next message is received on the stream. + Recv() (any, error) +}