Skip to content

Commit

Permalink
feat(api): throw an exception when the api service fails to start.
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed May 22, 2024
1 parent 5bbff44 commit 50b8227
Show file tree
Hide file tree
Showing 21 changed files with 839 additions and 914 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.common.application;

import com.google.common.base.Objects;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.tron.core.config.args.Args;
Expand All @@ -12,30 +13,36 @@ public abstract class AbstractService implements Service {
protected int port;
@Getter
protected boolean enable;
@Getter
protected final String name = this.getClass().getSimpleName();


@Override
public void start() {
public CompletableFuture<?> start() {
logger.info("{} starting on {}", name, port);
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
try {
innerStart();
resultFuture.complete(null);
logger.info("{} started, listening on {}", name, port);
} catch (Exception e) {
logger.error("{}", name, e);
resultFuture.completeExceptionally(e);
}
return resultFuture;
}

@Override
public void stop() {
public CompletableFuture<?> stop() {
logger.info("{} shutdown...", name);
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
try {
innerStop();
resultFuture.complete(null);
logger.info("{} shutdown complete", name);
} catch (Exception e) {
logger.warn("{}", name, e);
resultFuture.completeExceptionally(e);
}
logger.info("{} shutdown complete", name);


return resultFuture;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,61 @@

package org.tron.common.application;

import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.tron.core.config.args.Args;

@Slf4j(topic = "rpc")
public abstract class HttpService extends AbstractService {

protected Server apiServer;

protected String contextPath;

@Override
public void innerStart() throws Exception {
if (apiServer != null) {
apiServer.start();
if (this.apiServer != null) {
this.apiServer.start();
}
}

@Override
public void innerStop() throws Exception {
if (apiServer != null) {
apiServer.stop();
if (this.apiServer != null) {
this.apiServer.stop();
}
}

@Override
public CompletableFuture<?> start() {
initServer();
ServletContextHandler context = initContextHandler();
addServlet(context);
addFilter(context);
return super.start();
}

protected void initServer() {
this.apiServer = new Server(this.port);
int maxHttpConnectNumber = Args.getInstance().getMaxHttpConnectNumber();
if (maxHttpConnectNumber > 0) {
this.apiServer.addBean(new ConnectionLimit(maxHttpConnectNumber, this.apiServer));
}
}

protected ServletContextHandler initContextHandler() {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(this.contextPath);
this.apiServer.setHandler(context);
return context;
}

protected abstract void addServlet(ServletContextHandler context);

protected void addFilter(ServletContextHandler context) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,96 @@
package org.tron.common.application;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.config.args.Args;
import org.tron.core.services.filter.LiteFnQueryGrpcInterceptor;
import org.tron.core.services.ratelimiter.RateLimiterInterceptor;
import org.tron.core.services.ratelimiter.RpcApiAccessInterceptor;

@Slf4j(topic = "rpc")
public abstract class RpcService extends AbstractService {

protected Server apiServer;
private Server apiServer;
protected String executorName;

@Autowired
private RateLimiterInterceptor rateLimiterInterceptor;

@Autowired
private LiteFnQueryGrpcInterceptor liteFnQueryGrpcInterceptor;

@Autowired
private RpcApiAccessInterceptor apiAccessInterceptor;

@Override
public void innerStart() throws Exception {
if (apiServer != null) {
apiServer.start();
if (this.apiServer != null) {
this.apiServer.start();
}
}

@Override
public void innerStop() throws Exception {
if (apiServer != null) {
apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
if (this.apiServer != null) {
this.apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
}

@Override
public CompletableFuture<?> start() {
NettyServerBuilder serverBuilder = initServerBuilder();
addService(serverBuilder);
addInterceptor(serverBuilder);
initServer(serverBuilder);
this.rateLimiterInterceptor.init(this.apiServer);
return super.start();
}

protected NettyServerBuilder initServerBuilder() {
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(this.port);
CommonParameter parameter = Args.getInstance();
if (parameter.getRpcThreadNum() > 0) {
serverBuilder = serverBuilder
.executor(ExecutorServiceManager.newFixedThreadPool(
this.executorName, parameter.getRpcThreadNum()));
}
// Set configs from config.conf or default value
serverBuilder
.maxConcurrentCallsPerConnection(parameter.getMaxConcurrentCallsPerConnection())
.flowControlWindow(parameter.getFlowControlWindow())
.maxConnectionIdle(parameter.getMaxConnectionIdleInMillis(), TimeUnit.MILLISECONDS)
.maxConnectionAge(parameter.getMaxConnectionAgeInMillis(), TimeUnit.MILLISECONDS)
.maxInboundMessageSize(parameter.getMaxMessageSize())
.maxHeaderListSize(parameter.getMaxHeaderListSize());

if (parameter.isRpcReflectionServiceEnable()) {
serverBuilder.addService(ProtoReflectionService.newInstance());
}
return serverBuilder;
}

protected abstract void addService(NettyServerBuilder serverBuilder);

protected void addInterceptor(NettyServerBuilder serverBuilder) {
// add a ratelimiter interceptor
serverBuilder.intercept(this.rateLimiterInterceptor);

// add api access interceptor
serverBuilder.intercept(this.apiAccessInterceptor);

// add lite fullnode query interceptor
serverBuilder.intercept(this.liteFnQueryGrpcInterceptor);
}

protected void initServer(NettyServerBuilder serverBuilder) {
this.apiServer = serverBuilder.build();
}

}
18 changes: 16 additions & 2 deletions framework/src/main/java/org/tron/common/application/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,25 @@

package org.tron.common.application;

import java.util.concurrent.CompletableFuture;

public interface Service {

void start();
/**
* Starts the service and all needed backend systems.
*
* @return completion state
*/
CompletableFuture<?> start();

void stop();
/**
* Stops the service and performs needed cleanup.
*
* @return completion state
*/
CompletableFuture<?> stop();

boolean isEnable();

String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package org.tron.common.application;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -43,13 +47,45 @@ private void initEnabledServices() {

void start() {
logger.info("Starting api services.");
this.enabledServices.forEach(Service::start);
this.enabledServices.forEach(this::waitForServiceToStart);
logger.info("All api services started.");
}

void stop() {
logger.info("Stopping api services.");
this.enabledServices.forEach(Service::stop);
this.enabledServices.forEach(this::waitForServiceToStop);
logger.info("All api services stopped.");
}

private void waitForServiceToStart(Service service) {
final String serviceName = service.getName();
final CompletableFuture<?> startFuture = service.start();
do {
try {
startFuture.get(60, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for service to start", e);
} catch (final ExecutionException e) {
throw new IllegalStateException("Service " + serviceName + " failed to start", e);
} catch (final TimeoutException e) {
logger.warn("Service {} is taking an unusually long time to start", serviceName);
}
} while (!startFuture.isDone());
}

private void waitForServiceToStop(Service service) {
final String serviceName = service.getName();
final CompletableFuture<?> stopFuture = service.stop();
try {
stopFuture.get(30, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
logger.debug("Interrupted while waiting for service {} to complete", serviceName, e);
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
logger.error("Service {} failed to shutdown", serviceName, e);
} catch (final TimeoutException e) {
logger.error("Service {} did not shut down cleanly", serviceName);
}
}
}
Loading

0 comments on commit 50b8227

Please sign in to comment.