diff --git a/src/main/java/com/javagrunt/service/gateway/Application.java b/src/main/java/com/javagrunt/service/gateway/Application.java index 834a045..c74806d 100644 --- a/src/main/java/com/javagrunt/service/gateway/Application.java +++ b/src/main/java/com/javagrunt/service/gateway/Application.java @@ -1,19 +1,26 @@ package com.javagrunt.service.gateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory; +import org.springframework.cloud.gateway.route.Route; import org.springframework.cloud.gateway.route.RouteLocator; import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder; +import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.context.annotation.Bean; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR; + @SpringBootApplication public class Application { @@ -29,13 +36,18 @@ public static void main(String[] args) { @Bean public ScalerService scaler() { - return scaler; // new SimpleScalerService(); + return scaler; } @Bean RouteLocator gateway(RouteLocatorBuilder rlb) { return rlb.routes() - .route(r -> r.path("/hello-service/**").filters(f -> f.stripPrefix(1).filter(scaler)).uri(hello)) + .route(r -> r.path("/hello-service/**") + .filters(f -> f.stripPrefix(1) + .filter(scaler)) + .metadata("name", "hello-service") + .metadata("namespace", "dev-javagrunt-com") + .uri(hello)) .route(r -> r.path("/**").uri(root)) .build(); } @@ -43,7 +55,9 @@ RouteLocator gateway(RouteLocatorBuilder rlb) { class RequestCountingFilter implements GatewayFilter, ScalerService { - private final AtomicInteger count = new AtomicInteger(); + Logger logger = LoggerFactory.getLogger(RequestCountingFilter.class); + + private ConcurrentHashMap counts = new ConcurrentHashMap<>(); private volatile boolean active = true; @@ -57,30 +71,46 @@ class RequestCountingFilter implements GatewayFilter, ScalerService { } @Override - public long getMetric(String name) { - return count.get(); + public long getMetric(String name, String namespace) { + AtomicInteger val = counts.get(String.format("%s:%s", name, namespace)); + logger.info("Returning count: " + val.get()); + return val.get(); } @Override - public boolean isActive(String name) { - return active && count.get() > 0; + public boolean isActive(String name, String namespace) { + AtomicInteger val = counts.get(String.format("%s:%s", name, namespace)); + logger.info("Returning active: " + active + " and count: " + val.get()); + return active && val.get() > 0; } @Override - public void setActive(String name, boolean active) { - count.set(0); + public void setActive(String name, String namespace, boolean active) { + logger.info("Setting active to: " + active); + counts.get(String.format("%s:%s", name, namespace)).set(0); this.active = active; } @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { - count.incrementAndGet(); + Route r = exchange.getAttribute(GATEWAY_ROUTE_ATTR); + assert r != null; + String name = r.getMetadata().get("name").toString(); + String namespace = r.getMetadata().get("namespace").toString(); + String key = String.format("%s:%s", name, namespace); + if(!counts.containsKey(key)) { + counts.put(key, new AtomicInteger(0)); + } + counts.get(key).incrementAndGet(); + AtomicInteger count = counts.get(key); return this.filter.filter(exchange, chain).doOnError(e -> { if (count.get() > 0) { + logger.info("Decrementing count"); count.decrementAndGet(); } }).then(Mono.fromRunnable(() -> { if (count.get() > 0) { + logger.info("Decrementing count"); count.decrementAndGet(); } })); diff --git a/src/main/java/com/javagrunt/service/gateway/GrpcServerService.java b/src/main/java/com/javagrunt/service/gateway/GrpcServerService.java index 64015b4..c6ab9f5 100644 --- a/src/main/java/com/javagrunt/service/gateway/GrpcServerService.java +++ b/src/main/java/com/javagrunt/service/gateway/GrpcServerService.java @@ -17,14 +17,14 @@ public class GrpcServerService extends ExternalScalerImplBase { @Override public void isActive(ScaledObjectRef request, StreamObserver responseObserver) { - responseObserver.onNext(IsActiveResponse.newBuilder().setResult(scaler.isActive(request.getName())).build()); + responseObserver.onNext(IsActiveResponse.newBuilder().setResult(scaler.isActive(request.getName(), request.getNamespace())).build()); responseObserver.onCompleted(); } @Override @Async public void streamIsActive(ScaledObjectRef request, StreamObserver responseObserver) { - boolean active = scaler.isActive(request.getName()); + boolean active = scaler.isActive(request.getName(), request.getNamespace()); responseObserver.onNext(IsActiveResponse.newBuilder().setResult(active).build()); while (true) { try { @@ -34,7 +34,7 @@ public void streamIsActive(ScaledObjectRef request, StreamObserver