Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] change async function time get #23820

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
private final long startTime = System.nanoTime();

public void reset() {
setUserException(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable {
public static class AsyncFuncRequest {
private final Record record;
private final CompletableFuture processResult;
private final JavaExecutionResult result;
}

@Getter(AccessLevel.PACKAGE)
Expand Down Expand Up @@ -136,7 +137,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
if (asyncPreserveInputOrderForOutputMessages) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
record, (CompletableFuture) output, executionResult
);
pendingAsyncRequests.put(request);
} else {
Expand All @@ -148,13 +149,12 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
processAsyncResultsInInputOrder(asyncResultConsumer);
} else {
try {
JavaExecutionResult execResult = new JavaExecutionResult();
if (cause != null) {
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
executionResult.setUserException(FutureUtil.unwrapCompletionException(cause));
} else {
execResult.setResult(res);
executionResult.setResult(res);
}
asyncResultConsumer.accept(record, execResult);
asyncResultConsumer.accept(record, executionResult);
} finally {
asyncRequestsConcurrencyLimiter.release();
}
Expand Down Expand Up @@ -187,16 +187,14 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);

JavaExecutionResult execResult = new JavaExecutionResult();
JavaExecutionResult execResult = asyncResult.getResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
} catch (ExecutionException e) {
execResult.setUserException(FutureUtil.unwrapCompletionException(e));
}

resultConsumer.accept(asyncResult.getRecord(), execResult);

// peek the next result
asyncResult = pendingAsyncRequests.peek();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
this.secretsProvider = secretsProvider;
this.componentClassLoader = componentClassLoader;
this.functionClassLoader = transformFunctionClassLoader != null
? transformFunctionClassLoader
: componentClassLoader;
? transformFunctionClassLoader
: componentClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -235,7 +235,7 @@ private synchronized void setup() throws Exception {
ThreadContext.put("instance", instanceConfig.getInstanceName());

log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());

Object object;
if (instanceConfig.getFunctionDetails().getClassName()
Expand Down Expand Up @@ -293,8 +293,8 @@ ContextImpl setupContext() throws PulsarClientException {
try {
Thread.currentThread().setContextClassLoader(functionClassLoader);
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
} finally {
Thread.currentThread().setContextClassLoader(clsLoader);
}
Expand Down Expand Up @@ -334,8 +334,6 @@ public void run() {
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());

// start time for process latency stat
stats.processTimeStart();

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
Expand All @@ -346,9 +344,6 @@ public void run() {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();

if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
Expand Down Expand Up @@ -401,9 +396,9 @@ private void setupStateStore() throws Exception {
stateStoreProvider.init(stateStoreProviderConfig);

StateStore store = stateStoreProvider.getStateStore(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName()
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName()
);
StateStoreContext context = new StateStoreContextImpl();
store.init(context);
Expand Down Expand Up @@ -448,6 +443,8 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
// handle endTime here
stats.processTimeEnd(result.getStartTime());
}

private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
Expand Down Expand Up @@ -513,7 +510,7 @@ private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Rec
if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
KeyValueEncodingType.SEPARATED);
KeyValueEncodingType.SEPARATED);
} else {
finalSchema = schema;
}
Expand Down Expand Up @@ -548,7 +545,7 @@ record = this.source.read();

/**
* NOTE: this method is be synchronized because it is potentially called by two different places
* one inside the run/finally clause and one inside the ThreadRuntime::stop.
* one inside the run/finally clause and one inside the ThreadRuntime::stop.
*/
@Override
public synchronized void close() {
Expand Down Expand Up @@ -678,8 +675,8 @@ private InstanceCommunication.MetricsData internalGetMetrics() {
}

private void internalResetMetrics() {
stats.reset();
javaInstance.resetMetrics();
stats.reset();
javaInstance.resetMetrics();
}

private Builder createMetricsDataBuilder() {
Expand Down Expand Up @@ -834,7 +831,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
);

pulsarSourceConfig.setSkipToLatest(
sourceSpec.getSkipToLatest()
sourceSpec.getSkipToLatest()
);

Objects.requireNonNull(contextImpl.getSubscriptionType());
Expand Down Expand Up @@ -874,12 +871,12 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
// check if source is a batch source
if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) {
object = Reflections.createInstance(
sourceSpec.getClassName(),
this.instanceClassLoader);
sourceSpec.getClassName(),
this.instanceClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
this.componentClassLoader);
sourceSpec.getClassName(),
this.componentClassLoader);
}
}

Expand Down Expand Up @@ -911,8 +908,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
/**
* Recursively interpolate configured secrets into the config map by calling
* {@link SecretsProvider#interpolateSecretForValue(String)}.
*
* @param secretsProvider - the secrets provider that will convert secret's values into config values.
* @param configs - the connector configuration map, which will be mutated.
* @param configs - the connector configuration map, which will be mutated.
*/
private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider,
Map<String, Object> configs) {
Expand All @@ -939,12 +937,13 @@ static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfi
SecretsProvider secretsProvider,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.forType(new TypeReference<Map<String, Object>>() {
})
.readValue(connectorConfigs);
if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK
&& componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
Expand All @@ -959,7 +958,7 @@ static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfi
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else {
configClassName = ConnectorUtils
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
}
if (configClassName != null) {
Expand Down Expand Up @@ -1105,7 +1104,7 @@ private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {

case AVRO:
return AvroSchema.of(SchemaDefinition.<T>builder()
.withPojo(clazz).build());
.withPojo(clazz).build());

case JSON:
return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
Expand All @@ -1131,8 +1130,8 @@ private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> claz
if (GenericObject.class.isAssignableFrom(clazz)) {
return SchemaType.AUTO_CONSUME;
} else if (byte[].class.equals(clazz)
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
// if sink uses bytes, we should ignore
return SchemaType.NONE;
} else {
Expand All @@ -1151,8 +1150,8 @@ private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> claz

private static SchemaType getDefaultSchemaType(Class<?> clazz) {
if (byte[].class.equals(clazz)
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
return SchemaType.NONE;
} else if (GenericObject.class.isAssignableFrom(clazz)) {
// the sink is taking generic record/object, so we do auto schema detection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public abstract class ComponentStatsManager implements AutoCloseable {
}

public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
switch (componentType) {
case FUNCTION:
return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
Expand Down Expand Up @@ -100,9 +100,8 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,

public abstract void setLastInvocation(long ts);

public abstract void processTimeStart();

public abstract void processTimeEnd();
public abstract void processTimeEnd(long startTime);

public abstract double getTotalProcessedSuccessfully();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,13 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}

private Long processTimeStart;

@Override
public void processTimeStart() {
processTimeStart = System.nanoTime();
}

@Override
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
public void processTimeEnd(long startTime) {
double endTimeMs = ((double) System.nanoTime() - startTime) / 1.0E6D;
statProcessLatencyChild.observe(endTimeMs);
statProcessLatency1minChild.observe(endTimeMs);
}
}

@Override
Expand Down
Loading
Loading