Skip to content

Commit

Permalink
change the class JavaExecutionResult and move the startTime field fro…
Browse files Browse the repository at this point in the history
…m ComponentStatsManager to JavaExecutionResult.
  • Loading branch information
walkinggo committed Jan 7, 2025
1 parent 0344726 commit 5775db1
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
private final long startTime = System.nanoTime();
private double startTimeNanos;

public void reset() {
setUserException(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
}

JavaExecutionResult executionResult = new JavaExecutionResult();
executionResult.setStartTimeNanos(System.nanoTime());

final Object output;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance;

import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationConfig;
Expand All @@ -28,6 +29,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -40,6 +42,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -334,6 +337,7 @@ public void run() {
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());

// start time for process latency stat

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
Expand All @@ -344,7 +348,10 @@ public void run() {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);


if (result != null) {
// register end time
stats.processTimeEnd(result.getStartTimeNanos());
// process the synchronous results
handleResult(currentRecord, result);
}
Expand Down Expand Up @@ -443,8 +450,6 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
// handle endTime here
stats.processTimeEnd(result.getStartTime());
}

private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.pulsar.functions.instance.stats;

import com.google.common.collect.EvictingQueue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand Down Expand Up @@ -100,8 +102,7 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,

public abstract void setLastInvocation(long ts);


public abstract void processTimeEnd(long startTime);
public abstract void processTimeEnd(double processTimeStart);

public abstract double getTotalProcessedSuccessfully();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -40,7 +42,9 @@ public class FunctionStatsManager extends ComponentStatsManager {

public static final String PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_";

/** Declare metric names. **/
/**
* Declare metric names.
**/
public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
Expand All @@ -58,7 +62,9 @@ public class FunctionStatsManager extends ComponentStatsManager {
public static final String PROCESS_LATENCY_MS_1min = "process_latency_ms_1min";
public static final String RECEIVED_TOTAL_1min = "received_1min";

/** Declare Prometheus stats. **/
/**
* Declare Prometheus stats.
**/

final Counter statTotalProcessedSuccessfully;

Expand Down Expand Up @@ -126,140 +132,140 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalProcessedSuccessfullyChild = statTotalProcessedSuccessfully.labels(metricsLabels);

statTotalSysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalSysExceptionsChild = statTotalSysExceptions.labels(metricsLabels);

statTotalUserExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalUserExceptionsChild = statTotalUserExceptions.labels(metricsLabels);

statProcessLatency = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(METRICS_LABEL_NAMES)
.create());
statProcessLatencyChild = statProcessLatency.labels(metricsLabels);

statlastInvocation = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION,
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statlastInvocationChild = statlastInvocation.labels(metricsLabels);

statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalRecordsReceivedChild = statTotalRecordsReceived.labels(metricsLabels);

statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalProcessedSuccessfully1minChild = statTotalProcessedSuccessfully1min.labels(metricsLabels);

statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalSysExceptions1minChild = statTotalSysExceptions1min.labels(metricsLabels);

statTotalUserExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalUserExceptions1minChild = statTotalUserExceptions1min.labels(metricsLabels);

statProcessLatency1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(METRICS_LABEL_NAMES)
.create());
statProcessLatency1minChild = statProcessLatency1min.labels(metricsLabels);

statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(METRICS_LABEL_NAMES)
.create());
statTotalRecordsReceivedChild1min = statTotalRecordsReceived1min.labels(metricsLabels);

userExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "user_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from user code.")
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from user code.")
.create());
sysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "system_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from system code.")
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from system code.")
.create());

sourceExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "source_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from source.")
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from source.")
.create());

sinkExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from sink.")
.create());
.name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
.labelNames(EXCEPTION_METRICS_LABEL_NAMES)
.help("Exception from sink.")
.create());

userExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d);
sysExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d);
Expand Down Expand Up @@ -337,10 +343,9 @@ public void setLastInvocation(long ts) {
}



@Override
public void processTimeEnd(long startTime) {
double endTimeMs = ((double) System.nanoTime() - startTime) / 1.0E6D;
public void processTimeEnd(double processTimeStart) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
statProcessLatencyChild.observe(endTimeMs);
statProcessLatency1minChild.observe(endTimeMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.common.util.concurrent.RateLimiter;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;

import lombok.Getter;
import org.apache.pulsar.functions.proto.InstanceCommunication;

Expand Down Expand Up @@ -283,9 +285,8 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}


@Override
public void processTimeEnd(long startTime) {
public void processTimeEnd(double processTimeStart) {
//no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.common.util.concurrent.RateLimiter;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;

import lombok.Getter;
import org.apache.pulsar.functions.proto.InstanceCommunication;

Expand Down Expand Up @@ -283,9 +285,8 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}


@Override
public void processTimeEnd(long startTime) {
public void processTimeEnd(double processTimeStart) {
//no-op
}

Expand Down

0 comments on commit 5775db1

Please sign in to comment.