Skip to content

Commit

Permalink
Add a new metric turms.client.request.pending to record the number …
Browse files Browse the repository at this point in the history
…of pending requests
  • Loading branch information
JamesChenX committed Aug 18, 2024
1 parent 655577d commit 3f1fc77
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package im.turms.gateway.access.client.common;

import java.util.concurrent.atomic.AtomicInteger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import im.turms.gateway.domain.observation.service.MetricsService;
import im.turms.gateway.domain.servicerequest.service.ServiceRequestService;
import im.turms.gateway.domain.session.access.client.controller.SessionClientController;
import im.turms.gateway.domain.session.service.SessionService;
Expand All @@ -43,6 +46,7 @@
import im.turms.server.common.infra.healthcheck.ServiceAvailability;
import im.turms.server.common.infra.logging.core.logger.Logger;
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
import im.turms.server.common.infra.metrics.CommonMetricNameConst;
import im.turms.server.common.infra.property.TurmsPropertiesManager;
import im.turms.server.common.infra.proto.ProtoDecoder;
import im.turms.server.common.infra.proto.ProtoEncoder;
Expand Down Expand Up @@ -84,6 +88,8 @@ public class ClientRequestDispatcher {
private final ServiceRequestService serviceRequestService;
private final ServerStatusManager serverStatusManager;

private final AtomicInteger pendingRequestCount;

public ClientRequestDispatcher(
ApiLoggingContext apiLoggingContext,
BlocklistService blocklistService,
Expand All @@ -92,6 +98,7 @@ public ClientRequestDispatcher(
SessionService sessionService,
ServiceRequestService serviceRequestService,
ServerStatusManager serverStatusManager,
MetricsService metricsService,
TurmsPropertiesManager propertiesManager) {
this.apiLoggingContext = apiLoggingContext;
this.blocklistService = blocklistService;
Expand All @@ -100,6 +107,8 @@ public ClientRequestDispatcher(
this.sessionService = sessionService;
this.serviceRequestService = serviceRequestService;
this.serverStatusManager = serverStatusManager;
pendingRequestCount = metricsService.getRegistry()
.gauge(CommonMetricNameConst.TURMS_CLIENT_REQUEST_PENDING, new AtomicInteger());
NotificationFactory.init(propertiesManager);
}

Expand All @@ -113,6 +122,19 @@ public ClientRequestDispatcher(
public Mono<ByteBuf> handleRequest(
UserSessionWrapper sessionWrapper,
ByteBuf serviceRequestBuffer) {
pendingRequestCount.incrementAndGet();
try {
return handleRequest0(sessionWrapper, serviceRequestBuffer)
.doFinally(signalType -> pendingRequestCount.decrementAndGet());
} catch (Exception e) {
pendingRequestCount.decrementAndGet();
return Mono.error(e);
}
}

public Mono<ByteBuf> handleRequest0(
UserSessionWrapper sessionWrapper,
ByteBuf serviceRequestBuffer) {
// Check if it is a heartbeat request
if (!serviceRequestBuffer.isReadable()) {
serviceRequestBuffer.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;

Expand Down Expand Up @@ -65,6 +66,7 @@
import im.turms.server.common.infra.tracing.TracingContext;
import im.turms.service.access.servicerequest.dto.ClientRequest;
import im.turms.service.access.servicerequest.dto.RequestHandlerResult;
import im.turms.service.domain.observation.service.MetricsService;
import im.turms.service.infra.logging.ApiLoggingContext;
import im.turms.service.infra.logging.ClientApiLogging;
import im.turms.service.infra.plugin.extension.ClientRequestTransformer;
Expand Down Expand Up @@ -98,6 +100,8 @@ public class ServiceRequestDispatcher implements IServiceRequestDispatcher {

private final FastEnumMap<TurmsRequest.KindCase, ClientRequestHandler> requestTypeToHandler;

private final AtomicInteger pendingRequestCount;

static {
try {
REQUEST_TRANSFORM_METHOD = ClientRequestTransformer.class.getDeclaredMethod("transform",
Expand All @@ -124,6 +128,7 @@ public ServiceRequestDispatcher(
ApiLoggingContext apiLoggingContext,
ApplicationContext context,
BlocklistService blocklistService,
MetricsService metricsService,
OutboundMessageManager outboundMessageManager,
ServerStatusManager serverStatusManager,
PluginManager pluginManager,
Expand All @@ -147,6 +152,8 @@ public ServiceRequestDispatcher(
+ requestType);
}
}
pendingRequestCount = metricsService.getRegistry()
.gauge(TURMS_CLIENT_REQUEST_PENDING, new AtomicInteger());
}

private FastEnumMap<TurmsRequest.KindCase, ClientRequestHandler> getMappings(
Expand Down Expand Up @@ -194,10 +201,13 @@ private boolean isRequestForGateway(TurmsRequest.KindCase type) {
@Override
public Mono<ServiceResponse> dispatch(TracingContext context, ServiceRequest serviceRequest) {
ByteBuf requestBuffer = serviceRequest.getTurmsRequestBuffer();
pendingRequestCount.incrementAndGet();
try {
requestBuffer.touch(serviceRequest);
return dispatch0(context, serviceRequest);
return dispatch0(context, serviceRequest)
.doFinally(unused -> pendingRequestCount.decrementAndGet());
} catch (Exception e) {
pendingRequestCount.decrementAndGet();
LOGGER.error("Failed to handle the request: {}", serviceRequest, e);
return Mono.just(
ServiceResponse.of(ResponseStatusCode.SERVER_INTERNAL_ERROR, e.toString()));
Expand Down

0 comments on commit 3f1fc77

Please sign in to comment.