这个部分描述了Spring框架对于RSocket协议的支持。
RSocket 是一种应用协议,用于通过 TCP、WebSocket 和其他字节流传输协议进行多路复用、双工通信,使用以下交互模型之一:
-
请求-响应
— 发送一个消息然后等待回复。 -
请求-响应流
— 发送一条消息并接收返回的消息流。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
一旦建立了初始连接,“客户端”与“服务器”的区别就会消失,因为双方变得对称并且每一方都可以发起上述交互之一。 这就是为什么在协议中将参与方称为“请求者”和“响应者”,而将上述交互称为“请求流”或简称为“请求”。
这些是 RSocket 协议的主要特性和优点:
-
Reactive Streams 跨网络边界的语义 — 对于诸如
Request-Stream
和Channel
之类的流请求,回压信号在请求者和响应者之间传播,允许请求者在源头减慢响应者的速度,从而减少对网络层拥塞控制的依赖,以及在网络级别或任何级别进行缓冲的需要。 -
请求限制 — 此功能叫做“租赁”,任何一端都可以发送
LEASE
帧,以限制另一端在给定时间内允许的请求总数。 租约定期更新。 -
会话恢复 — 这是为失去连接而设计的,需要维护一些状态。 状态管理对应用程序是透明的,并且与回压结合使用效果很好,可以在可能的情况下停止生产者并减少所需的状态量。
-
大消息的碎片化和重组。
-
保活(心跳)。
RSocket 有多种语言的实现。 Java 库 建立在 Project Reactor之上, 使用 Reactor Netty 进行传输。 这意味着来自应用程序中的 Reactive Streams Publisher 的信号通过 RSocket 透明地在网络上传播。
RSocket 的好处之一是它在网络上具有明确定义的行为,并且易于阅读 规范 以及一些协议 扩展。因此,阅读规范是一个好主意,独立于语言实现和更高级别的框架 API。 本节提供了一个简洁的概述来建立一些上下文。
连接
最初,客户端通过一些低级流传输(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送SETUP
帧以设置连接参数。
服务器可能会拒绝SETUP
帧,但一般在发送(对于客户端)和接收(对于服务器)之后,双方就可以开始发出请求,除非SETUP
指示使用租约语义来限制请求数量,在这种情况下,双方必须等待来自另一端的LEASE
帧以允许发出请求。
发出请求
一旦建立连接,双方可以通过REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
帧之一发起请求。 这些帧中的每一个都将携带一条消息从请求者传送到响应者。
响应者可以返回带有响应消息的PAYLOAD
帧,并且在REQUEST_CHANNEL
帧的情况下,请求者也可以发送带有更多请求消息的PAYLOAD
帧。
当请求涉及诸如Request-Stream
和Channel
之类的消息流时,响应者必须尊重来自请求者的需求信号。 需求表示为多个消息。 初始需求在REQUEST_STREAM
和REQUEST_CHANNEL
帧中指定。 随后的需求通过REQUEST_N
帧发出信号。
每一端也可以通过METADATA_PUSH
帧来发送元数据通知,这些通知与任何单独的请求无关,而是与整个连接有关。
消息格式
RSocket 消息包含数据和元数据。 元数据可用于发送路由、安全令牌等。数据和元数据的格式可以不同。 每个 Mime 类型都在 SETUP
帧中声明,并应用于给定连接上的所有请求。
虽然所有消息都可以具有元数据,但通常元数据(例如路由)是针对每个请求的,因此仅包含在请求的第一条消息中,即REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
帧之一
协议扩展定义了用于应用程序的通用元数据格式:
-
Composite Metadata-- 多个独立格式化的元数据条目。
-
Routing — 请求路由。
RSocket 的 Java 实现 建立在 Project Reactor 之上。 TCP 和 WebSocket 的传输建立在 Reactor Netty 上。作为一个 Reactive Streams 库,Reactor 简化了实现协议的工作。对于应用程序,使用带有声明性运算符和透明回压支持的 Flux
和 Mono
是很自然的选择。
RSocket Java 中的 API 是有意进行最小化和基本化的。它专注于协议特性,并将应用程序编程模型(例如 RPC 代码生成与其他)作为更高级别的独立关注点。
主合约 io.rsocket.RSocket 模拟了四个请求交互类型,其中 Mono
表示单个消息的promise,Flux
是消息流,io.rsocket.Payload
是实际消息,可以访问数据和元数据作。 RSocket
合约是对称使用的。对于请求,应用程序会获得一个RSocket
来执行请求。对于响应,应用程序实现了 RSocket
来处理请求。
这部分仅仅是一个简单的介绍。大多数情况下,Spring 应用程序不必直接使用其 API。然而,查看或试验独立于 Spring 的 RSocket 可能很重要。 RSocket Java 存储库包含许多演示其 API 和协议功能的 示例应用程序。
spring-messaging
模块包括如下内容:
-
RSocketRequester - 流式API,通过带有数据和元数据编码/解码的
io.rsocket.RSocket
发出请求。 -
Annotated Responders -
@MessageMapping
注解处理方法用于响应
spring-web
模块包含 Encoder
和 Decoder
实现,例如 Jackson CBOR/JSON 和 RSocket 应用程序可能需要的 Protobuf。 它还包含可以插入以进行高效路由匹配的PathPatternParser
。
Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。 RSocketRequester.Builder
和RSocketStrategies
也有客户端支持和自动配置。 有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。
Spring Security 5.2 提供了RSocket支持。
Spring Integration 5.2 提供了入站和出站网关与 RSocket 客户端和服务器交互的相关内容。 有关更多详细信息,请参阅 Spring 集成参考手册。
Spring Cloud Gateway 支持RSocket连接。
RSocketRequester
提供了一个流式 API 来执行 RSocket 请求,接受和返回数据和元数据的对象,而不是低级别的数据缓冲区。 它可以对称地使用,从客户端发出请求和从服务器发出请求。
在客户端获得一个 RSocketRequester
就是连接到服务器,这涉及发送带有连接设置的 RSocket SETUP
帧。 RSocketRequester
提供了一个构建器来帮助准备一个 io.rsocket.core.RSocketConnector
,包括 SETUP
帧的连接设置。
这是连接默认设置的最基本方法:
Java.
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin.
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上不会立即连接。 发出请求时,会透明地建立并使用共享连接。
RSocketRequester.Builder
提供以下内容来自定义初始的 SETUP
框架:
-
dataMimeType(MimeType)
— 设置连接数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 为连接上的元数据设置 MIME 类型。 -
setupData(Object)
— 包含在SETUP
中的数据。 -
setupRoute(String, Object... )
— 元数据中的路由以包含在SETUP
中。 -
setupMetadata(Object, MimeType)
— 要包含在SETUP
中的其他元数据。
对于数据,默认 mime 类型源自第一个配置的解码器
。 对于元数据,默认的 mime 类型是 composite metadata,它允许每个请求有多个元数据值和 mime 类型对。 通常两者都不需要更改。
SETUP
帧中的数据和元数据是可选的。 在服务器端,可以使用 @ConnectMapping 方法来处理连接的开始和 SETUP
帧的内容。 元数据可用于连接级别的安全。
RSocketRequester.Builder
接受 RSocketStrategies
来配置请求者。 你需要使用它来为数据和元数据值的(反)序列化提供编码器和解码器。 默认情况下,仅注册了来自 spring-core
的用于 String
、byte[]
和 ByteBuffer
的基本编解码器。 添加 spring-web
可以访问更多可以注册的内容,如下所示:
Java.
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin.
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
是为重复使用而设计的。 在某些情况下,例如 客户端和服务器在同一个应用程序中,最好在 Spring 配置中声明它。
RSocketRequester.Builder
可用于配置响应来自服务器的请求。
你可以基于在服务器上使用相同的结构使用带注解的handler进行客户端响应,但以编程方式注册如下:
Java.
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) ①
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); ②
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) ③
.tcp("localhost", 7000);
① 如果存在spring-web
,则使用PathPatternRouteMatcher
以实现高效的路由匹配。
② 从具有@MessageMaping
和/或@ConnectMapping
方法的类创建响应者。
③ 注册响应者。
Kotlin.
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) ①
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); ②
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } ③
.tcp("localhost", 7000)
① 如果存在spring-web
,则使用PathPatternRouteMatcher
以实现高效的路由匹配。
② 从具有@MessageMaping
和/或@ConnectMapping
方法的类创建响应者。
③ 注册响应者。
请注意,以上只是为客户端响应程序的编程注册而设计的快捷方式。 对于客户端响应程序在 Spring 配置中的替代方案,你仍然可以将 RSocketMessageHandler
声明为 Spring bean,然后按如下方式使用:
Java.
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin.
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述情况,你可能还需要在 RSocketMessageHandler
中使用 setHandlerPredicate
来切换到检测客户端响应者的不同策略,例如基于自定义注解,像@RSocketClientResponder
与默认的 @Controller
。 这在客户端和服务器,或同一应用程序中的多个客户端的场景中是必要的。
有关编程模型的更多信息,另请参见 Annotated Responders。
RSocketRequesterBuilder
提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector
,以提供保活间隔、会话恢复、拦截器等的进一步配置选项。 你可以在该级别配置选项,如下所示:
Java.
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin.
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
从服务器向连接的客户端发出请求需要获取连接客户端的请求者。
在 Annotated Responders 中,@ConnectMapping
和 @MessageMapping
方法支持 RSocketRequester
参数。 使用它来访问连接的请求者。 请记住,@ConnectMapping
方法本质上是 SETUP
帧的处理程序,必须在请求开始之前处理。 因此,一开始的请求必须与处理分离。 例如:
Java.
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { ①
// ...
});
return ... ②
}
① 异步开始请求,与处理相独立
② 进行处理并返回Mono<Void>
Kotlin.
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { ①
// ...
}
}
/// ... ②
}
① 异步开始请求,与处理相独立
② 进行处理并返回Mono<Void>
一旦你有一个 client 或 server 请求者,你可以发出如下请求:
Java.
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") ①
.data(viewBox) ②
.retrieveFlux(AirportLocation.class); ③
① 指定要包含在请求消息的元数据中的路由。
② 为请求消息提供数据。
③ 声明预期的响应。
Kotlin.
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") ①
.data(viewBox) ②
.retrieveFlow<AirportLocation>() ③
① 指定要包含在请求消息的元数据中的路由。
② 为请求消息提供数据。
③ 声明预期的响应。
交互类型由输入和输出的基数隐式确定。 上面的例子是一个Request-Stream
,因为一个值被发送,一个值的流被接收。 大多数情况下,只要输入和输出的选择与 RSocket 交互类型以及响应者期望的输入和输出类型相匹配,你就不需要考虑这一点。 无效组合的唯一示例是多对一。
data(Object)
方法还接受任何 Reactive Streams Publisher
,包括 Flux
和 Mono
,以及在 ReactiveAdapterRegistry
中注册的任何其他值的生产者。 对于产生相同类型值的多值 Publisher
,例如 Flux
,考虑使用重载的 data
方法之一,以避免对每个元素进行类型检查和 Encoder
查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步骤是可选的。 对于不发送数据的请求,跳过它:
Java.
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin.
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用 composite metadata(默认值)并且这些值受注册的Encoder
支持,则可以添加额外的元数据值. 例如:
Java.
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin.
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于Fire-and-Forget
,使用返回Mono<Void>
的send()
方法。 请注意,Mono
仅表示消息已成功发送,而不表示已被处理。
对于 Metadata-Push
,使用带有 Mono<Void>
返回值的 sendMetadata()
方法。
RSocket 响应者可以实现为 @MessageMapping
和 @ConnectMapping
方法。 @MessageMapping
方法处理单个请求,而 @ConnectMapping
方法处理连接级别的事件(设置和元数据推送)。 当然也支持带注解的响应者,用于从服务器端响应和从客户端响应。
要在服务器端使用带注解的响应者,需要将 RSocketMessageHandler
添加到你的 Spring 配置中,以使用 @MessageMapping
和 @ConnectMapping
方法检测 @Controller
bean:
Java.
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
Kotlin.
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过Java RSocket API启动一个RSocket服务器,并为响应者配置RSocketMessageHandler
,如下所示:
Java.
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
Kotlin.
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持 composite 和 routing 元数据。如果你需要使用不同的 mime 类型或注册其他元数据 mime 类型,你可以设置其 MetadataExtractor。
你需要设置支持元数据和数据格式所需的 Encoder
和 Decoder
实例。你可能需要用于编解码器实现的 spring-web
模块。
默认情况下,SimpleRouteMatcher
用于通过 AntPathMatcher
匹配路由。我们建议配置来自 spring-web
的 PathPatternRouteMatcher
以实现高效的路由匹配。 RSocket 路由可以是分层的,但不是 URL 路径。两个路由匹配器在默认情况下都配置为使用“.”作为分隔符,并且没有像 HTTP URL 那样的 URL 解码。
RSocketMessageHandler
可以通过 RSocketStrategies
进行配置,如果你需要在同一进程中的客户端和服务器之间共享配置:
Java.
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
Kotlin.
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端的带注解的响应者需要在 RSocketRequester.Builder
中进行配置。 详情请参见Client Responders。
服务端server 或者客户端client 配置好之后, @MessageMapping
方法可按照如下方法使用:
Java.
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
Kotlin.
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述@MessageMapping
方法处理具有路由“locate.radars.within”的Request-Stream交互请求。 它支持灵活的方法签名,可以选择使用以下方法参数:
Method Argument | Description |
---|---|
@Payload |
请求负载。 可以是异步类型,如Mono 或者Flux *备注:*使用注解是可选的。 如果一个方法参数不是简单类型并且不是其他任何支持的参数方法, 都被假定为预期的负载。 |
RSocketRequester |
向远端发出请求的请求者。 |
@DestinationVariable |
基于映射模式中的变量从路由中提取的值,例如:@MessageMapping("find.radar.{id}") |
@Header |
为提取而注册的元数据值,详细描述:MetadataExtractor |
@Headers Map<String,Object> |
为提取而注册的元数据值,详细描述:MetadataExtractor |
预期返回值是一个或多个要序列化为响应负载的对象。 这可以是异步类型,如Mono
或Flux
,一个具体值,或者void
或无值异步类型,如Mono<Void>
。
@MessageMapping
方法支持的 RSocket 交互类型由输入(即 @Payload
参数)和输出的基数确定,其中基数的含义如下:
基数 | 描述 |
---|---|
1 | 或者是一个明确的值,或者是一个单值的异步类型,如Mono |
Many | 多值异步类型,如Flux<T> |
0 | 对于输入,这表示这个方法没有@Payload 参数。对于输出 void 或者一个不带有值的异步类型,如Mono<Void> |
下表显示了所有输入和输出基数组合以及相应的交互类型:
Input Cardinality | Output Cardinality | Interaction Types |
---|---|---|
0, 1 | 0 | Fire-and-Forget, Request-Response |
0, 1 | 1 | Request-Response |
0, 1 | Many | Request-Stream |
Many | 0, 1, Many | Request-Channel |
@ConnectMapping
处理 RSocket 连接开始时的 SETUP
帧,以及通过 METADATA_PUSH
帧的任何后续元数据推送通知,即 io.rsocket.RSocket
中的 metadataPush(Payload)
。
@ConnectMapping
方法支持与 @MessageMapping 相同的参数,但基于来自 SETUP
和 METADATA_PUSH
帧的元数据和数据。 @ConnectMapping
可以有一个模式,将处理范围缩小到在元数据中有路由的特定连接,或者如果没有声明任何模式,则所有连接都匹配。
@ConnectMapping
方法不能返回数据,必须使用 void
或 Mono<Void>
作为返回值声明。 如果新连接处理错误,则拒绝连接。 处理不能阻止向 RSocketRequester
发出连接请求。 有关详细信息,请参阅 Server Requester。
响应者必须理解元数据。 Composite metadata 允许独立格式化的元数据值(例如用于路由、安全、跟踪),每个值都有自己的 mime 类型。 应用程序需要一种方法来配置要支持的元数据 mime 类型,以及一种访问提取值的方法。
MetadataExtractor
是一个契约,用于获取序列化的元数据并返回解码的名称-值对,然后可以像首部一样按名称访问,例如通过带注释的处理程序方法中的 @Header
。
DefaultMetadataExtractor
可以被注入到 Decoder
实例来解码元数据。 它内置了对 "message/x.rsocket.routing.v0" 解码的支持,它解码为String
类型并保存在“route”键下。 对于任何其他 mime 类型,你需要提供一个 Decoder
并按如下方式注册 mime 类型:
Java.
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
Kotlin.
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据可以很好地组合独立的元数据值。 然而,请求者可能不支持复合元数据,或者可能选择不使用它。 为此,DefaultMetadataExtractor
可能需要自定义逻辑来将解码值映射到输出映射。 以下是将 JSON 用于元数据的示例:
Java.
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
Kotlin.
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
当通过 RSocketStrategies
配置 MetadataExtractor
时,你可以让 RSocketStrategies.Builder
使用配置的解码器创建提取器,只需使用回调来自定义注册,如下所示:
Java.
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
Kotlin.
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()