diff --git a/docs/zh-cn/detail-design.md b/docs/zh-cn/detail-design.md deleted file mode 100644 index d6374f3..0000000 --- a/docs/zh-cn/detail-design.md +++ /dev/null @@ -1,624 +0,0 @@ -# x-api-gateway - -## Purpose - -Build a high-performance API gateway - -构建一个高性能API网关 - -## Detailed design - -### HttpServer - -Multiple http servers are supported. default http Server is reactor-netty-http. - -支持多种http服务器,默认http服务器是 reactor-netty-http - -### HttpHandler - -为了支持多种http服务器,抽象一个`HttpHandler`用于处理http请求。不同的服务器,通过适配器进行适配即可。 - -**request & response wraper** - -为了支持多种服务器,需要对http的Request和Response进行抽象 - -- interface HttpRequest. - -```java -interface HttpRequest { - - URL getURI(); - - HttpMethod getMethod(); - - HttpHeaders getHeaders(); - - MultiValueMap getCookies(); - - Flux getBody(); - -} - -``` - -- interface HttpResponse. - -```java -interface HttpResponse { - - HttpStatus getStatus(); - - HttpHeaders getHeaders(); - - MultiValueMap getCookies(); - - Mono writeWith(Publisher body); - -} - -``` - -HttpHandler完整实现: - -``` -interface HttpHandler { - void handle(HttpRequest request, HttpResponse response); -} - -``` - -### ServerAdapter - -HttpHandler默认实现:ReactorHttpHandlerAdaptor 用于适配 reactor-netty-http服务器 - -HttpRequest默认实现: ReactorHttpRequest - -HttpResponse默认实现: ReactorHttpResponse - -### WebAPI - -在http之上进行一个抽象web层 - -- WebHandler -- ServerCodecConfigurer -- ForwardedHeaderTransformer [TODO] - -提供一个http request-response 交互契约, 包含一些额外的属性和特性。 - -ServerWebExchange - -```java -interface ServerWebExchange { - - HttpRequest getRequest(); - - HttpResponse getResponse(); - - Map getAttributes(); - -} - -``` - -WebHandler - -```java -interface WebHandler { - - Mono handle(ServerWebExchange exchange); - -} - -``` - -### WebHandlerDecorator - -WebHandlerDecorator 提供一个WebHandler的包装&代理 - -```java -public class WebHandlerDecorator implements WebHandler { - - private final WebHandler delegate; - - public WebHandlerDecorator(WebHandler delegate) { - Assert.notNull(delegate, "'delegate' must not be null"); - this.delegate = delegate; - } - - @Override - public Mono handle(ServerWebExchange exchange) { - return this.delegate.handle(exchange); - } - -} - -``` - -可以通过继承此类并重写handle方法,对某一个WebHandler进行包装,提供额外处理功能。可以查看后续的 `ExceptionHandlingWebHandler` `HttpWebHandlerAdapter` - -**HttpWebHandlerAdapter** - -通过一个Adapter,将HttpHandler和WebHandler的行为联系起来。即HttpHandler接收请求然后交由被委托的WebHandler进行处理。 - -```java -class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler { - // todo ... -} - -``` - -**ExceptionHandlingWebHandler** - -对委托的WebHandler提供Exception处理能力 - -```java -class ExceptionHandlingWebHandler extends WebHandlerDecorator { - - public ExceptionHandlingWebHandler(WebHandler delegate, List handlers) { - // todo - } - -} - -``` - -```java -public interface WebExceptionHandler { - - Mono handle(ServerWebExchange exchange, Throwable ex); - -} - -``` - -### Route - -Route由一个id,一个RoutePredicate,一组GatewayFilter以及一个Endpoint组成 - -| Route Component | | -| --- | --- | -| id | | -| RoutePredicate | 1 | -| GatewayFilter | 1..N | -| Endpoint | 1 | - -### RoutePredicate - -```java -interface RoutePredicate extends Ordered { - - Mono test(ServerWebExchange exchange); - - default RoutePredicate or(RoutePredicate other){..} - default RoutePredicate and(RoutePredicate ohter){..} - default RoutePredicate negative(){..} -} - -``` - -断言参数配置 - -```java -interface Configable { - C getConfig(); -} - -interface Config { - -} -// 基础实现 -class NameValueConfig implements Config{ - private String name; - private String value; -} - -class NameValuesConfig implements Config{ - private String name; - private Set values; -} - -``` - -```java -abstract class AbstractRoutePredicate implements RoutePredicate, Configable { - private final C config; - public AbstraceRoutePredicate (C config){ - this.config = config; - } - public C getConfig() { - return this.config; - } -} - -``` - -举例说明:HttpMethodPredicate 判断请求方法 - -```java -class HttpMethodPredicate extends AbstraceRoutePredicate { - public HttpMethodPredicate(NameValuesPredicateConfig config) { - super(config); - } - public Mono test(ServerWebExchange exchange) { - return Mono.just(exchange->{ - return getConfig().getValues().contains(exchange.getRequest().getMethod().toString()); - }); - } -} - -``` - -### RoutePredicateWebHandler - -用于断言路由 - -```java -public class RoutePredicateWebHandler extends WebHandlerDecorator { - - private RouteLocator routeLocator; - - @Override - public Mono handle(ServerWebExchange exchange) { - return lookupRoute(exchange) - .switchIfEmpty(...) - .flatMap(route -> { - exchange.setAttribte(ROUTE_KEY, route); - return Mono.just(super.handle(exchange)); - }); - } -} - -``` - -### GatewayFilter - -```java -interface GatewayFilter { - Mono filter(ServerWebExchange exchange, GatewayFilterChain chain); -} - -``` - -```java -interface GatewayFilterChain { - Mono filter(ServerWebExchange exchange); -} - -``` - -```java -class DefaultGatewayFilterChain implements GatewayFilterChain { - private List filters; - private int index = 0; - public DefaultGatewayFilterChain(List filters) { - this.filters = filters; - } - - Mono filter(ServerWebExchange exchange) { - if(index < filters.length()){ - return filters.get(index++).filter(exchange, this); - } else { - return Mono.empty(); - } - } -} - -``` - -### FilteringWebHandler - -用于触发GatewayFilter责任链 - -```java -class FilteringWebHandler implements WebHandler { - - private List globalFilters; - - Mono handle(ServerWebExchange exchange) { - Route route = exchange.getRequiredAttribute(ROUTE_KEY); - List filters = Route.getFilters(); - filters = List.combine(globalFilters, filters).sort(); - GatewayFilterChain chain = new DefaltGatewayFilterChain(filters); - return chain.filter(exchange); - } - -} - -``` - -### Endpoint - -Endpoint代表了这个路由最终调用的端点 - -```java -interface Endpoint { - Mono invoke(ServerWebExchange exchange); -} - -``` - -### Protocol & Refrence - -Endpoint默认实现由Protocol和Refrence组合而成 - -| Endpoint Component | | -| --- | --- | -| Protocol | GRPC/HTTP/DUBBO/THRIFT | -| Refrence | | - -```java -enum Protocol { - GRPC, HTTP, THRIFT, DUBBO; -} - -interface Refrence { - -} - -abstract class AbstractEndpoint { - - private final Portocol protocol; - - private final R refrence; - - public AbstractEndpoint(Protocol protocol, R refrence) { - this.protocol = protocol; - this.refrence = refrence; - } - - public R getRefrence(){ - return this.refrence; - } -} - -``` - -以http endpoint为例: - -```java -class HttpRefrence implements Refrence { - private URL url; - ... -} -class HttpEndpoint extends AbstractEndpoint{ - public HttpEndpoint(HttpRefrence refrence){ - super(Protocol.HTTP, refrence); - } - - public Mono invoke(ServerWebExchange exchange){ - .... - } -} - -``` - -### InvokeEndpointGatewayFilter - -将endpoint调用嵌入到GatewayFilter调用链中 - -```java -class InvokeEndpointGatewayFilter implements GatewayFilter { - private final Endpoint endpoint; - - public InvokeEndpointGatewayFilter(Endpoint endpoint) { - this.endpoint = endpoint; - } - - Mono filter(ServerWebExchange exchange, WebFilterChain chain) { - return endpoint.invoke(exchange).then(chain.filter(exchange)); - } -} - -``` - -## Observability - -可观测性作为一个应用系统很重要的一部分,网关在以下三个方面进行设计: - -整个可观测性的API层使用Open-Telemetry(这里有个风险:与Open-Telemetry强耦合) - -### Tracing - -支持从http请求头中提取上下文trace信息, 如果没有提取到,则开启一个新的Trace链;最终会将Trace信息传递给下游的Endpoint系统(通过http请求头/ RPC context) - -网关获取Span - -```java -class TracingWebHandler extends WebHandlerDecorator { - public TracingWebHandler(WebHandler webhandler){ - super(webhandler); - } - - @Override - public Mono handle(ServerWebExchange exchange) { - return Mono.just(()->{ - Span span = SpanHeaderExtractor.extract(exchange.getRequest().getHeaders()); - if (span == null) { - // new Trace - } else { - // new Span - } - exchange.setAttribte(SPAN_KEY, span); - }).then(super.handle(exchange)) - .finialy(()->{ - Span span = exchange.getRequiredAttribute(SPAN_KEY) - // finish span - }); - } -} - -``` - -向下游传递Span,以Http为例: - -```java -class HttpEndpoint extends AbstractEndpoint{ - public HttpEndpoint(HttpRefrence refrence){ - super(Protocol.HTTP, refrence); - } - - public Mono invoke(ServerWebExchange exchange){ - // Span信息注入,传递给下游 - SpanHeaderInjecter.inject(exchange.getRequest().getHeaders()); - // .... - } -} - -``` - -### Metric - -我们的目标是打造一个高性能网关,如何知道当前API网关的性能呢?监控就必不可少了。在整个API处理过程中,我们预设了一些指标。 - -| Metric | Type | Labels | Desc | -| --- | --- | --- | --- | -| requestTime | Histogram | route_id, http_status | 统计请求的耗时+请求数 | -| requestBodyBytes【TODO】 | Histogram | route_id | 统计请求体的大小 | -| routePredicateTime | Histogram | route_id | 统计路由断言耗时 | -| inprogressRequests | Gauge | route_id | 统计正在处理的请求数量 | -| invokeEndpointTime | Histogram | route_id | 统计调用后端endpoint的时间 | - -```java -class MetricWebHandler extends WebHandlerDecorator { - private Histogram requestTime; - private Gauge inprogressRequests; - - public MetricWebHandler(WebHandler webhandler){ - super(webhandler); - } - - @Override - public Mono handle(ServerWebExchange exchange) { - return Mono.just(()->{ - Timer timer = new Timer(); - exchange.setAttribte(REQ_TIMER, timer); - // 记录指标 - inprogressRequests.inc(); - }).then(super.handle(exchange)) - .finialy(()->{ - Timer timer = exchange.getRequiredAttribute(REQ_TIMER) - timer.end(); - // 记录指标 - requestTime.record(timer.toMills(), ...); - // 记录指标 - inprogressRequests.dec(); - }); - } -} - -``` - -```java -public class RoutePredicateWebHandler extends WebHandlerDecorator { - private Histogram routePredicateTime; - private RouteLocator routeLocator; - - @Override - public Mono handle(ServerWebExchange exchange) { - return lookupRoute(exchange) - .switchIfEmpty(...) - .flatMap(route -> { - exchange.setAttribte(ROUTE_KEY, route); - return Mono.just(super.handle(exchange)); - }); - } - - private Flux lookupRoute(ServerWebExchange exchange) { - return Flux.just(()->{ - Timer timer = new Timer(); - exchange.setAttribute(PREDICATE_TIMER, timer); - }).then(...) - .finialy(()->{ - Timer timer = exchange.getRequiredAttribute(PREDICATE_TIMER); - timer.end(); - // 记录指标 - routePredicateTime.record(timer.toMills, ...); - }); - } -} - -``` - -```java -class InvokeEndpointGatewayFilter implements GatewayFilter { - private Histogram invokeEndpointTime; - private final Endpoint endpoint; - - public InvokeEndpointGatewayFilter(Endpoint endpoint) { - this.endpoint = endpoint; - } - - Mono filter(ServerWebExchange exchange, WebFilterChain chain) { - return Mono.just(()->{ - Timer timer = new Timer(); - exchange.setAttribute(INVOKE_TIMER, timer); - }).then(() -> { - endpoint.invoke(exchange).then(chain.filter(exchange)); - }).finialy(() -> { - Timer timer = exchange.getRequiredAttribute(INVOKE_TIMER); - timer.end(); - // 记录指标 - invokeEndpointTime.record(timer.toMills(),...); - }); - } - -} - -``` - -### Logging - -定义一个AccessLog对象包含我们关注的所有日志内容 - -```java -class AccessLog { - private String traceId; - private long timestamp; - private String clientIp; - private String httpMethod; - private String requestUrl; - private String requestHeaders; - private String requestBody; - private int httpStatus; - private String responseHeaders; - private String responseBody; -} - -``` - -```java -interface AccessLogCollector { - void collect(AccessLog accessLog); -} - -``` - -```java -class LoggingWebHandler extends WebHandlerDecorator { - - public LoggingWebHandler(WebHandler webhandler){ - super(webhandler); - } - - @Override - public Mono handle(ServerWebExchange exchange) { - return Mono.just(()->{ - AccessLog accessLog = new AccessLog(); - // set log properties - ... - exchange.setAttribte(ACCESS_LOG, accesslog); - }).then(()->{ - super.handle(exchange) - }).finialy(()->{ - AccessLog accessLog = exchange.getRequiredAttribute(ACCESS_LOG) - // set log properties - ... - AccessLogCollectorManager.collect(accessLog); - }); - } -} -``` - -## Summary - -本文介绍了x-api-gateway的架构和设计,包括WebHandler、WebHandlerDecorator、HttpWebHandlerAdapter、Route、RoutePredicate、GatewayFilter、Endpoint、Protocol、Refrence等组件,并重点介绍了x-api-gateway的可观测性设计,包括Tracing、Metric和Logging。 diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrap.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrap.java index 2b8c4fb..9cb45cf 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrap.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrap.java @@ -3,8 +3,9 @@ import com.google.common.collect.Lists; import io.github.xinfra.lab.gateway.commons.Assert; import io.github.xinfra.lab.gateway.config.GatewayConfigManager; -import io.github.xinfra.lab.gateway.filter.GatewayFilter; +import io.github.xinfra.lab.gateway.filter.global.GlobalGatewayFilter; import io.github.xinfra.lab.gateway.filter.global.RoutingFilter; +import io.github.xinfra.lab.gateway.handler.DefaultWebExceptionHandler; import io.github.xinfra.lab.gateway.handler.ExceptionHandlingWebHandler; import io.github.xinfra.lab.gateway.handler.FilteringWebHandler; import io.github.xinfra.lab.gateway.handler.ReactorHttpHandler; @@ -18,7 +19,6 @@ import reactor.netty.http.server.HttpServer; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; /** @@ -27,18 +27,22 @@ public class GatewayServerBootstrap { private int port; - private List globalFilters = new ArrayList<>(); - private List webExceptionHandlers = new ArrayList<>(); + private List globalFilters; + private List webExceptionHandlers; private RouteLocator routeLocator; private String configPath; + private DisposableServer server; + + public RouteLocator getRouteLocator() { + return routeLocator; + } public GatewayServerBootstrap port(int port) { this.port = port; return this; } - - public GatewayServerBootstrap globalFilters(List globalFilters) { + public GatewayServerBootstrap globalFilters(List globalFilters) { Assert.notNull(globalFilters, "globalFilters must not be null."); this.globalFilters = globalFilters; return this; @@ -63,13 +67,16 @@ public GatewayServerBootstrap configPath(String configPath) { } - public DisposableServer start() { - + public GatewayServerBootstrap start() { if (routeLocator == null) { routeLocator = getDefaultRouteLocator(); } - - globalFilters.addAll(getDefaultGlobalFilters()); + if (globalFilters == null) { + globalFilters = getDefaultGlobalFilters(); + } + if (webExceptionHandlers == null) { + webExceptionHandlers = getDefaultWebExceptionHandlers(); + } // trigger GatewayFilterChain FilteringWebHandler filteringWebHandler = new FilteringWebHandler(globalFilters); @@ -81,17 +88,26 @@ public DisposableServer start() { ReactorHttpHandler httpHandler = new ReactorHttpHandler(exceptionHandlingWebHandler); // start server - return HttpServer.create() + this.server = HttpServer.create() .handle(httpHandler::handle) .bindAddress(() -> new InetSocketAddress(port)) .bind().block(); + + return this; } - private List getDefaultGlobalFilters() { + public GatewayServerBootstrap stop() { + this.server.disposeNow(); + return this; + } - return Lists.newArrayList(new RoutingFilter()); + protected List getDefaultWebExceptionHandlers() { + return Lists.newArrayList(new DefaultWebExceptionHandler()); } + protected List getDefaultGlobalFilters() { + return Lists.newArrayList(new RoutingFilter()); + } protected RouteLocator getDefaultRouteLocator() { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/AbstractConfigurable.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/AbstractConfigurable.java similarity index 86% rename from x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/AbstractConfigurable.java rename to x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/AbstractConfigurable.java index 1c6751d..0548209 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/AbstractConfigurable.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/AbstractConfigurable.java @@ -1,4 +1,4 @@ -package io.github.xinfra.lab.gateway.bootstrap; +package io.github.xinfra.lab.gateway.commons; public class AbstractConfigurable implements Configurable { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/Configurable.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/Configurable.java similarity index 59% rename from x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/Configurable.java rename to x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/Configurable.java index 5ff228e..888a876 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/bootstrap/Configurable.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/commons/Configurable.java @@ -1,4 +1,4 @@ -package io.github.xinfra.lab.gateway.bootstrap; +package io.github.xinfra.lab.gateway.commons; public interface Configurable { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/Endpoint.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/Endpoint.java index 8616ad6..64af6a1 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/Endpoint.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/Endpoint.java @@ -3,6 +3,8 @@ import io.github.xinfra.lab.gateway.server.ServerWebExchange; import reactor.core.publisher.Mono; -public interface Endpoint { +public interface Endpoint { + + T getConfig(); Mono invoke(ServerWebExchange exchange); } diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/EndpointFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/EndpointFactory.java index a885f62..aff3b0c 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/EndpointFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/EndpointFactory.java @@ -1,6 +1,6 @@ package io.github.xinfra.lab.gateway.endpoint; -import io.github.xinfra.lab.gateway.bootstrap.Configurable; +import io.github.xinfra.lab.gateway.commons.Configurable; public interface EndpointFactory extends Configurable { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpoint.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpoint.java new file mode 100644 index 0000000..8acbdb3 --- /dev/null +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpoint.java @@ -0,0 +1,65 @@ +package io.github.xinfra.lab.gateway.endpoint; + +import io.github.xinfra.lab.gateway.server.ServerWebExchange; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +import java.net.URI; +import java.util.List; + +import static io.github.xinfra.lab.gateway.commons.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR; +import static io.github.xinfra.lab.gateway.commons.ServerWebExchangeUtils.markResponseCommitted; + +@Slf4j +public class HttpEndpoint implements Endpoint { + + @Data + public static class Config { + List urls; + List weights; + } + + private Config config; + + public HttpEndpoint(Config config) { + this.config = config; + } + + @Override + public Config getConfig() { + return this.config; + } + + @Override + public Mono invoke(ServerWebExchange exchange) { + + + HttpServerRequest request = exchange.getRequest(); + HttpServerResponse response = exchange.getResponse(); + + return HttpClient.create() + .headers(headers -> { + headers.add(request.requestHeaders()); + }) + .request(request.method()) + .uri(URI.create(config.getUrls().get(0))) // TODO Chooser + .send(request.receive()) + .response((httpClientResponse, byteBufFlux) -> + response.status(httpClientResponse.status()) + .headers(httpClientResponse.responseHeaders()) + .sendByteArray(byteBufFlux.asByteArray()) + ) + .subscribeOn(Schedulers.boundedElastic()) + .doOnError(t -> { + log.error("fail invoke http endpoint:{}", + exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR), t); + }).doOnNext(v -> { + markResponseCommitted(exchange); + }).then(); + } +} diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointFactory.java index c5ab06c..95830ce 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointFactory.java @@ -1,66 +1,27 @@ package io.github.xinfra.lab.gateway.endpoint; -import io.github.xinfra.lab.gateway.bootstrap.AbstractConfigurable; -import lombok.Data; +import io.github.xinfra.lab.gateway.commons.AbstractConfigurable; import lombok.extern.slf4j.Slf4j; -import reactor.core.scheduler.Schedulers; -import reactor.netty.http.client.HttpClient; -import reactor.netty.http.server.HttpServerRequest; -import reactor.netty.http.server.HttpServerResponse; - -import java.net.URI; -import java.util.List; - -import static io.github.xinfra.lab.gateway.commons.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR; -import static io.github.xinfra.lab.gateway.commons.ServerWebExchangeUtils.markResponseCommitted; @Slf4j public class HttpEndpointFactory extends - AbstractConfigurable - implements EndpointFactory { + AbstractConfigurable + implements EndpointFactory { + public static final String NAME = "Http"; + public HttpEndpointFactory() { - super(HttpEndpointFactory.Config.class); + super(HttpEndpoint.Config.class); } @Override public String getName() { - return "Http"; + return NAME; } @Override - public Endpoint apply(Config config) { - return exchange -> { - - HttpServerRequest request = exchange.getRequest(); - HttpServerResponse response = exchange.getResponse(); - - return HttpClient.create() - .headers(headers -> { - headers.add(request.requestHeaders()); - }) - .request(request.method()) - .uri(URI.create(config.getHosts().get(0))) // TODO Chooser - .send(request.receive()) - .response((httpClientResponse, byteBufFlux) -> - response.status(httpClientResponse.status()) - .headers(httpClientResponse.responseHeaders()) - .sendByteArray(byteBufFlux.asByteArray()) - ) - .subscribeOn(Schedulers.boundedElastic()) - .doOnError(t -> { - log.error("fail invoke http endpoint:{}", - exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR), t); - }).doOnNext(v -> { - markResponseCommitted(exchange); - }).then(); - }; + public Endpoint apply(HttpEndpoint.Config config) { + return new HttpEndpoint(config); } - - @Data - public static class Config { - List hosts; - List weights; - } } diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/AddHeaderGatewayFilterFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/AddHeaderGatewayFilterFactory.java index 378e200..f05caa8 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/AddHeaderGatewayFilterFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/AddHeaderGatewayFilterFactory.java @@ -1,6 +1,6 @@ package io.github.xinfra.lab.gateway.filter; -import io.github.xinfra.lab.gateway.bootstrap.AbstractConfigurable; +import io.github.xinfra.lab.gateway.commons.AbstractConfigurable; import reactor.netty.http.server.HttpServerRequest; import java.util.Map; @@ -8,13 +8,14 @@ public class AddHeaderGatewayFilterFactory extends AbstractConfigurable> implements GatewayFilterFactory> { + public static final String NAME = "AddHeader"; public AddHeaderGatewayFilterFactory() { super((Class) Map.class); } @Override public String getName() { - return "AddHeader"; + return NAME; } @Override diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/GatewayFilterFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/GatewayFilterFactory.java index 35f360c..512bf9d 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/GatewayFilterFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/filter/GatewayFilterFactory.java @@ -1,6 +1,6 @@ package io.github.xinfra.lab.gateway.filter; -import io.github.xinfra.lab.gateway.bootstrap.Configurable; +import io.github.xinfra.lab.gateway.commons.Configurable; public interface GatewayFilterFactory extends Configurable { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/handler/FilteringWebHandler.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/handler/FilteringWebHandler.java index 528ed14..1a4e180 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/handler/FilteringWebHandler.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/handler/FilteringWebHandler.java @@ -3,6 +3,7 @@ import io.github.xinfra.lab.gateway.commons.OrderedAwareComparator; import io.github.xinfra.lab.gateway.filter.DefaultGatewayFilterChain; import io.github.xinfra.lab.gateway.filter.GatewayFilter; +import io.github.xinfra.lab.gateway.filter.global.GlobalGatewayFilter; import io.github.xinfra.lab.gateway.route.Route; import io.github.xinfra.lab.gateway.server.ServerWebExchange; import reactor.core.publisher.Mono; @@ -14,9 +15,9 @@ public class FilteringWebHandler implements WebHandler { - private List globalFilters; + private List globalFilters; - public FilteringWebHandler(List globalFilters) { + public FilteringWebHandler(List globalFilters) { this.globalFilters = globalFilters; } diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/PathRoutePredicateFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/PathRoutePredicateFactory.java index 40f4503..de243fa 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/PathRoutePredicateFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/PathRoutePredicateFactory.java @@ -1,6 +1,6 @@ package io.github.xinfra.lab.gateway.predicate; -import io.github.xinfra.lab.gateway.bootstrap.AbstractConfigurable; +import io.github.xinfra.lab.gateway.commons.AbstractConfigurable; import io.github.xinfra.lab.gateway.server.ServerWebExchange; import lombok.Data; import reactor.core.publisher.Mono; @@ -11,6 +11,7 @@ public class PathRoutePredicateFactory extends AbstractConfigurable implements RoutePredicateFactory { + public static final String NAME = "Path"; public PathRoutePredicateFactory() { super(PatternConfig.class); @@ -18,7 +19,7 @@ public PathRoutePredicateFactory() { @Override public String getName() { - return "Path"; + return NAME; } @Override diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/RoutePredicateFactory.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/RoutePredicateFactory.java index 5d22731..613d44c 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/RoutePredicateFactory.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/predicate/RoutePredicateFactory.java @@ -1,7 +1,7 @@ package io.github.xinfra.lab.gateway.predicate; -import io.github.xinfra.lab.gateway.bootstrap.Configurable; +import io.github.xinfra.lab.gateway.commons.Configurable; import io.github.xinfra.lab.gateway.server.ServerWebExchange; public interface RoutePredicateFactory extends Configurable { diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/route/RouteLocatorBuilder.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/route/RouteLocatorBuilder.java new file mode 100644 index 0000000..2b6fd36 --- /dev/null +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/route/RouteLocatorBuilder.java @@ -0,0 +1,165 @@ +package io.github.xinfra.lab.gateway.route; + + +import io.github.xinfra.lab.gateway.endpoint.Endpoint; +import io.github.xinfra.lab.gateway.endpoint.EndpointFactoryManager; +import io.github.xinfra.lab.gateway.endpoint.HttpEndpoint; +import io.github.xinfra.lab.gateway.endpoint.HttpEndpointFactory; +import io.github.xinfra.lab.gateway.filter.AddHeaderGatewayFilterFactory; +import io.github.xinfra.lab.gateway.filter.GatewayFilter; +import io.github.xinfra.lab.gateway.filter.GatewayFilterFactory; +import io.github.xinfra.lab.gateway.filter.GatewayFilterFactoryManager; +import io.github.xinfra.lab.gateway.predicate.PathRoutePredicateFactory; +import io.github.xinfra.lab.gateway.predicate.RoutePredicate; +import io.github.xinfra.lab.gateway.predicate.RoutePredicateFactoryManager; +import io.github.xinfra.lab.gateway.server.ServerWebExchange; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static io.github.xinfra.lab.gateway.route.RouteLocatorBuilder.Operator.AND; +import static io.github.xinfra.lab.gateway.route.RouteLocatorBuilder.Operator.OR; + +public class RouteLocatorBuilder { + + List routeList = new ArrayList<>(); + + public static RouteLocatorBuilder builder() { + return new RouteLocatorBuilder(); + } + + public RouteLocatorBuilder route(String id, Function func) { + routeList.add(func.apply(new RouteSpec().id(id))); + return this; + } + + public RouteLocator build() { + return () -> Flux.fromIterable(routeList); + } + + + public static class RouteSpec { + private String id; + + private RoutePredicate predicate; + private List gatewayFilterList = new ArrayList<>(); + + private Endpoint endpoint; + + RoutePredicateSpec id(String id) { + this.id = id; + return new RoutePredicateSpec(this); + } + } + + public static class RoutePredicateSpec { + private RouteSpec routeSpec; + private Operator operator; + + public RoutePredicateSpec(RouteSpec routeSpec) { + this.routeSpec = routeSpec; + } + + public RoutePredicateSpec(RouteSpec routeSpec, Operator operator) { + this.routeSpec = routeSpec; + this.operator = operator; + } + + public BooleanSpec path(String pattern) { + PathRoutePredicateFactory.PatternConfig config = new PathRoutePredicateFactory.PatternConfig(); + config.setPattern(pattern); + RoutePredicate predicate = RoutePredicateFactoryManager.INSTANCE.lookup(PathRoutePredicateFactory.NAME) + .apply(config); + if (operator != null) { + switch (operator) { + case AND: + routeSpec.predicate = routeSpec.predicate.and(predicate); + case OR: + routeSpec.predicate = routeSpec.predicate.or(predicate); + } + } else { + routeSpec.predicate = predicate; + } + return new BooleanSpec(routeSpec); + } + + } + + public static class BooleanSpec { + private RouteSpec routeSpec; + + public BooleanSpec(RouteSpec routeSpec) { + this.routeSpec = routeSpec; + } + + public RoutePredicateSpec and() { + return new RoutePredicateSpec(routeSpec, AND); + } + + public RoutePredicateSpec or() { + return new RoutePredicateSpec(routeSpec, OR); + } + + public BooleanSpec negate() { + routeSpec.predicate = routeSpec.predicate.negate(); + return this; + } + + public GatewayFilterSpec filters() { + return new GatewayFilterSpec(routeSpec); + } + } + + public static class GatewayFilterSpec { + private RouteSpec routeSpec; + + + public GatewayFilterSpec(RouteSpec routeSpec) { + this.routeSpec = routeSpec; + } + + public GatewayFilterSpec addHeader(Map headers) { + GatewayFilterFactory gatewayFilterFactory = GatewayFilterFactoryManager.INSTANCE + .lookup(AddHeaderGatewayFilterFactory.NAME); + GatewayFilter filter = gatewayFilterFactory.apply(headers); + routeSpec.gatewayFilterList.add(filter); + return this; + } + + public EndpointSpec endpoint() { + return new EndpointSpec(routeSpec); + } + } + + + public static class EndpointSpec { + private RouteSpec routeSpec; + + public EndpointSpec(RouteSpec routeSpec) { + this.routeSpec = routeSpec; + } + + public Route http(List urls, List weights) { + HttpEndpoint.Config config = new HttpEndpoint.Config(); + config.setUrls(urls); + config.setWeights(weights); + routeSpec.endpoint = EndpointFactoryManager.INSTANCE + .lookup(HttpEndpointFactory.NAME) + .apply(config); + + DefaultRoute route = new DefaultRoute(); + route.setEndpoint(routeSpec.endpoint); + route.setFilters(routeSpec.gatewayFilterList); + route.setPredicate(routeSpec.predicate); + route.setId(routeSpec.id); + return route; + } + } + + enum Operator { + AND, OR + } +} diff --git a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/serializer/Serializers.java b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/serializer/Serializers.java index 9a340cb..7959b5b 100644 --- a/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/serializer/Serializers.java +++ b/x-api-gateway-server/src/main/java/io/github/xinfra/lab/gateway/serializer/Serializers.java @@ -1,12 +1,16 @@ package io.github.xinfra.lab.gateway.serializer; + public class Serializers { - private static GsonSerializer gsonSerializer = new GsonSerializer(); + /** + * default gsonSerializer + */ + private static Serializer jsonSerializer = new GsonSerializer(); - public static GsonSerializer jsonSerializer() { - // default gsonSerializer - return gsonSerializer; + public static Serializer jsonSerializer() { + return jsonSerializer; } + } diff --git a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/BaseTest.java b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/BaseTest.java index 2c31be0..3d8304d 100644 --- a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/BaseTest.java +++ b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/BaseTest.java @@ -1,7 +1,7 @@ package io.github.xinfra.lab.gateway; import io.github.xinfra.lab.gateway.bootstrap.GatewayServerBootstrap; -import reactor.netty.DisposableServer; +import io.github.xinfra.lab.gateway.route.RouteLocator; public class BaseTest { @@ -12,11 +12,10 @@ public class BaseTest { * @param port * @return */ - public DisposableServer startGatewayServer(int port) { - DisposableServer server = new GatewayServerBootstrap() + public GatewayServerBootstrap startGatewayServer(int port) { + return new GatewayServerBootstrap() .port(port) .start(); - return server; } @@ -27,14 +26,28 @@ public DisposableServer startGatewayServer(int port) { * @param configPath * @return */ - public DisposableServer startGatewayServer(int port, - String configPath) { + public GatewayServerBootstrap startGatewayServer(int port, + String configPath) { - DisposableServer server = new GatewayServerBootstrap() + return new GatewayServerBootstrap() .port(port) .configPath(configPath) .start(); - return server; } + /** + * use custom routeLocator + * + * @param port + * @param routeLocator + * @return + */ + public GatewayServerBootstrap startGatewayServer(int port, + RouteLocator routeLocator) { + + return new GatewayServerBootstrap() + .port(port) + .routeLocator(routeLocator) + .start(); + } } diff --git a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrapTest.java b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrapTest.java index 5a86899..6c7f40c 100644 --- a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrapTest.java +++ b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/bootstrap/GatewayServerBootstrapTest.java @@ -4,15 +4,14 @@ import io.github.xinfra.lab.gateway.common.TestSocketUtils; import org.junit.Assert; import org.junit.Test; -import reactor.netty.DisposableServer; public class GatewayServerBootstrapTest extends BaseTest { @Test public void testStartGateWayServer1() { - DisposableServer server = startGatewayServer(TestSocketUtils.findAvailableTcpPort()); - Assert.assertNotNull(server); - server.disposeNow(); + GatewayServerBootstrap bootstrap = startGatewayServer(TestSocketUtils.findAvailableTcpPort()); + Assert.assertNotNull(bootstrap); + bootstrap.stop(); } } diff --git a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointTest.java b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointTest.java index 45afb7e..e3d58ed 100644 --- a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointTest.java +++ b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/endpoint/HttpEndpointTest.java @@ -1,7 +1,13 @@ package io.github.xinfra.lab.gateway.endpoint; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.github.xinfra.lab.gateway.BaseTest; +import io.github.xinfra.lab.gateway.bootstrap.GatewayServerBootstrap; import io.github.xinfra.lab.gateway.common.TestSocketUtils; +import io.github.xinfra.lab.gateway.route.Route; +import io.github.xinfra.lab.gateway.route.RouteLocator; +import io.github.xinfra.lab.gateway.route.RouteLocatorBuilder; import io.netty.handler.codec.http.HttpResponseStatus; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; @@ -12,19 +18,105 @@ import reactor.netty.http.client.HttpClientResponse; import reactor.netty.http.server.HttpServer; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; +import java.util.Map; + @Slf4j public class HttpEndpointTest extends BaseTest { @Test - public void httpEndpointTest1() { + public void httpEndpointFileConfigTest() throws MalformedURLException { + + int gatewayServerPort = TestSocketUtils.findAvailableTcpPort(); + + String path = Thread.currentThread() + .getContextClassLoader() + .getResource("httptest.yml").getPath().toString(); + + GatewayServerBootstrap bootstrap = startGatewayServer(gatewayServerPort, path); + log.info("gatewayServer started."); + + List routeList = bootstrap.getRouteLocator().getRoutes().collectList().block(); + Route route = routeList.stream() + .filter(r -> r.getId().equals("http-test")) + .findFirst() + .get(); + + HttpEndpoint endpoint = (HttpEndpoint) route.getEndpoint(); + String urlStr = endpoint.getConfig().getUrls().get(0); + URL url = new URL(urlStr); + + // start httpEndpoint + int httpServerPort = url.getPort(); + DisposableServer httpServer = HttpServer.create() + .port(httpServerPort) + .route(routes -> + routes.get("/helloworld", + (httpServerRequest, httpServerResponse) -> { + System.out.println(httpServerRequest); + return httpServerResponse.sendString(Mono.just("hello world!")); + }) + ).bind().block(); + log.info("httpServer started."); + + + HttpClientResponse response = HttpClient.create() + .get() + .uri(String.format("http://localhost:%s/helloworld", httpServerPort)) + .response() + .block(); + Assert.assertEquals(response.status(), HttpResponseStatus.OK); + log.info("httpclient request httpServer success."); + + + response = HttpClient.create() + .get() + .uri(String.format("http://localhost:%s/hello", gatewayServerPort)) + .response() + .block(); + Assert.assertEquals(response.status(), HttpResponseStatus.OK); + log.info("httpclient request gatewayServer success."); + + httpServer.disposeNow(); + bootstrap.stop(); + } + + @Test + public void httpEndpointApiConfigTest() throws MalformedURLException { + Map addHeadersMap = Maps.newHashMap(); + addHeadersMap.put("proxy", "x-api-gateway"); + + RouteLocator routeLocator = RouteLocatorBuilder.builder() + .route("http-test", + routePredicateSpec -> + routePredicateSpec.path("hello") + .filters() + .addHeader(addHeadersMap) + .endpoint() + .http(Lists.newArrayList("http://127.0.0.1:9999/helloworld", "http://localhost:9999/helloworld"), + Lists.newArrayList()) + ).build(); + int gatewayServerPort = TestSocketUtils.findAvailableTcpPort(); - DisposableServer gatewayServer = startGatewayServer(gatewayServerPort); + GatewayServerBootstrap bootstrap = startGatewayServer(gatewayServerPort, routeLocator); log.info("gatewayServer started."); - // see config.yml - int httpServerPort = 9999; + List routeList = bootstrap.getRouteLocator().getRoutes().collectList().block(); + Route route = routeList.stream() + .filter(r -> r.getId().equals("http-test")) + .findFirst() + .get(); + + HttpEndpoint endpoint = (HttpEndpoint) route.getEndpoint(); + String urlStr = endpoint.getConfig().getUrls().get(0); + URL url = new URL(urlStr); + + // start httpEndpoint + int httpServerPort = url.getPort(); DisposableServer httpServer = HttpServer.create() .port(httpServerPort) .route(routes -> @@ -55,6 +147,6 @@ public void httpEndpointTest1() { log.info("httpclient request gatewayServer success."); httpServer.disposeNow(); - gatewayServer.disposeNow(); + bootstrap.stop(); } } diff --git a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/serializer/SerializerTest.java b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/serializer/SerializerTest.java index e3a0225..9f13978 100644 --- a/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/serializer/SerializerTest.java +++ b/x-api-gateway-server/src/test/java/io/github/xinfra/lab/gateway/serializer/SerializerTest.java @@ -1,18 +1,24 @@ package io.github.xinfra.lab.gateway.serializer; -import io.github.xinfra.lab.gateway.exception.ErrorCode; +import org.junit.Assert; import org.junit.Test; -import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; public class SerializerTest { @Test - public void gsonSerializerTest(){ - GsonSerializer serializer = Serializers.jsonSerializer(); + public void jsonSerializerTest1() { + Serializer serializer = Serializers.jsonSerializer(); - serializer.serialize(ErrorCode.SYSTEM_EXCEPTION) - .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) - .doOnNext(s -> System.out.println(s)); + Map map = new HashMap<>(); + map.put("key", "value"); + + byte[] bytes = serializer.serialize(map) + .block(); + + Assert.assertEquals("{\"key\":\"value\"}", new String(bytes)); } + } diff --git a/x-api-gateway-server/src/test/resources/config.yml b/x-api-gateway-server/src/test/resources/httptest.yml similarity index 90% rename from x-api-gateway-server/src/test/resources/config.yml rename to x-api-gateway-server/src/test/resources/httptest.yml index 0e9f502..30dd5c8 100644 --- a/x-api-gateway-server/src/test/resources/config.yml +++ b/x-api-gateway-server/src/test/resources/httptest.yml @@ -1,6 +1,6 @@ routes: - - id: test + - id: http-test predicates: - name: Path config: 'pattern: hello' @@ -10,6 +10,6 @@ routes: endpoint: name: Http config: | - hosts: + urls: - http://127.0.0.1:9999/helloworld - http://localhost:9999/helloworld \ No newline at end of file