Skip to content

Commit

Permalink
name and namespace keys
Browse files Browse the repository at this point in the history
  • Loading branch information
dashaun committed Feb 7, 2024
1 parent 919f404 commit 3bf9428
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 21 deletions.
50 changes: 40 additions & 10 deletions src/main/java/com/javagrunt/service/gateway/Application.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package com.javagrunt.service.gateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

@SpringBootApplication
public class Application {

Expand All @@ -29,21 +36,28 @@ public static void main(String[] args) {

@Bean
public ScalerService scaler() {
return scaler; // new SimpleScalerService();
return scaler;
}

@Bean
RouteLocator gateway(RouteLocatorBuilder rlb) {
return rlb.routes()
.route(r -> r.path("/hello-service/**").filters(f -> f.stripPrefix(1).filter(scaler)).uri(hello))
.route(r -> r.path("/hello-service/**")
.filters(f -> f.stripPrefix(1)
.filter(scaler))
.metadata("name", "hello-service")
.metadata("namespace", "dev-javagrunt-com")
.uri(hello))
.route(r -> r.path("/**").uri(root))
.build();
}
}

class RequestCountingFilter implements GatewayFilter, ScalerService {

private final AtomicInteger count = new AtomicInteger();
Logger logger = LoggerFactory.getLogger(RequestCountingFilter.class);

private ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

private volatile boolean active = true;

Expand All @@ -57,30 +71,46 @@ class RequestCountingFilter implements GatewayFilter, ScalerService {
}

@Override
public long getMetric(String name) {
return count.get();
public long getMetric(String name, String namespace) {
AtomicInteger val = counts.get(String.format("%s:%s", name, namespace));
logger.info("Returning count: " + val.get());
return val.get();
}

@Override
public boolean isActive(String name) {
return active && count.get() > 0;
public boolean isActive(String name, String namespace) {
AtomicInteger val = counts.get(String.format("%s:%s", name, namespace));
logger.info("Returning active: " + active + " and count: " + val.get());
return active && val.get() > 0;
}

@Override
public void setActive(String name, boolean active) {
count.set(0);
public void setActive(String name, String namespace, boolean active) {
logger.info("Setting active to: " + active);
counts.get(String.format("%s:%s", name, namespace)).set(0);
this.active = active;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
count.incrementAndGet();
Route r = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
assert r != null;
String name = r.getMetadata().get("name").toString();
String namespace = r.getMetadata().get("namespace").toString();
String key = String.format("%s:%s", name, namespace);
if(!counts.containsKey(key)) {
counts.put(key, new AtomicInteger(0));
}
counts.get(key).incrementAndGet();
AtomicInteger count = counts.get(key);
return this.filter.filter(exchange, chain).doOnError(e -> {
if (count.get() > 0) {
logger.info("Decrementing count");
count.decrementAndGet();
}
}).then(Mono.fromRunnable(() -> {
if (count.get() > 0) {
logger.info("Decrementing count");
count.decrementAndGet();
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ public class GrpcServerService extends ExternalScalerImplBase {

@Override
public void isActive(ScaledObjectRef request, StreamObserver<IsActiveResponse> responseObserver) {
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(scaler.isActive(request.getName())).build());
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(scaler.isActive(request.getName(), request.getNamespace())).build());
responseObserver.onCompleted();
}

@Override
@Async
public void streamIsActive(ScaledObjectRef request, StreamObserver<IsActiveResponse> responseObserver) {
boolean active = scaler.isActive(request.getName());
boolean active = scaler.isActive(request.getName(), request.getNamespace());
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(active).build());
while (true) {
try {
Expand All @@ -34,7 +34,7 @@ public void streamIsActive(ScaledObjectRef request, StreamObserver<IsActiveRespo
responseObserver.onError(e);
Thread.currentThread().interrupt();
}
boolean update = scaler.isActive(request.getName());
boolean update = scaler.isActive(request.getName(), request.getNamespace());
if (update != active) {
active = update;
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(active).build());
Expand All @@ -58,7 +58,7 @@ public void getMetrics(GetMetricsRequest request, StreamObserver<GetMetricsRespo
responseObserver.onNext(GetMetricsResponse.newBuilder()
.addMetricValues(MetricValue.newBuilder()
.setMetricName("requests")
.setMetricValue(scaler.getMetric(request.getScaledObjectRef().getName()))
.setMetricValue(scaler.getMetric(request.getScaledObjectRef().getName(), request.getScaledObjectRef().getNamespace()))
.build())
.build());
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public class ScalerEndpoint {
}

@ReadOperation
public boolean status(String name) {
return scaler.isActive(name);
public boolean status(String name, String namespace) {
return scaler.isActive(name, namespace);
}

@WriteOperation
public void toggle(String name) {
scaler.setActive(name, !scaler.isActive(name));
public void toggle(String name, String namespace) {
scaler.setActive(name, namespace, !scaler.isActive(name, namespace));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

public interface ScalerService {

boolean isActive(String name);
boolean isActive(String name, String namespace);

void setActive(String name, boolean active);
void setActive(String name, String namespace, boolean active);

long getMetric(String name);
long getMetric(String name, String namespace);

}

0 comments on commit 3bf9428

Please sign in to comment.