Introduce xDS module and SotW stream (line#5342)
Motivation:

We would like to introduce an xDS integration module.
In order to fetch resources from the xDS module, I would like to introduce a
`Stream` object which represents a single persistent connection to a
remote control plane server. The `Stream` will query a
`SubscriberStorage` to check which resources are subscribed and update
the request accordingly.

Once a response is received, the type will be used to fetch a
`ResourceParser` which will later parse the response. Afterwards, the
parameters will be sent over to a `ResponseHandler` which will handle
the response and send either an `ackResponse` or `nackResponse` to keep
the feedback loop open. Also, a `resourcesUpdated` can be sent to
indicate the subscribed resources have been changed.

Modifications:

- Introduced a `SotwXdsStream` which represents a persistent connection
to a control plane server
- Introduced a `ResourceParser` which will be responsible for parsing a
response in a format readily sendable in a nack response
- Introduced an `XdsStreamSubscriber` and `SubscriberStorage` which
represent the currently subscribed resources

Result:

- A basic xDS stream implementation is added
- Step 1 of line#5336 is merged
jrhee17 authored Dec 27, 2023
1 parent d328e27 commit 02cbc34
Showing 25 changed files with 1,406 additions and 0 deletions.
11 changes: 11 additions & 0 deletions dependencies.toml
@@ -16,6 +16,7 @@ caffeine = "2.9.3"
cglib = "3.3.0"
checkerframework = "2.5.5"
checkstyle = "10.3.2"
controlplane = "1.0.41"
curator = "5.5.0"
dagger = "2.48.1"
dgs = "7.6.0"
@@ -277,6 +278,16 @@ version.ref = "checkerframework"
module = "com.puppycrawl.tools:checkstyle"
version.ref = "checkstyle"

[libraries.controlplane-api]
module = "io.envoyproxy.controlplane:api"
version.ref = "controlplane"
[libraries.controlplane-cache]
module = "io.envoyproxy.controlplane:cache"
version.ref = "controlplane"
[libraries.controlplane-server]
module = "io.envoyproxy.controlplane:server"
version.ref = "controlplane"

# Ensure that we use the same ZooKeeper version as what Curator depends on.
# See: https://github.com/apache/curator/blob/master/pom.xml
# (Switch to the right tag to find out the right version.)
1 change: 1 addition & 0 deletions settings.gradle
@@ -170,6 +170,7 @@ project(':thrift0.18').projectDir = file('thrift/thrift0.18')
includeWithFlags ':tomcat8', 'java', 'publish', 'relocate', 'no_aggregation'
includeWithFlags ':tomcat9', 'java', 'publish', 'relocate', 'no_aggregation'
includeWithFlags ':tomcat10', 'java11', 'publish', 'relocate'
includeWithFlags ':xds', 'java', 'relocate'
includeWithFlags ':zookeeper3', 'java', 'publish', 'relocate', 'native'
includeWithFlags ':saml', 'java', 'publish', 'relocate', 'native'
includeWithFlags ':bucket4j', 'java', 'publish', 'relocate', 'native'
1 change: 1 addition & 0 deletions testing-internal/src/main/resources/logback-test.xml
@@ -36,6 +36,7 @@
<logger name="com.linecorp.armeria.internal.common.Http2GoAwayHandler" level="INFO" />
<logger name="io.netty.resolver.dns.TraceDnsQueryLifeCycleObserverFactory" level="DEBUG" />
<logger name="reactor" level="INFO" />
<logger name="io.envoyproxy.controlplane.cache" level="DEBUG" />
<logger name="example.armeria" level="DEBUG" />
<logger name="loggerTest" level="ALL" additivity="false">
<appender-ref ref="NOP" />
7 changes: 7 additions & 0 deletions xds/build.gradle
@@ -0,0 +1,7 @@
dependencies {
    api project(':grpc')
    api libs.controlplane.api

    testImplementation libs.controlplane.server
    testImplementation libs.controlplane.cache
}
183 changes: 183 additions & 0 deletions xds/docs/DEVELOPER.md
@@ -0,0 +1,183 @@
xDS Integration
----

## Abstract

This document describes the design of Armeria's `XdsBootstrap`, which is the
main entry point for xDS operations. The intention is to document the initial design
of this component, rather than to constrain how the API should evolve.

## Background

We have received many requests for more dynamic control over Armeria components.
An example of such a use-case was to dynamically modify `RetryingClient` or
`CircuitBreakerClient` behavior. Another often sought feature
has been finer Service Discovery, where users would like to dynamically route requests
depending on parameters. For seamless integration with other ecosystems, it has been
proposed that the xDS protocol be implemented.

In order to integrate xDS with Armeria components, it is important to first fetch and parse
remote xDS resources. This document introduces an `XdsBootstrap` which is responsible for doing so.

## Implementation

### Terminology

- **Subscribe**: Refers to opening a connection to a remote control plane server for a resource.
- **Watch**: Refers to registering a callback for a resource of interest.
- **Resource**: The xDS resources (`Listener`, `Cluster`, ...).
  - For the purposes of this document, `Endpoint` and `Route` are used instead of the xDS resource names
    (i.e. `ClusterLoadAssignment`, `RouteConfiguration`) for brevity.
- **ConfigSource**: Configuration of how xDS resources are fetched.
- **Stream**: Represents a persistent gRPC stream to a control plane server.

### The XdsBootstrap API

The `XdsBootstrap` is the entry point which stores an xDS `Bootstrap` and retrieves xDS resources.
A `Bootstrap` contains information about where to initially fetch resources from.
New watches will use the control plane server specified.

Conceptually, an xDS client can perform two operations:
- Subscribe to a resource.
- Watch a resource.

Subscribing to a resource signifies that the `XdsBootstrap` will start subscribing to a remote
control plane for the requested resource. If the resource has not been subscribed to before, this
may involve creating a new connection to the remote control plane to fetch remote data.
It is important that subscriptions are closed so that `XdsBootstrap` doesn't leak resources.

Watching a resource means registering a callback with the `XdsBootstrap`. Once an `XdsBootstrap`
receives a resource, it will notify the registered watchers of the event.

The above operations are distinct because users may not always want to
subscribe to a resource. For instance, users may want to query a resource defined
in the `Bootstrap`'s `static_resources` section.

To prove by contradiction, assume that only a single `XdsBootstrap.watch` API were defined.
If we are interested in watching a cluster and its endpoints, we can write the following:
```
XdsBootstrap.watch(CLUSTER, "my-cluster", clusterWatcher);
XdsBootstrap.watch(ENDPOINT, "my-cluster", endpointWatcher);
```

For the first call, which watches a `CLUSTER`, we would like to query the remote control plane and fetch the
remote resources. However, for the second call, which watches an `ENDPOINT`, we probably wouldn't want to open
a new connection, since the `ENDPOINT` is already fetched as a result of the first call.
Even if we do query for the endpoint, there is no guarantee that the endpoint
exists since it might not be queryable via EDS.
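
The distinction can be illustrated with a minimal sketch. It is not the actual `XdsBootstrap` implementation: `SketchBootstrap`, its string-keyed maps, and the `onResource` method are hypothetical names chosen for illustration. The point is only that `subscribe` is the sole operation that would contact a control plane, while `watch` merely registers a callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for XdsBootstrap; not the actual Armeria API.
final class SketchBootstrap {
    // "type/name" -> callbacks registered for that resource.
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();
    // "type/name" keys that were explicitly subscribed (these would open a remote stream).
    private final List<String> remoteSubscriptions = new ArrayList<>();

    // Subscribing is the only operation that would contact the control plane.
    void subscribe(String type, String name) {
        remoteSubscriptions.add(type + '/' + name);
    }

    // Watching only registers a callback; it never opens a connection.
    void watch(String type, String name, Consumer<String> watcher) {
        watchers.computeIfAbsent(type + '/' + name, k -> new ArrayList<>()).add(watcher);
    }

    // Invoked when a resource arrives, either directly or as a side effect of another fetch.
    void onResource(String type, String name, String value) {
        List<Consumer<String>> registered =
                watchers.getOrDefault(type + '/' + name, Collections.emptyList());
        for (Consumer<String> watcher : registered) {
            watcher.accept(value);
        }
    }

    int remoteSubscriptionCount() {
        return remoteSubscriptions.size();
    }
}
```

With this split, watching `ENDPOINT` after subscribing to `CLUSTER` adds a callback without adding a second remote subscription.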

All watch callbacks are guaranteed to be invoked from a single event loop.

### Resources

Although xDS defines many different types of resources, only the basic
`Listener`, `Route`, `Cluster`, `Endpoint` will be supported. Because `XdsBootstrap`
is not aware of which features it would like to support, it will

### ConfigSource

`ConfigSource` contains information on where a resource may be fetched from.
The `Bootstrap` contains `ConfigSource`s which are used when `XdsBootstrap.subscribe`
is called.

Additionally, each subscribed resource may contain a config source. For instance,
a `Cluster` may contain an EDS `ConfigSource`. For this reason, a `ConfigSource`
may also be supplied when subscribing to a resource.

`XdsBootstrap` must maintain connections to different remote control planes depending
on the `ConfigSource`. To avoid opening a connection for every `XdsBootstrap.subscribe` call,
a map of `ConfigSource` to clients (called `ConfigSourceClient`) is maintained.
Once `XdsBootstrap.subscribe` is called, the appropriate `ConfigSourceClient` is fetched
and `ConfigSourceClient.subscribe` is called.
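
A minimal sketch of this lookup, under the assumption that a `ConfigSource` can be reduced to a string key: `ConfigSourceClientSketch` and `ConfigSourceRegistrySketch` are hypothetical names, and the real `ConfigSourceClient` would own a gRPC stream rather than a counter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigSourceClient; it only counts subscribe calls.
final class ConfigSourceClientSketch {
    final String configSourceKey;
    int subscribeCalls;

    ConfigSourceClientSketch(String configSourceKey) {
        this.configSourceKey = configSourceKey;
    }

    void subscribe(String type, String resourceName) {
        subscribeCalls++;
    }
}

// Hypothetical registry that reuses one client per ConfigSource.
final class ConfigSourceRegistrySketch {
    // Assumption: the ConfigSource is represented by a string key for brevity.
    private final Map<String, ConfigSourceClientSketch> clients = new HashMap<>();

    ConfigSourceClientSketch subscribe(String configSourceKey, String type, String resourceName) {
        // Create the client only the first time this ConfigSource is seen.
        ConfigSourceClientSketch client =
                clients.computeIfAbsent(configSourceKey, ConfigSourceClientSketch::new);
        client.subscribe(type, resourceName);
        return client;
    }

    int clientCount() {
        return clients.size();
    }
}
```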

### XdsStream

Each `ConfigSourceClient` maintains a single persistent connection (or a `Stream`) to the remote
control plane server. Because a `Stream` can support different `XdsBootstrap.subscribe(type, resourceName)`
calls, a single `Stream` may subscribe to multiple xDS types and resources.
In order to represent the types and resources being subscribed, each `ConfigSourceClient` maintains
its own map of `Subscriber`s. Conceptually, each `(type, resource)` is mapped to a `Subscriber`.

Once a `ConfigSourceClient` is subscribed to, a new `Stream` is created.
Conversely, once a `Stream` is fully unsubscribed, it is no longer necessary and is cleaned up.
Since `subscribe` may be called multiple times for the same type and resource, each `Subscriber` holds
a reference count.
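
The reference counting described above can be sketched as follows. `RefCountedSubscribersSketch` is a hypothetical name, and a boolean flag stands in for the real `Stream`, which would be a gRPC stream that is opened and closed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one reference count per (type, resource); a flag stands in for the Stream.
final class RefCountedSubscribersSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private boolean streamOpen;

    void subscribe(String type, String resource) {
        // Increment the count, starting at 1 for a new (type, resource) pair.
        refCounts.merge(type + '/' + resource, 1, Integer::sum);
        // The first subscription creates the stream; later ones reuse it.
        streamOpen = true;
    }

    void unsubscribe(String type, String resource) {
        String key = type + '/' + resource;
        Integer count = refCounts.get(key);
        if (count == null) {
            return; // Nothing to unsubscribe.
        }
        if (count == 1) {
            refCounts.remove(key);
        } else {
            refCounts.put(key, count - 1);
        }
        // Once no subscriber remains, the stream is no longer needed.
        if (refCounts.isEmpty()) {
            streamOpen = false;
        }
    }

    boolean isStreamOpen() {
        return streamOpen;
    }

    int subscriberCount() {
        return refCounts.size();
    }
}
```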

Whenever a subscribed resource is updated in the remote control plane server, a notification is
sent to the client over the `Stream`. The `Stream` then parses the resources and passes the
data to the `Subscriber`. Eventually, the subscriber will notify the subscribed `Watcher`s of this resource.

### Watchers

`Watcher`s can be registered via `XdsBootstrap.addClusterWatcher(type, resource, watcher)`.
The `XdsBootstrap` contains a `WatcherStorage` which stores these `Watcher`s.
The `WatcherStorage` contains a `(type, resource)` to `Watcher[]` map.

### Automatic Resource Fetching

Each xDS resource may contain a `ConfigSource` which indicates how to fetch another resource.
For instance, a `Cluster` may contain an `EDS` configuration indicating how `Endpoint`s should
be fetched.

The purpose of the `XdsBootstrap` is to fetch all remote resources and to allow other components
to query these resources. For this reason, it makes sense that if a fetched resource contains
a `ConfigSource`, the next resource is also fetched.

As an example, we can see that if a `Listener` is fetched the next resources are also fetched
in a tree-like structure. Because each `Resource` acts like a tree node conceptually, the stored
`Resource`s are named `ResourceNode`s. `Resource`s that watch other nodes are called `DynamicResource`s,
whereas `Resource`s that do not watch other nodes are called `StaticResource`s.

![resource_tree](resources/resource_tree.png)

Consequently, each `Resource` may invoke a new `XdsBootstrap.subscribe` and is itself a potential
`Watcher`, so it implements the `Watcher` interface.
For practical purposes, fetched `Resource`s are also stored in the `WatcherStorage`.
The tree may still contain multiple `Resource`s with the same type and name.
For this reason, `WatcherStorage` contains a `(type, resource)` to `ResourceNode[]` map.
The `ResourceNode[]` is stored in order of insertion.

### Duplicate ResourceNodes

Once a `Stream` receives a `Resource`, the `Subscriber` stores the `Resource` into the `WatcherStorage`
as a `ResourceNode`. However, it is possible that multiple different `ResourceNode`s are registered
for the same `(type, resource)` in different control plane servers.

For instance, assume that `ControlPlaneA` contains `ClusterA` pointing to `MyEndpoint`, and `ControlPlaneB`
contains `ClusterB` pointing to `MyEndpoint`. Although both endpoints are named `MyEndpoint`,
the contents of each `Endpoint` may be different.

![duplicate_resource_nodes](resources/duplicate_resource_nodes.png)

When an update occurs, listeners are notified of the `ResourceNode` that was stored the longest.
The rationale behind this decision was to avoid frequent updates to listeners.
Each time a `Resource` is updated, the `ResourceNode`s are looped over to find a non-null `Resource`.
In order to avoid potentially duplicate notifications for the same resource, a `CompositeWatcher`
caches the last notified value.
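
The selection and caching rule can be sketched as follows. This is an illustration under simplifying assumptions, not the actual `CompositeWatcher`: resources are plain strings, and `ResourceNodeSketch` and `CompositeWatcherSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical node; the resource stays null until it has been fetched.
final class ResourceNodeSketch {
    String resource;
}

// Hypothetical sketch of the "oldest non-null node wins" rule with a last-value cache.
final class CompositeWatcherSketch {
    private final List<ResourceNodeSketch> nodes = new ArrayList<>(); // insertion order
    private final Consumer<String> listener;
    private String lastNotified;

    CompositeWatcherSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    ResourceNodeSketch addNode() {
        ResourceNodeSketch node = new ResourceNodeSketch();
        nodes.add(node);
        return node;
    }

    void update(ResourceNodeSketch node, String resource) {
        node.resource = resource;
        // Loop over the nodes in insertion order and pick the first non-null resource.
        for (ResourceNodeSketch candidate : nodes) {
            if (candidate.resource != null) {
                // Skip the notification if the chosen value was already delivered.
                if (!candidate.resource.equals(lastNotified)) {
                    lastNotified = candidate.resource;
                    listener.accept(candidate.resource);
                }
                return;
            }
        }
    }
}
```

In this sketch, an update to a newer node does not reach the listener while an older node still holds a resource, which mirrors the stated goal of avoiding frequent updates.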

## Error Handling

`Stream` tries to notify the control plane server of any parsing errors
so that the control plane implementation is not left in the dark about
failures. This is preferably done by sending a nack response instead of resetting the connection,
as it requires fewer resources. When a response is first received, all validation is done up front to
decide whether the response should be acked or nacked. After validation, the code path
downstream is not expected to throw an exception. If an exception is thrown anyway, it is logged.
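
The validate-then-decide flow can be sketched as below. The class names and the validation rule (a non-empty resource name) are assumptions for illustration; the real code works with protobuf `DiscoveryRequest` and `DiscoveryResponse` messages. The sketch follows the xDS convention that a nack echoes the response nonce, keeps the last accepted version, and carries an error detail.

```java
import java.util.List;

// Hypothetical reply; errorDetail is null for an ack and non-null for a nack.
final class DiscoveryReplySketch {
    final String versionInfo;
    final String responseNonce;
    final String errorDetail;

    DiscoveryReplySketch(String versionInfo, String responseNonce, String errorDetail) {
        this.versionInfo = versionInfo;
        this.responseNonce = responseNonce;
        this.errorDetail = errorDetail;
    }

    boolean isAck() {
        return errorDetail == null;
    }
}

// Hypothetical handler that validates the whole response before acking or nacking.
final class AckNackSketch {
    private String lastAcceptedVersion = "";

    DiscoveryReplySketch onResponse(String version, String nonce, List<String> resourceNames) {
        // Validate every resource up front; nothing is applied until all of them pass.
        for (String name : resourceNames) {
            if (name == null || name.isEmpty()) {
                // Nack: keep the last accepted version and report the error.
                return new DiscoveryReplySketch(lastAcceptedVersion, nonce,
                                                "resource name is missing");
            }
        }
        // Ack: record the new version and echo it back together with the nonce.
        lastAcceptedVersion = version;
        return new DiscoveryReplySketch(version, nonce, null);
    }
}
```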

For each resource, there are features that haven't been implemented yet.
The current implementation only validates features that are partially supported and ignores
features that aren't supported. For instance, currently `PathConfigSource` is not supported but
`ApiConfigSource` is supported. If a user tries to pass a `ConfigSource` with `PathConfigSource`,
an error is returned. On the other hand, `Filter`s currently aren't implemented at all,
and thus are ignored.

## Future Work

- Many of the `Bootstrap` parameters which specify how to connect to the control plane server
such as `path_config_source`, `tls`, etc. are not supported.
- Provide better visibility into fetched objects for easier debugging.
- The xDS V3 protobuf API is exposed directly to the user. This may cause unexpected behavior because
protobuf objects are by default mutable.
- Provide a way to return a snapshot of an update. For instance, if a listener is updated, a watcher
could be notified after all resources the listener fetched are aggregated. Similarly, if a `Cluster`
is modified, a callback could be invoked once the attached `Endpoint`s are also fetched completely.
17 changes: 17 additions & 0 deletions xds/docs/resources/duplicate_resource_nodes.plantuml
@@ -0,0 +1,17 @@
@startuml resourceTree

object ControlPlaneA
object ControlPlaneB
object ClusterA
object ClusterB
object MyEndpoint
MyEndpoint : host = "hostA:80"
MyEndpoint : host = "hostB:80"

ControlPlaneA --> ClusterA
ClusterA --> MyEndpoint

ControlPlaneB --> ClusterB
ClusterB --> MyEndpoint

@enduml
Binary file added xds/docs/resources/duplicate_resource_nodes.png
21 changes: 21 additions & 0 deletions xds/docs/resources/resource_tree.plantuml
@@ -0,0 +1,21 @@
@startuml resourceTree

object Listener1
object Listener2
object Route1
object Route2
object Cluster1
object Cluster2
object Endpoint1
object Endpoint2

Listener1 --> Route1
Listener1 --> Route2

Route1 --> Cluster1
Route1 --> Cluster2

Cluster1 --> Endpoint1
Cluster1 --> Endpoint2

@enduml
Binary file added xds/docs/resources/resource_tree.png
@@ -0,0 +1,46 @@
/*
 * Copyright 2023 LINE Corporation
 *
 * LINE Corporation licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.linecorp.armeria.xds;

import com.google.protobuf.Message;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;

final class ClusterResourceParser extends ResourceParser {

    static final ClusterResourceParser INSTANCE = new ClusterResourceParser();

    private ClusterResourceParser() {}

    @Override
    String name(Message message) {
        if (!(message instanceof Cluster)) {
            throw new IllegalArgumentException("message not type of Cluster");
        }
        return ((Cluster) message).getName();
    }

    @Override
    Class<Cluster> clazz() {
        return Cluster.class;
    }

    @Override
    XdsType type() {
        return XdsType.CLUSTER;
    }
}
@@ -0,0 +1,46 @@
/*
 * Copyright 2023 LINE Corporation
 *
 * LINE Corporation licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.linecorp.armeria.xds;

import com.google.protobuf.Message;

import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;

final class EndpointResourceParser extends ResourceParser {

    static final EndpointResourceParser INSTANCE = new EndpointResourceParser();

    private EndpointResourceParser() {}

    @Override
    String name(Message message) {
        if (!(message instanceof ClusterLoadAssignment)) {
            throw new IllegalArgumentException("message not type of ClusterLoadAssignment");
        }
        return ((ClusterLoadAssignment) message).getClusterName();
    }

    @Override
    Class<ClusterLoadAssignment> clazz() {
        return ClusterLoadAssignment.class;
    }

    @Override
    XdsType type() {
        return XdsType.ENDPOINT;
    }
}