Skip to content

Commit

Permalink
resolve allegro#1738 | Add jetty http client request processing time …
Browse files Browse the repository at this point in the history
…metrics
  • Loading branch information
SkySurferOne committed Oct 18, 2023
1 parent 4e9f8c7 commit 3f777ff
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import pl.allegro.tech.hermes.metrics.HermesTimer;

import java.util.function.ToDoubleFunction;

Expand Down Expand Up @@ -66,4 +67,32 @@ public <T> void registerHttp2SerialClientConnectionsGauge(T obj, ToDoubleFunctio
public <T> void registerHttp2SerialClientPendingConnectionsGauge(T obj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_PENDING_CONNECTIONS, obj, f);
}

public HermesTimer http1SerialClientRequestQueueWaitingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME)
);
}

public HermesTimer http2SerialClientRequestQueueWaitingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME)
);
}

public HermesTimer http1SerialClientRequestProcessingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME)
);
}

public HermesTimer http2SerialClientRequestProcessingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,13 @@ public class Timers {
public static final String CONSUMER_IDLE_TIME = "idle-time." + GROUP + "." + TOPIC + "." + SUBSCRIPTION;

public static final String OAUTH_PROVIDER_TOKEN_REQUEST_LATENCY = "oauth.provider." + OAUTH_PROVIDER_NAME + ".token-request-latency";

public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME =
"http-clients.serial.http1.request-queue-waiting-time";
public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME =
"http-clients.serial.http1.request-processing-time";
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME =
"http-clients.serial.http2.request-queue-waiting-time";
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME =
"http-clients.serial.http2.request-processing-time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ public HttpHeadersProvidersFactory emptyHttpHeadersProvidersFactory() {

@Bean
public HttpClientsFactory httpClientsFactory(InstrumentedExecutorServiceFactory executorFactory,
SslContextFactoryProvider sslContextFactoryProvider) {
return new HttpClientsFactory(executorFactory, sslContextFactoryProvider);
SslContextFactoryProvider sslContextFactoryProvider,
MetricsFacade metricsFacade) {
return new HttpClientsFactory(executorFactory, sslContextFactoryProvider, metricsFacade.consumerSender());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class Http1ClientProperties implements Http1ClientParameters {

private Duration connectionTimeout = Duration.ofSeconds(15);

private boolean requestProcessingMonitoringEnabled = false;

@Override
public int getThreadPoolSize() {
Expand Down Expand Up @@ -83,4 +84,13 @@ public Duration getConnectionTimeout() {
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

@Override
public boolean isRequestProcessingMonitoringEnabled() {
return requestProcessingMonitoringEnabled;
}

public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) {
this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class Http2ClientProperties implements Http2ClientParameters {

private Duration connectionTimeout = Duration.ofSeconds(15);

private boolean requestProcessingMonitoringEnabled = false;

public boolean isEnabled() {
return enabled;
}
Expand Down Expand Up @@ -81,4 +83,13 @@ public Duration getConnectionTimeout() {
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

@Override
public boolean isRequestProcessingMonitoringEnabled() {
return this.requestProcessingMonitoringEnabled;
}

public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) {
this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface HttpClientParameters {
int getMaxRequestsQueuedPerDestination();

Duration getConnectionTimeout();

boolean isRequestProcessingMonitoringEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.HttpCookieStore;
import pl.allegro.tech.hermes.common.metric.ConsumerSenderMetrics;
import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory;

import java.util.concurrent.ExecutorService;
Expand All @@ -15,11 +16,17 @@ public class HttpClientsFactory {
private final InstrumentedExecutorServiceFactory executorFactory;
private final SslContextFactoryProvider sslContextFactoryProvider;

private final ConsumerSenderMetrics consumerSenderMetrics;


public HttpClientsFactory(
InstrumentedExecutorServiceFactory executorFactory,
SslContextFactoryProvider sslContextFactoryProvider) {
SslContextFactoryProvider sslContextFactoryProvider,
ConsumerSenderMetrics consumerSenderMetrics
) {
this.executorFactory = executorFactory;
this.sslContextFactoryProvider = sslContextFactoryProvider;
this.consumerSenderMetrics = consumerSenderMetrics;
}

public HttpClient createClientForHttp1(String name, Http1ClientParameters http1ClientParameters) {
Expand All @@ -40,6 +47,14 @@ public HttpClient createClientForHttp1(String name, Http1ClientParameters http1C
client.setIdleTimeout(http1ClientParameters.getIdleTimeout().toMillis());
client.setFollowRedirects(http1ClientParameters.isFollowRedirectsEnabled());
client.setConnectTimeout(http1ClientParameters.getConnectionTimeout().toMillis());
if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) {
client.getRequestListeners().add(
new JettyHttpClientMetrics(
consumerSenderMetrics.http1SerialClientRequestQueueWaitingTimer(),
consumerSenderMetrics.http1SerialClientRequestProcessingTimer()
)
);
}
return client;
}

Expand All @@ -48,7 +63,7 @@ public HttpClient createClientForHttp2(String name, Http2ClientParameters http2C
sslContextFactoryProvider.provideSslContextFactory()
.ifPresentOrElse(clientConnector::setSslContextFactory,
() -> {
throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory");
throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory");
});
HTTP2Client http2Client = new HTTP2Client(clientConnector);

Expand All @@ -66,6 +81,14 @@ public HttpClient createClientForHttp2(String name, Http2ClientParameters http2C
client.setIdleTimeout(http2ClientParameters.getIdleTimeout().toMillis());
client.setFollowRedirects(http2ClientParameters.isFollowRedirectsEnabled());
client.setConnectTimeout(http2ClientParameters.getConnectionTimeout().toMillis());
if (http2ClientParameters.isRequestProcessingMonitoringEnabled()) {
client.getRequestListeners().add(
new JettyHttpClientMetrics(
consumerSenderMetrics.http2SerialClientRequestQueueWaitingTimer(),
consumerSenderMetrics.http2SerialClientRequestProcessingTimer()
)
);
}
return client;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.http;

import org.eclipse.jetty.client.api.Request;
import pl.allegro.tech.hermes.metrics.HermesTimer;

public class JettyHttpClientMetrics implements Request.Listener {

private final HermesTimer requestQueueWaitingTimer;
private final HermesTimer requestProcessingTimer;

public JettyHttpClientMetrics(HermesTimer requestQueueWaitingTimer, HermesTimer requestProcessingTimer) {
this.requestQueueWaitingTimer = requestQueueWaitingTimer;
this.requestProcessingTimer = requestProcessingTimer;
}

@Override
public void onQueued(Request request) {
var timer = requestQueueWaitingTimer.time();

request.onRequestBegin(onBeginRequest -> timer.close());
}

@Override
public void onBegin(Request request) {
var timer = requestProcessingTimer.time();

request.onComplete(result -> timer.close());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class HttpClientConnectionMonitoringTest extends Specification {
ConsumerSenderConfiguration consumerConfiguration = new ConsumerSenderConfiguration();
client = consumerConfiguration.http1SerialClient(new HttpClientsFactory(
new InstrumentedExecutorServiceFactory(threadPoolMetrics),
sslContextFactoryProvider), new Http1ClientProperties()
sslContextFactoryProvider, metrics.consumerSender()), new Http1ClientProperties()
)
batchClient = Mock(HttpClient)
client.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.Test;
import pl.allegro.tech.hermes.api.EndpointAddress;
import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory;
import pl.allegro.tech.hermes.common.metric.executor.ThreadPoolMetrics;
import pl.allegro.tech.hermes.consumers.config.ConsumerSenderConfiguration;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class JettyMessageSenderTest {

private final HttpHeadersProvider headersProvider = new HermesHeadersProvider(Collections.singleton(new Http1HeadersProvider()));

private static final MetricsFacade metricsFacade = TestMetricsFacadeFactory.create();

@BeforeClass
public static void setupEnvironment() throws Exception {
wireMockServer = new WireMockServer(ENDPOINT_PORT);
Expand All @@ -67,7 +70,7 @@ public static void setupEnvironment() throws Exception {
new InstrumentedExecutorServiceFactory(
new ThreadPoolMetrics(TestMetricsFacadeFactory.create())
),
sslContextFactoryProvider),
sslContextFactoryProvider, metricsFacade.consumerSender()),
new Http1ClientProperties()
);
client.start();
Expand Down Expand Up @@ -256,4 +259,4 @@ public void shouldUseSuppliedSocketTimeout() throws ExecutionException, Interrup
// then
assertThat(messageSendingResult.isTimeout()).isTrue();
}
}
}

0 comments on commit 3f777ff

Please sign in to comment.