From ef36d8de854711064275ba081e93e118e6ed9538 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Mon, 6 Jan 2025 10:52:15 +0800 Subject: [PATCH 1/8] delete processTimeStart method in ComponentStatsManager --- .../instance/JavaInstanceRunnable.java | 56 +++++----- .../instance/stats/ComponentStatsManager.java | 7 +- .../instance/stats/FunctionStatsManager.java | 15 +-- .../instance/stats/SinkStatsManager.java | 100 +++++++++--------- .../instance/stats/SourceStatsManager.java | 100 +++++++++--------- 5 files changed, 136 insertions(+), 142 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4f811c14704a0..0838f9d98f402 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -189,8 +189,8 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, this.secretsProvider = secretsProvider; this.componentClassLoader = componentClassLoader; this.functionClassLoader = transformFunctionClassLoader != null - ? transformFunctionClassLoader - : componentClassLoader; + ? transformFunctionClassLoader + : componentClassLoader; this.metricsLabels = new String[]{ instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), @@ -235,7 +235,7 @@ private synchronized void setup() throws Exception { ThreadContext.put("instance", instanceConfig.getInstanceName()); log.info("Starting Java Instance {} : \n Details = {}", - instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); + instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); Object object; if (instanceConfig.getFunctionDetails().getClassName() @@ -293,8 +293,8 @@ ContextImpl setupContext() throws PulsarClientException { try { Thread.currentThread().setContextClassLoader(functionClassLoader); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, - collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, - pulsarAdmin, clientBuilder, fatalHandler, producerCache); + collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager, + pulsarAdmin, clientBuilder, fatalHandler, producerCache); } finally { Thread.currentThread().setContextClassLoader(clsLoader); } @@ -334,8 +334,6 @@ public void run() { // set last invocation time stats.setLastInvocation(System.currentTimeMillis()); - // start time for process latency stat - stats.processTimeStart(); // process the message Thread.currentThread().setContextClassLoader(functionClassLoader); @@ -401,9 +399,9 @@ private void setupStateStore() throws Exception { stateStoreProvider.init(stateStoreProviderConfig); StateStore store = stateStoreProvider.getStateStore( - instanceConfig.getFunctionDetails().getTenant(), - instanceConfig.getFunctionDetails().getNamespace(), - instanceConfig.getFunctionDetails().getName() + instanceConfig.getFunctionDetails().getTenant(), + instanceConfig.getFunctionDetails().getNamespace(), + instanceConfig.getFunctionDetails().getName() ); StateStoreContext context = new StateStoreContextImpl(); store.init(context); @@ -513,7 +511,7 @@ private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Rec if (isKeyValueSeparated && schema instanceof KeyValueSchema) { KeyValueSchema kvSchema = (KeyValueSchema) schema; finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(), - KeyValueEncodingType.SEPARATED); + KeyValueEncodingType.SEPARATED); } else { finalSchema = schema; } @@ -548,7 +546,7 @@ record = this.source.read(); /** * NOTE: this method is be synchronized because it is potentially called by two different places - * one inside the run/finally clause and one inside the ThreadRuntime::stop. + * one inside the run/finally clause and one inside the ThreadRuntime::stop. */ @Override public synchronized void close() { @@ -678,8 +676,8 @@ private InstanceCommunication.MetricsData internalGetMetrics() { } private void internalResetMetrics() { - stats.reset(); - javaInstance.resetMetrics(); + stats.reset(); + javaInstance.resetMetrics(); } private Builder createMetricsDataBuilder() { @@ -834,7 +832,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception { ); pulsarSourceConfig.setSkipToLatest( - sourceSpec.getSkipToLatest() + sourceSpec.getSkipToLatest() ); Objects.requireNonNull(contextImpl.getSubscriptionType()); @@ -874,12 +872,12 @@ private void setupInput(ContextImpl contextImpl) throws Exception { // check if source is a batch source if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) { object = Reflections.createInstance( - sourceSpec.getClassName(), - this.instanceClassLoader); + sourceSpec.getClassName(), + this.instanceClassLoader); } else { object = Reflections.createInstance( - sourceSpec.getClassName(), - this.componentClassLoader); + sourceSpec.getClassName(), + this.componentClassLoader); } } @@ -911,8 +909,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception { /** * Recursively interpolate configured secrets into the config map by calling * {@link SecretsProvider#interpolateSecretForValue(String)}. + * * @param secretsProvider - the secrets provider that will convert secret's values into config values. - * @param configs - the connector configuration map, which will be mutated. + * @param configs - the connector configuration map, which will be mutated. */ private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider, Map configs) { @@ -939,12 +938,13 @@ static Map augmentAndFilterConnectorConfig(String connectorConfi SecretsProvider secretsProvider, ClassLoader componentClassLoader, org.apache.pulsar.functions.proto.Function - .FunctionDetails.ComponentType componentType) + .FunctionDetails.ComponentType componentType) throws IOException { final Map config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory .getMapper() .reader() - .forType(new TypeReference>() {}) + .forType(new TypeReference>() { + }) .readValue(connectorConfigs); if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK && componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { @@ -959,7 +959,7 @@ static Map augmentAndFilterConnectorConfig(String connectorConfi configClassName = ConnectorUtils .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass(); } else { - configClassName = ConnectorUtils + configClassName = ConnectorUtils .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass(); } if (configClassName != null) { @@ -1105,7 +1105,7 @@ private static Schema getSinkSchema(Record record, Class clazz) { case AVRO: return AvroSchema.of(SchemaDefinition.builder() - .withPojo(clazz).build()); + .withPojo(clazz).build()); case JSON: return JSONSchema.of(SchemaDefinition.builder().withPojo(clazz).build()); @@ -1131,8 +1131,8 @@ private static SchemaType getSchemaTypeOrDefault(Record record, Class claz if (GenericObject.class.isAssignableFrom(clazz)) { return SchemaType.AUTO_CONSUME; } else if (byte[].class.equals(clazz) - || ByteBuf.class.equals(clazz) - || ByteBuffer.class.equals(clazz)) { + || ByteBuf.class.equals(clazz) + || ByteBuffer.class.equals(clazz)) { // if sink uses bytes, we should ignore return SchemaType.NONE; } else { @@ -1151,8 +1151,8 @@ private static SchemaType getSchemaTypeOrDefault(Record record, Class claz private static SchemaType getDefaultSchemaType(Class clazz) { if (byte[].class.equals(clazz) - || ByteBuf.class.equals(clazz) - || ByteBuffer.class.equals(clazz)) { + || ByteBuf.class.equals(clazz) + || ByteBuffer.class.equals(clazz)) { return SchemaType.NONE; } else if (GenericObject.class.isAssignableFrom(clazz)) { // the sink is taking generic record/object, so we do auto schema detection diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index 6da3c082f78f4..3e17ffbcf2a14 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -53,9 +53,9 @@ public abstract class ComponentStatsManager implements AutoCloseable { } public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry, - String[] metricsLabels, - ScheduledExecutorService scheduledExecutorService, - Function.FunctionDetails.ComponentType componentType) { + String[] metricsLabels, + ScheduledExecutorService scheduledExecutorService, + Function.FunctionDetails.ComponentType componentType) { switch (componentType) { case FUNCTION: return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService); @@ -100,7 +100,6 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry, public abstract void setLastInvocation(long ts); - public abstract void processTimeStart(); public abstract void processTimeEnd(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 8737c8a4fa913..700eb539864a7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -336,20 +336,15 @@ public void setLastInvocation(long ts) { statlastInvocationChild.set(ts); } - private Long processTimeStart; - @Override - public void processTimeStart() { - processTimeStart = System.nanoTime(); - } @Override public void processTimeEnd() { - if (processTimeStart != null) { - double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D; - statProcessLatencyChild.observe(endTimeMs); - statProcessLatency1minChild.observe(endTimeMs); - } +// if (processTimeStart != null) { +// double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D; +// statProcessLatencyChild.observe(endTimeMs); +// statProcessLatency1minChild.observe(endTimeMs); +// } } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index c515ce6bc872c..dde3207ef04ae 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -31,7 +31,9 @@ public class SinkStatsManager extends ComponentStatsManager { public static final String PULSAR_SINK_METRICS_PREFIX = "pulsar_sink_"; - /** Declare metric names. **/ + /** + * Declare metric names. + **/ public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total"; public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total"; public static final String LAST_INVOCATION = "last_invocation"; @@ -43,7 +45,9 @@ public class SinkStatsManager extends ComponentStatsManager { public static final String RECEIVED_TOTAL_1min = "received_1min"; public static final String WRITTEN_TOTAL_1min = "written_1min"; - /** Declare Prometheus stats. **/ + /** + * Declare Prometheus stats. + **/ private final Counter statTotalRecordsReceived; @@ -101,99 +105,99 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me statTotalRecordsReceived = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL) - .help("Total number of records sink has received from Pulsar topic(s).") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL) + .help("Total number of records sink has received from Pulsar topic(s).") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalRecordsReceivedChild = statTotalRecordsReceived.labels(metricsLabels); statTotalSysExceptions = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) - .help("Total number of system exceptions.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) + .help("Total number of system exceptions.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSysExceptionsChild = statTotalSysExceptions.labels(metricsLabels); statTotalSinkExceptions = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL) - .help("Total number of sink exceptions.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL) + .help("Total number of sink exceptions.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSinkExceptionsChild = statTotalSinkExceptions.labels(metricsLabels); statTotalWritten = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL) - .help("Total number of records processed by sink.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL) + .help("Total number of records processed by sink.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalWrittenChild = statTotalWritten.labels(metricsLabels); statlastInvocation = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION, Gauge.build() - .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION) - .help("The timestamp of the last invocation of the sink.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION) + .help("The timestamp of the last invocation of the sink.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statlastInvocationChild = statlastInvocation.labels(metricsLabels); statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min) - .help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min) + .help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalRecordsReceivedChild1min = statTotalRecordsReceived1min.labels(metricsLabels); statTotalSysExceptions1min = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) - .help("Total number of system exceptions in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) + .help("Total number of system exceptions in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSysExceptions1minChild = statTotalSysExceptions1min.labels(metricsLabels); statTotalSinkExceptions1min = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min) - .help("Total number of sink exceptions in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min) + .help("Total number of sink exceptions in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSinkExceptionsChild1min = statTotalSinkExceptions1min.labels(metricsLabels); statTotalWritten1min = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min, Counter.build() - .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min) - .help("Total number of records processed by sink the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min) + .help("Total number of records processed by sink the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalWrittenChild1min = statTotalWritten1min.labels(metricsLabels); sysExceptions = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + "system_exception", Gauge.build() - .name(PULSAR_SINK_METRICS_PREFIX + "system_exception") - .labelNames(EXCEPTION_METRICS_LABEL_NAMES) - .help("Exception from system code.") - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + "system_exception") + .labelNames(EXCEPTION_METRICS_LABEL_NAMES) + .help("Exception from system code.") + .create()); sinkExceptions = collectorRegistry.registerIfNotExist( PULSAR_SINK_METRICS_PREFIX + "sink_exception", Gauge.build() - .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception") - .labelNames(EXCEPTION_METRICS_LABEL_NAMES) - .help("Exception from sink.") - .create()); + .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception") + .labelNames(EXCEPTION_METRICS_LABEL_NAMES) + .help("Exception from sink.") + .create()); sysExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d); sinkExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d); @@ -279,10 +283,6 @@ public void setLastInvocation(long ts) { statlastInvocationChild.set(ts); } - @Override - public void processTimeStart() { - //no-op - } @Override public void processTimeEnd() { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 1f7e159c4dcb5..115532b82e669 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -31,7 +31,9 @@ public class SourceStatsManager extends ComponentStatsManager { public static final String PULSAR_SOURCE_METRICS_PREFIX = "pulsar_source_"; - /** Declare metric names. **/ + /** + * Declare metric names. + **/ public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total"; public static final String SOURCE_EXCEPTIONS_TOTAL = "source_exceptions_total"; public static final String LAST_INVOCATION = "last_invocation"; @@ -43,7 +45,9 @@ public class SourceStatsManager extends ComponentStatsManager { public static final String RECEIVED_TOTAL_1min = "received_1min"; public static final String WRITTEN_TOTAL_1min = "written_1min"; - /** Declare Prometheus stats. **/ + /** + * Declare Prometheus stats. + **/ private final Counter statTotalRecordsReceived; @@ -101,99 +105,99 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] statTotalRecordsReceived = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL) - .help("Total number of records received from source.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL) + .help("Total number of records received from source.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalRecordsReceivedChild = statTotalRecordsReceived.labels(metricsLabels); statTotalSysExceptions = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) - .help("Total number of system exceptions.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) + .help("Total number of system exceptions.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSysExceptionsChild = statTotalSysExceptions.labels(metricsLabels); statTotalSourceExceptions = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL) - .help("Total number of source exceptions.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL) + .help("Total number of source exceptions.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSourceExceptionsChild = statTotalSourceExceptions.labels(metricsLabels); statTotalWritten = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL) - .help("Total number of records written to a Pulsar topic.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL) + .help("Total number of records written to a Pulsar topic.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalWrittenChild = statTotalWritten.labels(metricsLabels); statlastInvocation = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION, Gauge.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION) - .help("The timestamp of the last invocation of the source.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION) + .help("The timestamp of the last invocation of the source.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statlastInvocationChild = statlastInvocation.labels(metricsLabels); statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min) - .help("Total number of records received from source in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min) + .help("Total number of records received from source in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalRecordsReceivedChild1min = statTotalRecordsReceived1min.labels(metricsLabels); statTotalSysExceptions1min = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) - .help("Total number of system exceptions in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) + .help("Total number of system exceptions in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSysExceptions1minChild = statTotalSysExceptions1min.labels(metricsLabels); statTotalSourceExceptions1min = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min) - .help("Total number of source exceptions in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min) + .help("Total number of source exceptions in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalSourceExceptionsChild1min = statTotalSourceExceptions1min.labels(metricsLabels); statTotalWritten1min = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min, Counter.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min) - .help("Total number of records written to a Pulsar topic in the last 1 minute.") - .labelNames(METRICS_LABEL_NAMES) - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min) + .help("Total number of records written to a Pulsar topic in the last 1 minute.") + .labelNames(METRICS_LABEL_NAMES) + .create()); statTotalWrittenChild1min = statTotalWritten1min.labels(metricsLabels); sysExceptions = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + "system_exception", Gauge.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception") - .labelNames(EXCEPTION_METRICS_LABEL_NAMES) - .help("Exception from system code.") - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception") + .labelNames(EXCEPTION_METRICS_LABEL_NAMES) + .help("Exception from system code.") + .create()); sourceExceptions = collectorRegistry.registerIfNotExist( PULSAR_SOURCE_METRICS_PREFIX + "source_exception", Gauge.build() - .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception") - .labelNames(EXCEPTION_METRICS_LABEL_NAMES) - .help("Exception from source.") - .create()); + .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception") + .labelNames(EXCEPTION_METRICS_LABEL_NAMES) + .help("Exception from source.") + .create()); sysExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d); sourceExceptionRateLimiter = RateLimiter.create(5.0d / 60.0d); @@ -279,10 +283,6 @@ public void setLastInvocation(long ts) { statlastInvocationChild.set(ts); } - @Override - public void processTimeStart() { - //no-op - } @Override public void processTimeEnd() { From 6f9bffae2bc49e7898b103776b4d23853b0eef7b Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:02:53 +0800 Subject: [PATCH 2/8] change processTimeEnd method in ComponentStatsManager --- .../functions/instance/JavaInstanceRunnable.java | 3 --- .../instance/stats/ComponentStatsManager.java | 2 +- .../functions/instance/stats/FunctionStatsManager.java | 10 ++++------ .../functions/instance/stats/SinkStatsManager.java | 2 +- .../functions/instance/stats/SourceStatsManager.java | 2 +- 5 files changed, 7 insertions(+), 12 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 0838f9d98f402..cf8567c3e5126 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -344,9 +344,6 @@ public void run() { asyncErrorHandler); Thread.currentThread().setContextClassLoader(instanceClassLoader); - // register end time - stats.processTimeEnd(); - if (result != null) { // process the synchronous results handleResult(currentRecord, result); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index 3e17ffbcf2a14..b133324face57 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -101,7 +101,7 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry, public abstract void setLastInvocation(long ts); - public abstract void processTimeEnd(); + public abstract void processTimeEnd(long startTime); public abstract double getTotalProcessedSuccessfully(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 700eb539864a7..0009fcea6671a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -339,12 +339,10 @@ public void setLastInvocation(long ts) { @Override - public void processTimeEnd() { -// if (processTimeStart != null) { -// double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D; -// statProcessLatencyChild.observe(endTimeMs); -// statProcessLatency1minChild.observe(endTimeMs); -// } + public void processTimeEnd(long startTime) { + double endTimeMs = ((double) System.nanoTime() - startTime) / 1.0E6D; + statProcessLatencyChild.observe(endTimeMs); + statProcessLatency1minChild.observe(endTimeMs); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index dde3207ef04ae..091f0929f712b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -285,7 +285,7 @@ public void setLastInvocation(long ts) { @Override - public void processTimeEnd() { + public void processTimeEnd(long startTime) { //no-op } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 115532b82e669..4651c7e06318b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -285,7 +285,7 @@ public void setLastInvocation(long ts) { @Override - public void processTimeEnd() { + public void processTimeEnd(long startTime) { //no-op } From b4435ddb8bc6f58836d8bab59f34e1ccae97789d Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:12:06 +0800 Subject: [PATCH 3/8] add startTime in JavaExecutionResult and process endTime in handleResult --- .../apache/pulsar/functions/instance/JavaExecutionResult.java | 1 + .../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java index 5856600196b49..40c66a93e1d0f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java @@ -29,6 +29,7 @@ public class JavaExecutionResult { private Throwable userException; private Object result; + private final long startTime = System.nanoTime(); public void reset() { setUserException(null); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index cf8567c3e5126..f8010bd68fe59 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -443,6 +443,8 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception // increment total successfully processed stats.incrTotalProcessedSuccessfully(); } + // handle endTime here + stats.processTimeEnd(result.getStartTime()); } private void sendOutputMessage(Record srcRecord, Object output) throws Exception { From 6c4c5a1a7c6ecf7d09c1e92c4c87d7db825e8f12 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Mon, 6 Jan 2025 20:45:03 +0800 Subject: [PATCH 4/8] add field JavaExecutionResult in AsyncFuncRequest and use JavaExecutionResult from AsyncFuncRequest in processAsyncResultsInInputOrder --- .../pulsar/functions/instance/JavaInstance.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 5946be9fe5be9..8d669f0052916 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable { public static class AsyncFuncRequest { private final Record record; private final CompletableFuture processResult; + private final JavaExecutionResult result; } @Getter(AccessLevel.PACKAGE) @@ -136,7 +137,7 @@ public JavaExecutionResult handleMessage(Record record, Object input, if (asyncPreserveInputOrderForOutputMessages) { // Function is in format: Function> AsyncFuncRequest request = new AsyncFuncRequest( - record, (CompletableFuture) output + record, (CompletableFuture) output, executionResult ); pendingAsyncRequests.put(request); } else { @@ -187,13 +188,7 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon while (asyncResult != null && asyncResult.getProcessResult().isDone()) { pendingAsyncRequests.remove(asyncResult); - JavaExecutionResult execResult = new JavaExecutionResult(); - try { - Object result = asyncResult.getProcessResult().get(); - execResult.setResult(result); - } catch (ExecutionException e) { - execResult.setUserException(FutureUtil.unwrapCompletionException(e)); - } + JavaExecutionResult execResult = asyncResult.getResult(); resultConsumer.accept(asyncResult.getRecord(), execResult); From a7769322b5cfd95e1f376cc765b2268ac504afb7 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Mon, 6 Jan 2025 20:58:43 +0800 Subject: [PATCH 5/8] use the same executionResult in non asyncPreserveInputOrderForOutputMessages --- .../org/apache/pulsar/functions/instance/JavaInstance.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 8d669f0052916..80a966e343a5a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -149,13 +149,12 @@ public JavaExecutionResult handleMessage(Record record, Object input, processAsyncResultsInInputOrder(asyncResultConsumer); } else { try { - JavaExecutionResult execResult = new JavaExecutionResult(); if (cause != null) { - execResult.setUserException(FutureUtil.unwrapCompletionException(cause)); + executionResult.setUserException(FutureUtil.unwrapCompletionException(cause)); } else { - execResult.setResult(res); + executionResult.setResult(res); } - asyncResultConsumer.accept(record, execResult); + asyncResultConsumer.accept(record, executionResult); } finally { asyncRequestsConcurrencyLimiter.release(); } From 1749db79d414d7b9ea8c53bc098817181d7f08f3 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:37:12 +0800 Subject: [PATCH 6/8] fix the bug that do not set result --- .../java/org/apache/pulsar/functions/instance/JavaInstance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 80a966e343a5a..3c05de7b8025c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -188,7 +188,7 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon pendingAsyncRequests.remove(asyncResult); JavaExecutionResult execResult = asyncResult.getResult(); - + execResult.setResult(asyncResult.getProcessResult().get()); resultConsumer.accept(asyncResult.getRecord(), execResult); // peek the next result From 493c364af0710d75d3f88bac93cffd2159c12535 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:04:14 +0800 Subject: [PATCH 7/8] fix the bug that do not set exception --- .../apache/pulsar/functions/instance/JavaInstance.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 3c05de7b8025c..2ef2425caea88 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -188,9 +188,13 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon pendingAsyncRequests.remove(asyncResult); JavaExecutionResult execResult = asyncResult.getResult(); - execResult.setResult(asyncResult.getProcessResult().get()); + try { + Object result = asyncResult.getProcessResult().get(); + execResult.setResult(result); + } catch (ExecutionException e) { + execResult.setUserException(FutureUtil.unwrapCompletionException(e)); + } resultConsumer.accept(asyncResult.getRecord(), execResult); - // peek the next result asyncResult = pendingAsyncRequests.peek(); } From 03447265ba7c9ced4f6b0186d31a46982a5f53a5 Mon Sep 17 00:00:00 2001 From: walkinggo <40360529+walkinggo@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:55:37 +0800 Subject: [PATCH 8/8] add test case --- .../functions/instance/JavaInstanceTest.java | 85 ++++++++++++------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index b3fcef292e52b..7e681f23c554e 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongSupplier; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Function; @@ -43,6 +44,7 @@ public class JavaInstanceTest { /** * Verify that be able to run lambda functions. + * * @throws Exception */ @Test @@ -59,32 +61,32 @@ public void testLambda() throws Exception { } @Test - public void testNullReturningFunction() throws Exception { - JavaInstance instance = new JavaInstance( + public void testNullReturningFunction() throws Exception { + JavaInstance instance = new JavaInstance( mock(ContextImpl.class), (Function) (input, context) -> null, new InstanceConfig()); - String testString = "ABC123"; - JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); - assertNull(result.getResult()); - instance.close(); + String testString = "ABC123"; + JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); + assertNull(result.getResult()); + instance.close(); } @Test - public void testUserExceptionThrowingFunction() throws Exception { - final UserException userException = new UserException("Boom"); - Function func = (input, context) -> { - throw userException; - }; + public void testUserExceptionThrowingFunction() throws Exception { + final UserException userException = new UserException("Boom"); + Function func = (input, context) -> { + throw userException; + }; - JavaInstance instance = new JavaInstance( + JavaInstance instance = new JavaInstance( mock(ContextImpl.class), func, new InstanceConfig()); - String testString = "ABC123"; - JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); - assertSame(userException, result.getUserException()); - instance.close(); + String testString = "ABC123"; + JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); + assertSame(userException, result.getUserException()); + instance.close(); } @Test @@ -95,7 +97,7 @@ public void testAsyncFunction() throws Exception { Function> function = (input, context) -> { log.info("input string: {}", input); - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { try { Thread.sleep(500); @@ -115,8 +117,9 @@ public void testAsyncFunction() throws Exception { String testString = "ABC123"; CompletableFuture resultHolder = new CompletableFuture<>(); JavaExecutionResult result = instance.handleMessage( - mock(Record.class), testString, - (record, javaResult) -> resultHolder.complete(javaResult), cause -> {}); + mock(Record.class), testString, + (record, javaResult) -> resultHolder.complete(javaResult), cause -> { + }); assertNull(result); assertNotNull(resultHolder.get()); assertEquals(testString + "-lambda", resultHolder.get().getResult()); @@ -131,7 +134,7 @@ public void testNullReturningAsyncFunction() throws Exception { Function> function = (input, context) -> { log.info("input string: {}", input); - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { try { Thread.sleep(500); @@ -151,7 +154,8 @@ public void testNullReturningAsyncFunction() throws Exception { String testString = "ABC123"; CompletableFuture resultHolder = new CompletableFuture<>(); JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString, - (record, javaResult) -> resultHolder.complete(javaResult), cause -> {}); + (record, javaResult) -> resultHolder.complete(javaResult), cause -> { + }); assertNull(result); assertNotNull(resultHolder.get()); instance.close(); @@ -159,16 +163,16 @@ public void testNullReturningAsyncFunction() throws Exception { @Test public void testUserExceptionThrowingAsyncFunction() throws Exception { - final UserException userException = new UserException("Boom"); + final UserException userException = new UserException("Boom"); InstanceConfig instanceConfig = new InstanceConfig(); @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); Function> function = (input, context) -> { log.info("input string: {}", input); - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { - result.completeExceptionally(userException); + result.completeExceptionally(userException); }); return result; @@ -181,7 +185,8 @@ public void testUserExceptionThrowingAsyncFunction() throws Exception { String testString = "ABC123"; CompletableFuture resultHolder = new CompletableFuture<>(); JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString, - (record, javaResult) -> resultHolder.complete(javaResult), cause -> {}); + (record, javaResult) -> resultHolder.complete(javaResult), cause -> { + }); assertNull(result); assertSame(userException, resultHolder.get().getUserException()); instance.close(); @@ -198,7 +203,7 @@ public void testAsyncFunctionMaxPending() throws Exception { Function> function = (input, context) -> { log.info("input string: {}", input); - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { try { count.await(); @@ -243,10 +248,10 @@ public void testAsyncFunctionMaxPending() throws Exception { instance.close(); } - private static class UserException extends Exception { - public UserException(String msg) { - super(msg); - } + private static class UserException extends Exception { + public UserException(String msg) { + super(msg); + } } @Test @@ -264,7 +269,7 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Function> function = (input, context) -> { - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { try { count.await(); @@ -309,4 +314,22 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception { log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime); instance.close(); } + + @Test + public void testAsyncFunctionTime() { + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + (Function) (input, context) -> { + Thread.sleep(500); + return input; + }, + new InstanceConfig()); + String testString = "ABC123"; + JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); + LongSupplier timeSupplier = () -> System.nanoTime() - 500_000_000L; + assertNotNull(result.getResult()); + long beforeTime = timeSupplier.getAsLong(); + assertTrue(Math.abs(beforeTime - result.getStartTime()) <= 20_000_000); + instance.close(); + } }