Skip to content

Commit

Permalink
using keda
Browse files Browse the repository at this point in the history
  • Loading branch information
dashaun committed Feb 6, 2024
1 parent 4c76eb7 commit 3e17366
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 25 deletions.
56 changes: 56 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,48 @@
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.15.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>2.15.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.58.1</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
Expand Down Expand Up @@ -103,6 +141,24 @@
</image>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.24.4:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.58.1:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
</plugin>
</plugins>
</build>
</project>
76 changes: 76 additions & 0 deletions src/main/java/com/javagrunt/service/gateway/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,88 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.RetryGatewayFilterFactory;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

@SpringBootApplication
public class Application {

private final String hello = "http://hello-service";

private final String root = "https://javagrunt.github.io";

private final RequestCountingFilter scaler = new RequestCountingFilter();

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public ScalerService scaler() {
return scaler; // new SimpleScalerService();
}

@Bean
RouteLocator gateway(RouteLocatorBuilder rlb) {
return rlb.routes()
.route(r -> r.path("/hello-service/**").filters(f -> f.stripPrefix(1).filter(scaler)).uri(hello))
.route(r -> r.path("/**").uri(root))
.build();
}
}

class RequestCountingFilter implements GatewayFilter, ScalerService {

private final AtomicInteger count = new AtomicInteger();

private volatile boolean active = true;

private final GatewayFilter filter;

RequestCountingFilter() {
RetryGatewayFilterFactory.RetryConfig config = new RetryGatewayFilterFactory.RetryConfig();
config.setBackoff(Duration.ofMillis(300), Duration.ofSeconds(1), 2, true);
config.setRetries(10);
filter = new RetryGatewayFilterFactory().apply(config);
}

@Override
public long getMetric(String name) {
return count.get();
}

@Override
public boolean isActive(String name) {
return active && count.get() > 0;
}

@Override
public void setActive(String name, boolean active) {
count.set(0);
this.active = active;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
count.incrementAndGet();
return this.filter.filter(exchange, chain).doOnError(e -> {
if (count.get() > 0) {
count.decrementAndGet();
}
}).then(Mono.fromRunnable(() -> {
if (count.get() > 0) {
count.decrementAndGet();
}
}));
}

}
67 changes: 67 additions & 0 deletions src/main/java/com/javagrunt/service/gateway/GrpcServerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.javagrunt.service.gateway;

import externalscaler.ExternalScalerGrpc.ExternalScalerImplBase;
import externalscaler.Externalscaler.*;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.scheduling.annotation.Async;

@GrpcService
public class GrpcServerService extends ExternalScalerImplBase {

private final ScalerService scaler;

GrpcServerService(ScalerService scaler) {
this.scaler = scaler;
}

@Override
public void isActive(ScaledObjectRef request, StreamObserver<IsActiveResponse> responseObserver) {
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(scaler.isActive(request.getName())).build());
responseObserver.onCompleted();
}

@Override
@Async
public void streamIsActive(ScaledObjectRef request, StreamObserver<IsActiveResponse> responseObserver) {
boolean active = scaler.isActive(request.getName());
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(active).build());
while (true) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
responseObserver.onError(e);
Thread.currentThread().interrupt();
}
boolean update = scaler.isActive(request.getName());
if (update != active) {
active = update;
responseObserver.onNext(IsActiveResponse.newBuilder().setResult(active).build());
}
}
}

@Override
public void getMetricSpec(ScaledObjectRef request, StreamObserver<GetMetricSpecResponse> responseObserver) {
responseObserver.onNext(GetMetricSpecResponse.newBuilder()
.addMetricSpecs(MetricSpec.newBuilder()
.setMetricName("requests")
.setTargetSize(Integer.parseInt(request.getScalerMetadataOrDefault("threshold", "3")))
.build())
.build());
responseObserver.onCompleted();
}

@Override
public void getMetrics(GetMetricsRequest request, StreamObserver<GetMetricsResponse> responseObserver) {
responseObserver.onNext(GetMetricsResponse.newBuilder()
.addMetricValues(MetricValue.newBuilder()
.setMetricName("requests")
.setMetricValue(scaler.getMetric(request.getScaledObjectRef().getName()))
.build())
.build());
responseObserver.onCompleted();
}

}
28 changes: 28 additions & 0 deletions src/main/java/com/javagrunt/service/gateway/ScalerEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.javagrunt.service.gateway;

import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.stereotype.Component;

@Component
@Endpoint(id = "scaler")
public class ScalerEndpoint {

private final ScalerService scaler;

ScalerEndpoint(ScalerService scaler) {
this.scaler = scaler;
}

@ReadOperation
public boolean status(String name) {
return scaler.isActive(name);
}

@WriteOperation
public void toggle(String name) {
scaler.setActive(name, !scaler.isActive(name));
}

}
11 changes: 11 additions & 0 deletions src/main/java/com/javagrunt/service/gateway/ScalerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.javagrunt.service.gateway;

public interface ScalerService {

boolean isActive(String name);

void setActive(String name, boolean active);

long getMetric(String name);

}
44 changes: 44 additions & 0 deletions src/main/proto/externalscaler.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
syntax = "proto3";

package externalscaler;
option go_package = ".;externalscaler";

service ExternalScaler {
rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
rpc StreamIsActive(ScaledObjectRef) returns (stream IsActiveResponse) {}
rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
}

message ScaledObjectRef {
string name = 1;
string namespace = 2;
map<string, string> scalerMetadata = 3;
}

message IsActiveResponse {
bool result = 1;
}

message GetMetricSpecResponse {
repeated MetricSpec metricSpecs = 1;
}

message MetricSpec {
string metricName = 1;
int64 targetSize = 2;
}

message GetMetricsRequest {
ScaledObjectRef scaledObjectRef = 1;
string metricName = 2;
}

message GetMetricsResponse {
repeated MetricValue metricValues = 1;
}

message MetricValue {
string metricName = 1;
int64 metricValue = 2;
}
40 changes: 15 additions & 25 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,39 +1,29 @@
management:
endpoints:
enabled-by-default: true
web:
exposure:
include: "health, metrics, prometheus, info"
include: "health, metrics, prometheus, info, env, scaler"
endpoint:
health:
show-details: always
probes:
add-additional-paths: true
env:
show-values: always
info:
env:
enabled: true
java:
enabled: true
os:
enabled: true
zipkin:
tracing:
endpoint: "${ZIPKIN_ENDPOINT:http://localhost:9411/api/v2/spans}"

spring:
application:
name: gateway-service
cloud:
gateway:
routes:
- id: youtube-listener
uri: http://youtube-listener.javagrunt-com.svc.cluster.local
predicates:
- Path=/youtube-listener/**
filters:
- RewritePath=/youtube-listener/(?<segment>.*), /$\{segment}
- id: youtube-service
uri: http://youtube-service.javagrunt-com.svc.cluster.local
predicates:
- Path=/youtube-service/**
filters:
- RewritePath=/youtube-service/(?<segment>.*), /$\{segment}
- id: zipkin
uri: http://openzipkin.javagrunt-com.svc.cluster.local:9411
predicates:
- Path=/zipkin/**
- id: root
uri: https://javagrunt.github.io
predicates:
- Path=/**
data:
redis:
host: ${REDIS_HOST:localhost}
Expand Down

0 comments on commit 3e17366

Please sign in to comment.