diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c4c44a8ed3ffe..08c782b8be5a1 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -43,6 +43,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300
+# Specifies if the function worker should use classloading for validating submissions for built-in
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfBuiltinFiles: false
+
+# Specifies if the function worker should use classloading for validating submissions for external
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfExternalFiles: false
+
################################
# Function package management
################################
@@ -393,7 +403,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions
-# Should connector config be validated during submission
+# Enables extended validation for connector config with fine-grain annotation based validation
+# during submission. Classloading with either enableClassloadingOfExternalFiles or
+# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
+# Default is false.
validateConnectorConfig: false
# Whether to initialize distributed log metadata by runtime.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 85d08f2fd38de..088d36e39fdbb 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -495,6 +495,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
+ * Byte Buddy
+ - net.bytebuddy-byte-buddy-1.14.12.jar
+ * zt-zip
+ - org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
diff --git a/pom.xml b/pom.xml
index c58d4ea90b28b..f037c9ef0e520 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,8 @@ flexible messaging model and an intuitive client API.
8.37
1.4.13
0.5.0
+ 1.14.12
+ 1.17
3.19.6
${protobuf3.version}
1.55.3
@@ -978,6 +980,18 @@ flexible messaging model and an intuitive client API.
${typetools.version}
+
+ net.bytebuddy
+ byte-buddy
+ ${byte-buddy.version}
+
+
+
+ org.zeroturnaround
+ zt-zip
+ ${zt-zip.version}
+
+
io.grpc
grpc-bom
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index c4864883788d9..0705e76bf583c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -155,6 +155,11 @@ public NarClassLoader run() {
});
}
+ public static List getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
+ File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
+ return getClassPathEntries(unpacked);
+ }
+
private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
@@ -165,16 +170,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
- * @throws IllegalArgumentException
- * if the NAR is missing the Java Services API file for FlowFileProcessor implementations.
- * @throws ClassNotFoundException
- * if any of the FlowFileProcessor implementations defined by the Java Services API cannot be
- * loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set additionalJars, ClassLoader parent)
- throws ClassNotFoundException, IOException {
+ throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;
@@ -239,22 +239,31 @@ public List getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
- addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
+ getClassPathEntries(root).forEach(f -> {
+ try {
+ addURL(f.toURI().toURL());
+ } catch (IOException e) {
+ log.error("Failed to add entry to classpath: {}", f, e);
+ }
+ });
+ }
+ static List getClassPathEntries(File root) {
+ List classPathEntries = new ArrayList<>();
+ classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
- log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
+ log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
- addURL(dependencies.toURI().toURL());
+ classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
- for (File libJar : jarFiles) {
- addURL(libJar.toURI().toURL());
- }
+ classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
+ return classPathEntries;
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index b2c0a37a65036..1554db5693fd8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -33,13 +33,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;
/**
@@ -114,16 +115,22 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
- try (JarFile jarFile = new JarFile(nar)) {
- Enumeration jarEntries = jarFile.entries();
- while (jarEntries.hasMoreElements()) {
- JarEntry jarEntry = jarEntries.nextElement();
- String name = jarEntry.getName();
- File f = new File(workingDirectory, name);
- if (jarEntry.isDirectory()) {
+ Path workingDirectoryPath = workingDirectory.toPath().normalize();
+ try (ZipFile zipFile = new ZipFile(nar)) {
+ Enumeration extends ZipEntry> zipEntries = zipFile.entries();
+ while (zipEntries.hasMoreElements()) {
+ ZipEntry zipEntry = zipEntries.nextElement();
+ String name = zipEntry.getName();
+ Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
+ if (!targetFilePath.startsWith(workingDirectoryPath)) {
+ log.error("Invalid zip file with entry '{}'", name);
+ throw new IOException("Invalid zip file. Aborting unpacking.");
+ }
+ File f = targetFilePath.toFile();
+ if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
- makeFile(jarFile.getInputStream(jarEntry), f);
+ makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 2b9b25ed74349..2b97627875948 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -31,8 +31,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -47,9 +48,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Builder;
+import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
@@ -59,6 +63,7 @@
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -71,8 +76,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
+import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@@ -90,8 +98,7 @@ public class LocalRunner implements AutoCloseable {
private final String functionsDir;
private final Thread shutdownHook;
private final int instanceLivenessCheck;
- private ClassLoader userCodeClassLoader;
- private boolean userCodeClassLoaderCreated;
+ private UserCodeClassLoader userCodeClassLoader;
private RuntimeFactory runtimeFactory;
private HTTPServer metricsServer;
@@ -100,6 +107,12 @@ public enum RuntimeEnv {
PROCESS
}
+ @Value
+ private static class UserCodeClassLoader {
+ ClassLoader classLoader;
+ boolean classLoaderCreated;
+ }
+
public static class FunctionConfigConverter implements IStringConverter {
@Override
public FunctionConfig convert(String value) {
@@ -257,11 +270,13 @@ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, Sin
}
private static String getPulsarDirectory(String directory) {
- String pulsarHome = System.getenv("PULSAR_HOME");
- if (pulsarHome == null) {
- pulsarHome = Paths.get("").toAbsolutePath().toString();
+ final Path directoryPath;
+ if (System.getenv("PULSAR_HOME") != null) {
+ directoryPath = Path.of(System.getenv("PULSAR_HOME"), directory);
+ } else {
+ directoryPath = Path.of(directory);
}
- return Paths.get(pulsarHome, directory).toString();
+ return directoryPath.toAbsolutePath().toString();
}
private static File createNarExtractionTempDirectory() {
@@ -308,16 +323,19 @@ public synchronized void stop() {
runtimeFactory = null;
}
- if (userCodeClassLoaderCreated) {
- if (userCodeClassLoader instanceof Closeable) {
- try {
- ((Closeable) userCodeClassLoader).close();
- } catch (IOException e) {
- log.warn("Error closing classloader", e);
- }
+ closeClassLoaderIfneeded(userCodeClassLoader);
+ userCodeClassLoader = null;
+ }
+ }
+
+ private static void closeClassLoaderIfneeded(UserCodeClassLoader userCodeClassLoader) {
+ if (userCodeClassLoader != null && userCodeClassLoader.isClassLoaderCreated()) {
+ if (userCodeClassLoader.getClassLoader() instanceof Closeable) {
+ try {
+ ((Closeable) userCodeClassLoader.getClassLoader()).close();
+ } catch (IOException e) {
+ log.warn("Error closing classloader", e);
}
- userCodeClassLoaderCreated = false;
- userCodeClassLoader = null;
}
}
}
@@ -337,47 +355,14 @@ public void start(boolean blocking) throws Exception {
parallelism = functionConfig.getParallelism();
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
userCodeFile = functionConfig.getJar();
- ClassLoader builtInFunctionClassLoader = userCodeFile != null
- ? isBuiltInFunction(userCodeFile)
- : null;
- if (builtInFunctionClassLoader != null) {
- userCodeClassLoader = builtInFunctionClassLoader;
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, builtInFunctionClassLoader));
- } else if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.FUNCTION,
- functionConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
- userCodeClassLoader = functionClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("User jar does not exist");
- }
- ClassLoader functionClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.FUNCTION,
- functionConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
- userCodeClassLoader = functionClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The jar property must be specified in FunctionConfig.");
- }
- functionDetails = FunctionConfigUtils.convert(
- functionConfig,
- FunctionConfigUtils.validateJavaFunction(
- functionConfig,
- Thread.currentThread().getContextClassLoader()));
- }
+ userCodeClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
+ functionDetails = FunctionConfigUtils.convert(
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -387,91 +372,34 @@ public void start(boolean blocking) throws Exception {
}
if (functionDetails == null) {
- functionDetails = FunctionConfigUtils.convert(functionConfig,
- userCodeClassLoader != null ? userCodeClassLoader :
- Thread.currentThread().getContextClassLoader());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
+ functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
-
- ClassLoader builtInSourceClassLoader = userCodeFile != null ? isBuiltInSource(userCodeFile) : null;
- if (builtInSourceClassLoader != null) {
- functionDetails = SourceConfigUtils.convert(
- sourceConfig, SourceConfigUtils.validateAndExtractDetails(
- sourceConfig, builtInSourceClassLoader, true));
- userCodeClassLoader = builtInSourceClassLoader;
- } else if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader sourceClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SOURCE,
- sourceConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
- SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, true));
- userCodeClassLoader = sourceClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Source archive (" + userCodeFile + ") does not exist");
- }
- ClassLoader sourceClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SOURCE,
- sourceConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = SourceConfigUtils.convert(sourceConfig,
- SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, true));
- userCodeClassLoader = sourceClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The archive property must be specified in SourceConfig.");
- }
- functionDetails = SourceConfigUtils.convert(
- sourceConfig, SourceConfigUtils.validateAndExtractDetails(
- sourceConfig, Thread.currentThread().getContextClassLoader(), true));
- }
+ userCodeClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
+ functionDetails = SourceConfigUtils.convert(sourceConfig,
+ SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
parallelism = sinkConfig.getParallelism();
-
- ClassLoader builtInSinkClassLoader = userCodeFile != null ? isBuiltInSink(userCodeFile) : null;
- if (builtInSinkClassLoader != null) {
- functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, builtInSinkClassLoader, true));
- userCodeClassLoader = builtInSinkClassLoader;
- } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
- File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- ClassLoader sinkClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SINK,
- sinkConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
- userCodeClassLoader = sinkClassLoader;
- userCodeClassLoaderCreated = true;
- } else if (userCodeFile != null) {
- File file = new File(userCodeFile);
- if (!file.exists()) {
- throw new RuntimeException("Sink archive does not exist");
- }
- ClassLoader sinkClassLoader = FunctionCommon.getClassLoaderFromPackage(
- Function.FunctionDetails.ComponentType.SINK,
- sinkConfig.getClassName(), file, narExtractionDirectory);
- functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
- userCodeClassLoader = sinkClassLoader;
- userCodeClassLoaderCreated = true;
- } else {
- if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
- throw new IllegalStateException("The archive property must be specified in SourceConfig.");
- }
- functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(
- sinkConfig, Thread.currentThread().getContextClassLoader(), true));
- }
+ userCodeClassLoader = extractClassLoader(
+ userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
+
+ functionDetails = SinkConfigUtils.convert(
+ sinkConfig,
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
+ true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
@@ -526,6 +454,67 @@ public void start(boolean blocking) throws Exception {
}
}
+ private ClassLoader getCurrentOrUserCodeClassLoader() {
+ return userCodeClassLoader == null || userCodeClassLoader.getClassLoader() == null
+ ? Thread.currentThread().getContextClassLoader()
+ : userCodeClassLoader.getClassLoader();
+ }
+
+ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentType componentType, String className)
+ throws IOException, URISyntaxException {
+ ClassLoader classLoader = userCodeFile != null ? isBuiltIn(userCodeFile, componentType) : null;
+ boolean classLoaderCreated = false;
+ if (classLoader == null) {
+ if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+ File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
+ componentType, className, file, narExtractionDirectory);
+ classLoaderCreated = true;
+ } else if (userCodeFile != null) {
+ File file = new File(userCodeFile);
+ if (!file.exists()) {
+ String errorMsg;
+ switch (componentType) {
+ case FUNCTION:
+ errorMsg = "User jar";
+ break;
+ case SOURCE:
+ errorMsg = "Source archive";
+ break;
+ case SINK:
+ errorMsg = "Sink archive";
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value: " + componentType);
+ }
+ throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
+ }
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
+ componentType, className, file, narExtractionDirectory);
+ classLoaderCreated = true;
+ } else {
+ if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
+ String errorMsg;
+ switch (componentType) {
+ case FUNCTION:
+ errorMsg = "The jar property must be specified in FunctionConfig.";
+ break;
+ case SOURCE:
+ errorMsg = "The archive property must be specified in SourceConfig.";
+ break;
+ case SINK:
+ errorMsg = "The archive property must be specified in SinkConfig.";
+ break;
+ default:
+ throw new IllegalStateException("Unexpected ComponentType: " + componentType);
+ }
+ throw new IllegalStateException(errorMsg);
+ }
+ }
+ }
+ return new UserCodeClassLoader(classLoader, classLoaderCreated);
+ }
+
private void startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
@@ -641,8 +630,8 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
- if (userCodeClassLoader != null) {
- Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+ if (userCodeClassLoader != null && userCodeClassLoader.getClassLoader() != null) {
+ Thread.currentThread().setContextClassLoader(userCodeClassLoader.getClassLoader());
}
runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
@@ -692,6 +681,20 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
}
}
+ private ClassLoader isBuiltIn(String component, ComponentType componentType)
+ throws IOException {
+ switch (componentType) {
+ case FUNCTION:
+ return isBuiltInFunction(component);
+ case SOURCE:
+ return isBuiltInSource(component);
+ case SINK:
+ return isBuiltInSink(component);
+ default:
+ throw new IllegalStateException("Unexpected ComponentType: " + componentType);
+ }
+ }
+
private ClassLoader isBuiltInFunction(String functionType) throws IOException {
// Validate the connector type from the locally available connectors
TreeMap functions = getFunctions();
@@ -700,7 +703,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
- return function.getClassLoader();
+ return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -714,7 +717,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -728,18 +731,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}
private TreeMap getFunctions() throws IOException {
- return FunctionUtils.searchForFunctions(functionsDir);
+ return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}
private TreeMap getConnectors() throws IOException {
- return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
+ return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}
private SecretsProviderConfigurator getSecretsProviderConfigurator() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 5743341483f14..f3ec46dcf2809 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -38,6 +38,9 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -306,7 +309,8 @@ public void close() {
}
private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
- ClassLoader classLoader) throws ClassNotFoundException {
+ ClassLoader classLoader) {
+ TypePool typePool = TypePool.Default.of(ClassFileLocator.ForClassLoader.of(classLoader));
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
@@ -325,14 +329,13 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}
-
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(typePool.describe(className).resolve(),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
- sourceBuilder.setTypeClassName(typeArgs[0].getName());
+ sourceBuilder.setTypeClassName(typeArgs[0].asErasure().getTypeName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}
@@ -340,7 +343,7 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
- sinkBuilder.setTypeClassName(typeArgs[1].getName());
+ sinkBuilder.setTypeClassName(typeArgs[1].asErasure().getTypeName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
@@ -349,7 +352,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg =
- getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
+ getSinkType(functionDetailsBuilder.getSink().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -368,7 +372,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg =
- getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
+ getSourceType(functionDetailsBuilder.getSource().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 26651010fefc4..47bd3ba0472b8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -121,17 +121,17 @@ private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
return functionsManager.get()
.getFunction(instanceConfig.getFunctionDetails().getBuiltin())
- .getClassLoader();
+ .getFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SOURCE && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSource().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SINK && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSink().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
}
return loadJars(jarFile, instanceConfig, instanceConfig.getFunctionDetails().getName(),
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 643afa4f26c3c..c78bb73428548 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -27,18 +28,35 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@Slf4j
-public class ConnectorsManager {
+public class ConnectorsManager implements AutoCloseable {
@Getter
private volatile TreeMap connectors;
+ @VisibleForTesting
+ public ConnectorsManager() {
+ this.connectors = new TreeMap<>();
+ }
+
public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
- this.connectors = ConnectorUtils
- .searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ this.connectors = createConnectors(workerConfig);
+ }
+
+ private static TreeMap createConnectors(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
+ workerConfig.getNarExtractionDirectory(), enableClassloading);
+ }
+
+ @VisibleForTesting
+ public void addConnector(String connectorType, Connector connector) {
+ connectors.put(connectorType, connector);
}
public Connector getConnector(String connectorType) {
@@ -71,7 +89,25 @@ public Path getSinkArchive(String sinkType) {
}
public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
- connectors = ConnectorUtils
- .searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ TreeMap oldConnectors = connectors;
+ this.connectors = createConnectors(workerConfig);
+ closeConnectors(oldConnectors);
}
+
+ @Override
+ public void close() {
+ closeConnectors(connectors);
+ }
+
+ private void closeConnectors(TreeMap connectorMap) {
+ connectorMap.values().forEach(connector -> {
+ try {
+ connector.close();
+ } catch (Exception e) {
+ log.warn("Failed to close connector", e);
+ }
+ });
+ connectorMap.clear();
+ }
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index 0f1c0fcc8356e..753d41f0fa772 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -18,20 +18,30 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
-public class FunctionsManager {
-
+public class FunctionsManager implements AutoCloseable {
private TreeMap functions;
+ @VisibleForTesting
+ public FunctionsManager() {
+ this.functions = new TreeMap<>();
+ }
+
public FunctionsManager(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ this.functions = createFunctions(workerConfig);
+ }
+
+ public void addFunction(String functionType, FunctionArchive functionArchive) {
+ functions.put(functionType, functionArchive);
}
public FunctionArchive getFunction(String functionType) {
@@ -43,6 +53,32 @@ public Path getFunctionArchive(String functionType) {
}
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ TreeMap oldFunctions = functions;
+ this.functions = createFunctions(workerConfig);
+ closeFunctions(oldFunctions);
+ }
+
+ private static TreeMap createFunctions(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(),
+ workerConfig.getNarExtractionDirectory(),
+ enableClassloading);
+ }
+
+ @Override
+ public void close() {
+ closeFunctions(functions);
+ }
+
+ private void closeFunctions(TreeMap functionMap) {
+ functionMap.values().forEach(functionArchive -> {
+ try {
+ functionArchive.close();
+ } catch (Exception e) {
+ log.warn("Failed to close function archive", e);
+ }
+ });
+ functionMap.clear();
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 76078a46e7979..5432d5c46106a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -225,6 +225,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = -1;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Specifies if the function worker should use classloading for validating submissions for built-in "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfBuiltinFiles = false;
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Specifies if the function worker should use classloading for validating submissions for external "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfExternalFiles = false;
+
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
@@ -237,7 +253,10 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
- doc = "Should we validate connector config during submission"
+ doc = "Enables extended validation for connector config with fine-grain annotation based validation "
+ + "during submission. Classloading with either enableClassloadingOfExternalFiles or "
+ + "enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect. "
+ + "Default is false."
)
private Boolean validateConnectorConfig = false;
@FieldContext(
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 27274258a183c..befc887f67c60 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -87,6 +87,17 @@
typetools
+
+ net.bytebuddy
+ byte-buddy
+
+
+
+ org.zeroturnaround
+ zt-zip
+ 1.17
+
+
${project.groupId}
pulsar-client-original
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 49b71afcad795..0e17907fb3b32 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -22,27 +22,25 @@
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
+import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -50,15 +48,11 @@
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -92,82 +86,79 @@ public static int findAvailablePort() {
}
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader)
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypePool typePool)
throws ClassNotFoundException {
- return getFunctionTypes(functionConfig, classLoader.loadClass(functionConfig.getClassName()));
+ return getFunctionTypes(functionConfig, typePool.describe(functionConfig.getClassName()).resolve());
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, Class functionClass)
- throws ClassNotFoundException {
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypeDefinition functionClass) {
boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
- public static Class>[] getFunctionTypes(Class userClass, boolean isWindowConfigPresent) {
- Class>[] typeArgs;
+ public static TypeDefinition[] getFunctionTypes(TypeDefinition userClass, boolean isWindowConfigPresent) {
+ Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
// if window function
if (isWindowConfigPresent) {
- if (WindowFunction.class.isAssignableFrom(userClass)) {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(WindowFunction.class, userClass);
- } else {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
- if (!typeArgs[0].equals(Collection.class)) {
+ if (classParent.equals(java.util.function.Function.class)) {
+ if (!typeArgs[0].asErasure().isAssignableTo(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
- Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
- Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
- Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
- typeArgs[0] = (Class>) actualInputType;
- }
- } else {
- if (Function.class.isAssignableFrom(userClass)) {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(Function.class, userClass);
- } else {
- typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
+ typeArgs[0] = typeArgs[0].getTypeArguments().get(0);
}
}
-
+ if (typeArgs[1].asErasure().isAssignableTo(Record.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
+ }
+ if (typeArgs[1].asErasure().isAssignableTo(CompletableFuture.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
+ }
return typeArgs;
}
- private static Class>[] getFunctionTypesUnwrappingRecordIfNeeded(Class> type, Class> subType) {
- Class>[] typeArgs = TypeResolver.resolveRawArguments(type, subType);
- if (typeArgs[1].equals(Record.class)) {
- Type genericType = TypeResolver.resolveGenericType(type, subType);
- Type recordType = ((ParameterizedType) genericType).getActualTypeArguments()[1];
- Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
- typeArgs[1] = (Class>) actualInputType;
+ private static TypeList.Generic resolveInterfaceTypeArguments(TypeDefinition userClass, Class> interfaceClass) {
+ if (!interfaceClass.isInterface()) {
+ throw new IllegalArgumentException("interfaceClass must be an interface");
+ }
+ for (TypeDescription.Generic interfaze : userClass.getInterfaces()) {
+ if (interfaze.asErasure().isAssignableTo(interfaceClass)) {
+ return interfaze.getTypeArguments();
+ }
+ }
+ if (userClass.getSuperClass() != null) {
+ return resolveInterfaceTypeArguments(userClass.getSuperClass(), interfaceClass);
}
+ return null;
+ }
+
+ public static TypeDescription.Generic[] getRawFunctionTypes(TypeDefinition userClass,
+ boolean isWindowConfigPresent) {
+ Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
return typeArgs;
}
- public static Object createInstance(String userClassName, ClassLoader classLoader) {
- Class> theCls;
- try {
- theCls = Class.forName(userClassName);
- } catch (ClassNotFoundException | NoClassDefFoundError cnfe) {
- try {
- theCls = Class.forName(userClassName, true, classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new RuntimeException("User class must be in class path", cnfe);
+ public static Class> getFunctionClassParent(TypeDefinition userClass, boolean isWindowConfigPresent) {
+ if (isWindowConfigPresent) {
+ if (userClass.asErasure().isAssignableTo(WindowFunction.class)) {
+ return WindowFunction.class;
+ } else {
+ return java.util.function.Function.class;
+ }
+ } else {
+ if (userClass.asErasure().isAssignableTo(Function.class)) {
+ return Function.class;
+ } else {
+ return java.util.function.Function.class;
}
}
- Object result;
- try {
- Constructor> meth = theCls.getDeclaredConstructor();
- meth.setAccessible(true);
- result = meth.newInstance();
- } catch (InstantiationException ie) {
- throw new RuntimeException("User class must be concrete", ie);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("User class doesn't have such method", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("User class must have a no-arg constructor", e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException("User class constructor throws exception", e);
- }
- return result;
-
}
public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
@@ -210,50 +201,46 @@ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
}
- public static Class> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSourceType(classLoader.loadClass(className));
+ public static TypeDefinition getSourceType(String className, TypePool typePool) {
+ return getSourceType(typePool.describe(className).resolve());
}
- public static Class> getSourceType(Class sourceClass) {
-
- if (Source.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(Source.class, sourceClass);
- } else if (BatchSource.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(BatchSource.class, sourceClass);
+ public static TypeDefinition getSourceType(TypeDefinition sourceClass) {
+ if (sourceClass.asErasure().isAssignableTo(Source.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, Source.class).get(0);
+ } else if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, BatchSource.class).get(0);
} else {
throw new IllegalArgumentException(
String.format("Source class %s does not implement the correct interface",
- sourceClass.getName()));
+ sourceClass.getActualName()));
}
}
- public static Class> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSinkType(classLoader.loadClass(className));
+ public static TypeDefinition getSinkType(String className, TypePool typePool) {
+ return getSinkType(typePool.describe(className).resolve());
}
- public static Class> getSinkType(Class sinkClass) {
- return TypeResolver.resolveRawArgument(Sink.class, sinkClass);
+ public static TypeDefinition getSinkType(TypeDefinition sinkClass) {
+ if (sinkClass.asErasure().isAssignableTo(Sink.class)) {
+ return resolveInterfaceTypeArguments(sinkClass, Sink.class).get(0);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s does not implement the correct interface",
+ sinkClass.getActualName()));
+ }
}
public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
- URL website = new URL(destPkgUrl);
- try (InputStream in = website.openStream()) {
+ final URL url = new URL(destPkgUrl);
+ final URLConnection connection = url.openConnection();
+ try (InputStream in = connection.getInputStream()) {
log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
}
- public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
- File file = extractFileFromPkgURL(destPkgUrl);
- try {
- return ClassLoaderUtils.loadJar(file);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(
- "Corrupt User PackageFile " + file + " with error " + e.getMessage());
- }
- }
-
public static File createPkgTempFile() throws IOException {
return File.createTempFile("functions", ".tmp");
}
@@ -277,21 +264,6 @@ public static File extractFileFromPkgURL(String destPkgUrl) throws IOException,
}
}
- public static NarClassLoader extractNarClassLoader(File packageFile,
- String narExtractionDirectory) {
- if (packageFile != null) {
- try {
- return NarClassLoaderBuilder.builder()
- .narFile(packageFile)
- .extractionDirectory(narExtractionDirectory)
- .build();
- } catch (IOException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
- return null;
- }
-
public static String getFullyQualifiedInstanceId(org.apache.pulsar.functions.proto.Function.Instance instance) {
return getFullyQualifiedInstanceId(
instance.getFunctionMetaData().getFunctionDetails().getTenant(),
@@ -327,17 +299,6 @@ public static final MessageId getMessageId(long sequenceId) {
return new MessageIdImpl(ledgerId, entryId, -1);
}
- public static byte[] toByteArray(Object obj) throws IOException {
- byte[] bytes = null;
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- bytes = bos.toByteArray();
- }
- return bytes;
- }
-
public static String getUniquePackageName(String packageName) {
return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
}
@@ -385,173 +346,38 @@ private static String extractFromFullyQualifiedName(String fqfn, int index) {
throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
}
- public static Class> getTypeArg(String className, Class> funClass, ClassLoader classLoader)
- throws ClassNotFoundException {
- Class> loadedClass = classLoader.loadClass(className);
- if (!funClass.isAssignableFrom(loadedClass)) {
- throw new IllegalArgumentException(
- String.format("class %s is not type of %s", className, funClass.getName()));
- }
- return TypeResolver.resolveRawArgument(funClass, loadedClass);
- }
-
public static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
- public static ClassLoader getClassLoaderFromPackage(
- org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType,
- String className,
- File packageFile,
- String narExtractionDirectory) {
- String connectorClassName = className;
- ClassLoader jarClassLoader = null;
- boolean keepJarClassLoader = false;
- ClassLoader narClassLoader = null;
- boolean keepNarClassLoader = false;
-
- Exception jarClassLoaderException = null;
- Exception narClassLoaderException = null;
-
- try {
- try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if connector class name is not provided, we can only try to load archive as a NAR
- if (isEmpty(connectorClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
- + "Pulsar cannot determine if the package is a NAR package or JAR package. "
- + "%s classname is not provided and attempts to load it as a NAR package produced "
- + "the following error.",
- capFirstLetter(componentType), capFirstLetter(componentType)),
- narClassLoaderException);
- }
- try {
- if (componentType
- == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.FUNCTION) {
- connectorClassName = FunctionUtils.getFunctionClass((NarClassLoader) narClassLoader);
- } else if (componentType
- == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
- connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
- } else {
- connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
- componentType.toString().toLowerCase()), e);
- }
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
-
- } else {
- // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
- if (jarClassLoader != null) {
- try {
- jarClassLoader.loadClass(connectorClassName);
- keepJarClassLoader = true;
- return jarClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- // class not found in JAR try loading as a NAR and searching for the class
- if (narClassLoader != null) {
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
- }
- } else if (narClassLoader != null) {
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
- + " package does not have the correct format."
- + " Pulsar cannot determine if the package is a NAR package or JAR package.");
-
- if (jarClassLoaderException != null) {
- errorMsg.append(
- " Attempts to load it as a JAR package produced error: " + jarClassLoaderException
- .getMessage());
- }
-
- if (narClassLoaderException != null) {
- errorMsg.append(
- " Attempts to load it as a NAR package produced error: " + narClassLoaderException
- .getMessage());
- }
-
- throw new IllegalArgumentException(errorMsg.toString());
- }
- }
- } finally {
- if (!keepJarClassLoader) {
- ClassLoaderUtils.closeClassLoader(jarClassLoader);
- }
- if (!keepNarClassLoader) {
- ClassLoaderUtils.closeClassLoader(narClassLoader);
- }
- }
- }
-
public static String capFirstLetter(Enum en) {
return StringUtils.capitalize(en.toString().toLowerCase());
}
public static boolean isFunctionCodeBuiltin(
- org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails) {
- if (functionDetails.hasSource()) {
+ org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetail) {
+ return isFunctionCodeBuiltin(functionDetail, functionDetail.getComponentType());
+ }
+
+ public static boolean isFunctionCodeBuiltin(
+ org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails,
+ ComponentType componentType) {
+ if (componentType == ComponentType.SOURCE && functionDetails.hasSource()) {
org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = functionDetails.getSource();
if (!isEmpty(sourceSpec.getBuiltin())) {
return true;
}
}
- if (functionDetails.hasSink()) {
+ if (componentType == ComponentType.SINK && functionDetails.hasSink()) {
org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec = functionDetails.getSink();
if (!isEmpty(sinkSpec.getBuiltin())) {
return true;
}
}
- if (!isEmpty(functionDetails.getBuiltin())) {
- return true;
- }
-
- return false;
+ return componentType == ComponentType.FUNCTION && !isEmpty(functionDetails.getBuiltin());
}
public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index a8630dd3b34d8..dcd6982315e9a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -18,21 +18,18 @@
*/
package org.apache.pulsar.functions.utils;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.File;
-import java.io.IOException;
import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -42,10 +39,13 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
@@ -53,7 +53,6 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
public class FunctionConfigUtils {
@@ -72,26 +71,21 @@ public static class ExtractedFunctionDetails {
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();
- public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
- throws IllegalArgumentException {
+ public static FunctionDetails convert(FunctionConfig functionConfig) {
+ return convert(functionConfig, (ValidatableFunctionPackage) null);
+ }
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- if (classLoader != null) {
- try {
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
- return convert(
- functionConfig,
- new ExtractedFunctionDetails(
- functionConfig.getClassName(),
- typeArgs[0].getName(),
- typeArgs[1].getName()));
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
- }
+ public static FunctionDetails convert(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage)
+ throws IllegalArgumentException {
+ if (functionConfig == null) {
+ throw new IllegalArgumentException("Function config is not provided");
+ }
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && validatableFunctionPackage != null) {
+ return convert(functionConfig, doJavaChecks(functionConfig, validatableFunctionPackage));
+ } else {
+ return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
- return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
@@ -568,7 +562,6 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
} else if (functionConfig.getGo() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.GO);
}
-
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
WindowConfigUtils.inferMissingArguments(windowConfig);
@@ -576,48 +569,49 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
}
}
- private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage) {
- String functionClassName = functionConfig.getClassName();
- Class functionClass;
+ String functionClassName = StringUtils.trimToNull(functionConfig.getClassName());
+ TypeDefinition functionClass;
try {
// if class name in function config is not set, this should be a built-in function
// thus we should try to find its class name in the NAR service definition
if (functionClassName == null) {
- try {
- functionClassName = FunctionUtils.getFunctionClass(clsLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ FunctionDefinition functionDefinition =
+ validatableFunctionPackage.getFunctionMetaData(FunctionDefinition.class);
+ if (functionDefinition == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
+ }
+ functionClassName = functionDefinition.getFunctionClass();
+ if (functionClassName == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
}
}
- functionClass = clsLoader.loadClass(functionClassName);
+ functionClass = validatableFunctionPackage.resolveType(functionClassName);
- if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
- && !java.util.function.Function.class.isAssignableFrom(functionClass)
- && !org.apache.pulsar.functions.api.WindowFunction.class.isAssignableFrom(functionClass)) {
+ if (!functionClass.asErasure().isAssignableTo(org.apache.pulsar.functions.api.Function.class)
+ && !functionClass.asErasure().isAssignableTo(java.util.function.Function.class)
+ && !functionClass.asErasure()
+ .isAssignableTo(org.apache.pulsar.functions.api.WindowFunction.class)) {
throw new IllegalArgumentException(
String.format("Function class %s does not implement the correct interface",
- functionClass.getName()));
+ functionClassName));
}
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
+ String.format("Function class %s must be in class path", functionClassName), e);
}
- Class>[] typeArgs;
- try {
- typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
// inputs use default schema, so there is no check needed there
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
- ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], validatableFunctionPackage.getTypePool(),
+ true);
});
}
@@ -632,8 +626,8 @@ private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConf
throw new IllegalArgumentException(
String.format("Topic %s has an incorrect schema Info", topicName));
}
- ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0], clsLoader, true);
-
+ ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
});
}
@@ -648,13 +642,16 @@ private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConf
"Only one of schemaType or serdeClassName should be set in inputSpec");
}
if (!isEmpty(conf.getSerdeClassName())) {
- ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (!isEmpty(conf.getSchemaType())) {
- ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (conf.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(), clsLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), false);
}
});
}
@@ -662,8 +659,8 @@ private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConf
if (Void.class.equals(typeArgs[1])) {
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
// One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -673,22 +670,25 @@ private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConf
}
if (!isEmpty(functionConfig.getOutputSchemaType())) {
- ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
- ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
if (functionConfig.getProducerConfig() != null
&& functionConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
- .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
+ .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), true);
}
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -703,10 +703,6 @@ private static void doPythonChecks(FunctionConfig functionConfig) {
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
throw new IllegalArgumentException("Message retries not yet supported in python");
}
-
- if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
- throw new IllegalArgumentException("Retain Key Orderering not yet supported in python");
- }
}
private static void doGolangChecks(FunctionConfig functionConfig) {
@@ -737,7 +733,7 @@ private static void verifyNoTopicClash(Collection inputTopics, String ou
}
}
- private static void doCommonChecks(FunctionConfig functionConfig) {
+ public static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
}
@@ -879,7 +875,7 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
}
}
- private static Collection collectAllInputTopics(FunctionConfig functionConfig) {
+ public static Collection collectAllInputTopics(FunctionConfig functionConfig) {
List retval = new LinkedList<>();
if (functionConfig.getInputs() != null) {
retval.addAll(functionConfig.getInputs());
@@ -899,47 +895,21 @@ private static Collection collectAllInputTopics(FunctionConfig functionC
return retval;
}
- public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
+ public static void validateNonJavaFunction(FunctionConfig functionConfig) {
doCommonChecks(functionConfig);
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- ClassLoader classLoader;
- if (functionPackageFile != null) {
- try {
- classLoader = loadJar(functionPackageFile);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else if (!isEmpty(functionConfig.getJar())) {
- File jarFile = new File(functionConfig.getJar());
- if (!jarFile.exists()) {
- throw new IllegalArgumentException("Jar file does not exist");
- }
- try {
- classLoader = loadJar(jarFile);
- } catch (Exception e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else {
- throw new IllegalArgumentException("Function Package is not provided");
- }
-
- doJavaChecks(functionConfig, classLoader);
- return classLoader;
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
doGolangChecks(functionConfig);
- return null;
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
doPythonChecks(functionConfig);
- return null;
} else {
throw new IllegalArgumentException("Function language runtime is either not set or cannot be determined");
}
}
public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
- ClassLoader classLoader) {
+ ValidatableFunctionPackage validatableFunctionPackage) {
doCommonChecks(functionConfig);
- return doJavaChecks(functionConfig, classLoader);
+ return doJavaChecks(functionConfig, validatableFunctionPackage);
}
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
new file mode 100644
index 0000000000000..80b0bc05b3a1f
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.zeroturnaround.zip.ZipUtil;
+
+/**
+ * FunctionFilePackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading.
+ */
+public class FunctionFilePackage implements AutoCloseable, ValidatableFunctionPackage {
+ private final File file;
+ private final ClassFileLocator.Compound classFileLocator;
+ private final TypePool typePool;
+ private final boolean isNar;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+
+ private ClassLoader classLoader;
+
+ private final Object configMetadata;
+
+ public FunctionFilePackage(File file, String narExtractionDirectory, boolean enableClassloading,
+ Class> configClass) {
+ this.file = file;
+ boolean nonZeroFile = file.isFile() && file.length() > 0;
+ this.isNar = nonZeroFile ? ZipUtil.containsAnyEntry(file,
+ new String[] {"META-INF/services/pulsar-io.yaml", "META-INF/bundled-dependencies"}) : false;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ if (isNar) {
+ List classpathFromArchive = null;
+ try {
+ classpathFromArchive = NarClassLoader.getClasspathFromArchive(file, narExtractionDirectory);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ List classFileLocators = new ArrayList<>();
+ classFileLocators.add(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ for (File classpath : classpathFromArchive) {
+ if (classpath.exists()) {
+ try {
+ ClassFileLocator locator;
+ if (classpath.isDirectory()) {
+ locator = new ClassFileLocator.ForFolder(classpath);
+ } else {
+ locator = ClassFileLocator.ForJarFile.of(classpath);
+ }
+ classFileLocators.add(locator);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ this.classFileLocator = new ClassFileLocator.Compound(classFileLocators);
+ this.typePool = TypePool.Default.of(classFileLocator);
+ try {
+ this.configMetadata = FunctionUtils.getPulsarIOServiceConfig(file, configClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ this.classFileLocator = nonZeroFile
+ ? new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader(),
+ ClassFileLocator.ForJarFile.of(file)) :
+ new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.typePool =
+ TypePool.Default.of(classFileLocator);
+ this.configMetadata = null;
+ }
+ }
+
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ public boolean isNar() {
+ return isNar;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ classFileLocator.close();
+ if (classLoader instanceof Closeable) {
+ ((Closeable) classLoader).close();
+ }
+ }
+
+ public boolean isEnableClassloading() {
+ return enableClassloading;
+ }
+
+ public synchronized ClassLoader getClassLoader() {
+ if (classLoader == null) {
+ classLoader = createClassLoader();
+ }
+ return classLoader;
+ }
+
+ private ClassLoader createClassLoader() {
+ if (enableClassloading) {
+ if (isNar) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(file)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ return new URLClassLoader(new java.net.URL[] {file.toURI().toURL()},
+ NarClassLoader.class.getClassLoader());
+ } catch (MalformedURLException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Classloading is not enabled");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FunctionFilePackage{"
+ + "file=" + file
+ + ", isNar=" + isNar
+ + '}';
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
new file mode 100644
index 0000000000000..948b1f1905a00
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import java.io.File;
+import java.io.IOException;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+
+public class FunctionRuntimeCommon {
+ public static NarClassLoader extractNarClassLoader(File packageFile,
+ String narExtractionDirectory) {
+ if (packageFile != null) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(packageFile)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ public static ClassLoader getClassLoaderFromPackage(
+ Function.FunctionDetails.ComponentType componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader jarClassLoader = null;
+ boolean keepJarClassLoader = false;
+ NarClassLoader narClassLoader = null;
+ boolean keepNarClassLoader = false;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ try {
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = extractNarClassLoader(packageFile, narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if connector class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
+ + "Pulsar cannot determine if the package is a NAR package or JAR package. "
+ + "%s classname is not provided and attempts to load it as a NAR package produced "
+ + "the following error.",
+ FunctionCommon.capFirstLetter(componentType), FunctionCommon.capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
+ try {
+ if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
+ connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
+ } else if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
+ connectorClassName = ConnectorUtils.getIOSourceClass(narClassLoader);
+ } else {
+ connectorClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
+ }
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+
+ } else {
+ // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ jarClassLoader.loadClass(connectorClassName);
+ keepJarClassLoader = true;
+ return jarClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ StringBuilder errorMsg = new StringBuilder(FunctionCommon.capFirstLetter(componentType)
+ + " package does not have the correct format."
+ + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a JAR package produced error: " + jarClassLoaderException
+ .getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a NAR package produced error: " + narClassLoaderException
+ .getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
+ }
+ }
+ } finally {
+ if (!keepJarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(jarClassLoader);
+ }
+ if (!keepNarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(narClassLoader);
+ }
+ }
+ }
+
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
new file mode 100644
index 0000000000000..3f885a7cd2a3d
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+
+/**
+ * LoadedFunctionPackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading. This implementation is backed by
+ * a ClassLoader, and it is used when the function package is already loaded
+ * by a ClassLoader. This is the case in the LocalRunner and in some of
+ * the unit tests.
+ */
+public class LoadedFunctionPackage implements ValidatableFunctionPackage {
+ private final ClassLoader classLoader;
+ private final Object configMetadata;
+ private final TypePool typePool;
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class configMetadataClass, T configMetadata) {
+ this.classLoader = classLoader;
+ this.configMetadata = configMetadata;
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class> configMetadataClass) {
+ this.classLoader = classLoader;
+ if (classLoader instanceof NarClassLoader) {
+ try {
+ configMetadata = FunctionUtils.getPulsarIOServiceConfig((NarClassLoader) classLoader,
+ configMetadataClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ configMetadata = null;
+ }
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ @Override
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ @Override
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public boolean isEnableClassloading() {
+ return true;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 34574cc4d44d7..f4b7bf2513401 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -39,6 +39,8 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -47,13 +49,11 @@
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@Slf4j
public class SinkConfigUtils {
@@ -190,12 +190,6 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
}
- if (sinkConfig.getCleanupSubscription() != null) {
- sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
- } else {
- sourceSpecBuilder.setCleanupSubscription(true);
- }
-
if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
@@ -319,7 +313,6 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
- sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -380,7 +373,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
}
public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
- ClassLoader sinkClassLoader,
+ ValidatableFunctionPackage sinkFunction,
boolean validateConnectorConfig) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -420,34 +413,38 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
// if class name in sink config is not set, this should be a built-in sink
// thus we should try to find it class name in the NAR service definition
if (sinkClassName == null) {
- try {
- sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract sink class from archive", e);
+ ConnectorDefinition connectorDefinition = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (connectorDefinition == null) {
+ throw new IllegalArgumentException(
+ "Sink package doesn't contain the META-INF/services/pulsar-io.yaml file.");
+ }
+ sinkClassName = connectorDefinition.getSinkClass();
+ if (sinkClassName == null) {
+ throw new IllegalArgumentException("Failed to extract sink class from archive");
}
}
// check if sink implements the correct interfaces
- Class sinkClass;
+ TypeDefinition sinkClass;
try {
- sinkClass = sinkClassLoader.loadClass(sinkClassName);
- } catch (ClassNotFoundException e) {
+ sinkClass = sinkFunction.resolveType(sinkClassName);
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Sink class %s not found in class loader", sinkClassName), e);
+ String.format("Sink class %s not found", sinkClassName), e);
}
- // extract type from sink class
- Class> typeArg = getSinkType(sinkClass);
+ TypeDefinition typeArg = getSinkType(sinkClass);
+ ValidatableFunctionPackage inputFunction = sinkFunction;
if (sinkConfig.getTopicToSerdeClassName() != null) {
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
- ValidatorUtils.validateSerde(serdeClassName, typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSerde(serdeClassName, typeArg, inputFunction.getTypePool(), true);
}
}
if (sinkConfig.getTopicToSchemaType() != null) {
for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
- ValidatorUtils.validateSchema(schemaType, typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSchema(schemaType, typeArg, inputFunction.getTypePool(), true);
}
}
@@ -460,26 +457,46 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
if (!isEmpty(consumerSpec.getSerdeClassName())) {
- ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg,
+ inputFunction.getTypePool(), true);
}
if (!isEmpty(consumerSpec.getSchemaType())) {
- ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, sinkClassLoader, true);
+ ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg,
+ inputFunction.getTypePool(), true);
}
if (consumerSpec.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), sinkClassLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(),
+ inputFunction.getTypePool(), false);
}
}
}
- // validate user defined config if enabled and sink is loaded from NAR
- if (validateConnectorConfig && sinkClassLoader instanceof NarClassLoader) {
- validateSinkConfig(sinkConfig, (NarClassLoader) sinkClassLoader);
+ if (sinkConfig.getRetainKeyOrdering() != null
+ && sinkConfig.getRetainKeyOrdering()
+ && sinkConfig.getProcessingGuarantees() != null
+ && sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new IllegalArgumentException(
+ "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
+ }
+
+ if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
+ && sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
+ throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}
- return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
+ // validate user defined config if enabled and classloading is enabled
+ if (validateConnectorConfig) {
+ if (sinkFunction.isEnableClassloading()) {
+ validateSinkConfig(sinkConfig, sinkFunction);
+ } else {
+ log.warn("Skipping annotation based validation of sink config as classloading is disabled");
+ }
+ }
+
+ return new ExtractedSinkDetails(sinkClassName, typeArg.asErasure().getTypeName());
}
- private static Collection collectAllInputTopics(SinkConfig sinkConfig) {
+ public static Collection collectAllInputTopics(SinkConfig sinkConfig) {
List retval = new LinkedList<>();
if (sinkConfig.getInputs() != null) {
retval.addAll(sinkConfig.getInputs());
@@ -501,8 +518,8 @@ private static Collection collectAllInputTopics(SinkConfig sinkConfig) {
@SneakyThrows
public static SinkConfig clone(SinkConfig sinkConfig) {
- return ObjectMapperFactory.getThreadLocal().readValue(
- ObjectMapperFactory.getThreadLocal().writeValueAsBytes(sinkConfig), SinkConfig.class);
+ return ObjectMapperFactory.getThreadLocal().reader().readValue(
+ ObjectMapperFactory.getThreadLocal().writer().writeValueAsBytes(sinkConfig), SinkConfig.class);
}
public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig newConfig) {
@@ -535,7 +552,7 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
- newConfig.getInputSpecs().put(topicName,
+ newConfig.getInputSpecs().putIfAbsent(topicName,
ConsumerConfig.builder().isRegexPattern(false).build());
}));
}
@@ -619,36 +636,17 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
- if (newConfig.getCleanupSubscription() != null) {
- mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
- }
return mergedConfig;
}
- public static void validateSinkConfig(SinkConfig sinkConfig, NarClassLoader narClassLoader) {
-
- if (sinkConfig.getRetainKeyOrdering() != null
- && sinkConfig.getRetainKeyOrdering()
- && sinkConfig.getProcessingGuarantees() != null
- && sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException(
- "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
- }
-
- if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()
- && sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering()) {
- throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
- }
-
+ public static void validateSinkConfig(SinkConfig sinkConfig, ValidatableFunctionPackage sinkFunction) {
try {
- ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
- if (defn.getSinkConfigClass() != null) {
- Class configClass = Class.forName(defn.getSinkConfigClass(), true, narClassLoader);
+ ConnectorDefinition defn = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (defn != null && defn.getSinkConfigClass() != null) {
+ Class configClass = Class.forName(defn.getSinkConfigClass(), true, sinkFunction.getClassLoader());
validateSinkConfig(sinkConfig, configClass);
}
- } catch (IOException e) {
- throw new IllegalArgumentException("Error validating sink config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find sink config class", e);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index dd32903f4e44c..207d426ab540e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -34,7 +34,9 @@
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
@@ -43,13 +45,11 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Source;
@@ -285,7 +285,7 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
}
public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sourceConfig,
- ClassLoader sourceClassLoader,
+ ValidatableFunctionPackage sourceFunction,
boolean validateConnectorConfig) {
if (isEmpty(sourceConfig.getTenant())) {
throw new IllegalArgumentException("Source tenant cannot be null");
@@ -313,29 +313,34 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
// if class name in source config is not set, this should be a built-in source
// thus we should try to find it class name in the NAR service definition
if (sourceClassName == null) {
- try {
- sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) sourceClassLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ ConnectorDefinition connectorDefinition = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (connectorDefinition == null) {
+ throw new IllegalArgumentException(
+ "Source package doesn't contain the META-INF/services/pulsar-io.yaml file.");
+ }
+ sourceClassName = connectorDefinition.getSourceClass();
+ if (sourceClassName == null) {
+ throw new IllegalArgumentException("Failed to extract source class from archive");
}
}
// check if source implements the correct interfaces
- Class sourceClass;
+ TypeDescription sourceClass;
try {
- sourceClass = sourceClassLoader.loadClass(sourceClassName);
- } catch (ClassNotFoundException e) {
+ sourceClass = sourceFunction.resolveType(sourceClassName);
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("Source class %s not found in class loader", sourceClassName), e);
}
- if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
+ if (!(sourceClass.asErasure().isAssignableTo(Source.class) || sourceClass.asErasure()
+ .isAssignableTo(BatchSource.class))) {
throw new IllegalArgumentException(
- String.format("Source class %s does not implement the correct interface",
- sourceClass.getName()));
+ String.format("Source class %s does not implement the correct interface",
+ sourceClass.getName()));
}
- if (BatchSource.class.isAssignableFrom(sourceClass)) {
+ if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
if (sourceConfig.getBatchSourceConfig() != null) {
validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
} else {
@@ -346,7 +351,14 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}
// extract type from source class
- Class> typeArg = getSourceType(sourceClass);
+ TypeDefinition typeArg;
+
+ try {
+ typeArg = getSourceType(sourceClass);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to resolve type for Source class %s", sourceClassName), e);
+ }
// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils
@@ -355,29 +367,30 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
}
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
- ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceClassLoader, false);
+ ValidatorUtils.validateSerde(sourceConfig.getSerdeClassName(), typeArg, sourceFunction.getTypePool(),
+ false);
}
if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
- ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceClassLoader, false);
+ ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, sourceFunction.getTypePool(),
+ false);
}
if (sourceConfig.getProducerConfig() != null && sourceConfig.getProducerConfig().getCryptoConfig() != null) {
ValidatorUtils
- .validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(), sourceClassLoader,
- true);
+ .validateCryptoKeyReader(sourceConfig.getProducerConfig().getCryptoConfig(),
+ sourceFunction.getTypePool(), true);
}
- if (typeArg.equals(TypeResolver.Unknown.class)) {
- throw new IllegalArgumentException(
- String.format("Failed to resolve type for Source class %s", sourceClassName));
- }
-
- // validate user defined config if enabled and source is loaded from NAR
- if (validateConnectorConfig && sourceClassLoader instanceof NarClassLoader) {
- validateSourceConfig(sourceConfig, (NarClassLoader) sourceClassLoader);
+ // validate user defined config if enabled and classloading is enabled
+ if (validateConnectorConfig) {
+ if (sourceFunction.isEnableClassloading()) {
+ validateSourceConfig(sourceConfig, sourceFunction);
+ } else {
+ log.warn("Skipping annotation based validation of sink config as classloading is disabled");
+ }
}
- return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
+ return new ExtractedSourceDetails(sourceClassName, typeArg.asErasure().getTypeName());
}
@SneakyThrows
@@ -518,15 +531,14 @@ public static void validateBatchSourceConfigUpdate(BatchSourceConfig existingCon
}
}
- public static void validateSourceConfig(SourceConfig sourceConfig, NarClassLoader narClassLoader) {
+ public static void validateSourceConfig(SourceConfig sourceConfig, ValidatableFunctionPackage sourceFunction) {
try {
- ConnectorDefinition defn = ConnectorUtils.getConnectorDefinition(narClassLoader);
- if (defn.getSourceConfigClass() != null) {
- Class configClass = Class.forName(defn.getSourceConfigClass(), true, narClassLoader);
+ ConnectorDefinition defn = sourceFunction.getFunctionMetaData(ConnectorDefinition.class);
+ if (defn != null && defn.getSourceConfigClass() != null) {
+ Class configClass =
+ Class.forName(defn.getSourceConfigClass(), true, sourceFunction.getClassLoader());
validateSourceConfig(sourceConfig, configClass);
}
- } catch (IOException e) {
- throw new IllegalArgumentException("Error validating source config", e);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Could not find source config class");
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java
new file mode 100644
index 0000000000000..bd71bbfb5a584
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatableFunctionPackage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
+
+/**
+ * This abstraction separates the function and connector definition from classloading,
+ * enabling validation without the need for classloading. It utilizes Byte Buddy for
+ * type and annotation resolution.
+ *
+ * The function or connector definition is directly extracted from the archive file,
+ * eliminating the need for classloader initialization.
+ *
+ * The getClassLoader method should only be invoked when classloading is enabled.
+ * Classloading is required in the LocalRunner and in the Functions worker when the
+ * worker is configured with the 'validateConnectorConfig' set to true.
+ */
+public interface ValidatableFunctionPackage {
+ /**
+ * Resolves the type description for the given class name within the function package.
+ */
+ TypeDescription resolveType(String className);
+ /**
+ * Returns the Byte Buddy TypePool instance for the function package.
+ */
+ TypePool getTypePool();
+ /**
+ * Returns the function or connector definition metadata.
+ * Supports FunctionDefinition and ConnectorDefinition as the metadata type.
+ */
+ T getFunctionMetaData(Class clazz);
+ /**
+ * Returns if classloading is enabled for the function package.
+ */
+ boolean isEnableClassloading();
+ /**
+ * Returns the classloader for the function package. The classloader is
+ * lazily initialized when classloading is enabled.
+ */
+ ClassLoader getClassLoader();
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 9e9ee27dfebad..1d12deaaea59d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -18,35 +18,40 @@
*/
package org.apache.pulsar.functions.utils;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.Source;
@Slf4j
public class ValidatorUtils {
private static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
- public static void validateSchema(String schemaType, Class> typeArg, ClassLoader clsLoader,
+ public static void validateSchema(String schemaType, TypeDefinition typeArg, TypePool typePool,
boolean input) {
if (isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
} else {
- ClassLoaderUtils.implementsClass(schemaType, Schema.class, clsLoader);
- validateSchemaType(schemaType, typeArg, clsLoader, input);
+ TypeDescription schemaClass = null;
+ try {
+ schemaClass = typePool.describe(schemaType).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
+ throw new IllegalArgumentException(
+ String.format("The schema class %s does not exist", schemaType));
+ }
+ if (!schemaClass.asErasure().isAssignableTo(Schema.class)) {
+ throw new IllegalArgumentException(
+ String.format("%s does not implement %s", schemaType, Schema.class.getName()));
+ }
+ validateSchemaType(schemaClass, typeArg, typePool, input);
}
}
@@ -60,29 +65,32 @@ private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) {
}
- public static void validateCryptoKeyReader(CryptoConfig conf, ClassLoader classLoader, boolean isProducer) {
+ public static void validateCryptoKeyReader(CryptoConfig conf, TypePool typePool, boolean isProducer) {
if (isEmpty(conf.getCryptoKeyReaderClassName())) {
return;
}
- Class> cryptoClass;
+ String cryptoClassName = conf.getCryptoKeyReaderClassName();
+ TypeDescription cryptoClass = null;
try {
- cryptoClass = ClassLoaderUtils.loadClass(conf.getCryptoKeyReaderClassName(), classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ cryptoClass = typePool.describe(cryptoClassName).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("The crypto key reader class %s does not exist", conf.getCryptoKeyReaderClassName()));
+ String.format("The crypto key reader class %s does not exist", cryptoClassName));
+ }
+ if (!cryptoClass.asErasure().isAssignableTo(CryptoKeyReader.class)) {
+ throw new IllegalArgumentException(
+ String.format("%s does not implement %s", cryptoClassName, CryptoKeyReader.class.getName()));
}
- ClassLoaderUtils.implementsClass(conf.getCryptoKeyReaderClassName(), CryptoKeyReader.class, classLoader);
- try {
- cryptoClass.getConstructor(Map.class);
- } catch (NoSuchMethodException ex) {
+ boolean hasConstructor = cryptoClass.getDeclaredMethods().stream()
+ .anyMatch(method -> method.isConstructor() && method.getParameters().size() == 1
+ && method.getParameters().get(0).getType().asErasure().represents(Map.class));
+
+ if (!hasConstructor) {
throw new IllegalArgumentException(
String.format("The crypto key reader class %s does not implement the desired constructor.",
conf.getCryptoKeyReaderClassName()));
-
- } catch (SecurityException e) {
- throw new IllegalArgumentException("Failed to access crypto key reader class", e);
}
if (isProducer && (conf.getEncryptionKeys() == null || conf.getEncryptionKeys().length == 0)) {
@@ -90,7 +98,7 @@ public static void validateCryptoKeyReader(CryptoConfig conf, ClassLoader classL
}
}
- public static void validateSerde(String inputSerializer, Class> typeArg, ClassLoader clsLoader,
+ public static void validateSerde(String inputSerializer, TypeDefinition typeArg, TypePool typePool,
boolean deser) {
if (isEmpty(inputSerializer)) {
return;
@@ -98,154 +106,53 @@ public static void validateSerde(String inputSerializer, Class> typeArg, Class
if (inputSerializer.equals(DEFAULT_SERDE)) {
return;
}
+ TypeDescription serdeClass;
try {
- Class> serdeClass = ClassLoaderUtils.loadClass(inputSerializer, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ serdeClass = typePool.describe(inputSerializer).resolve();
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
String.format("The input serialization/deserialization class %s does not exist",
inputSerializer));
}
- ClassLoaderUtils.implementsClass(inputSerializer, SerDe.class, clsLoader);
-
- SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
- if (serDe == null) {
- throw new IllegalArgumentException(String.format("The SerDe class %s does not exist",
- inputSerializer));
- }
- Class>[] serDeTypes = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class> fnInputClass;
- Class> serdeInputClass;
- try {
- fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
- serdeInputClass = Class.forName(serDeTypes[0].getName(), true, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException("Failed to load type class", e);
- }
+ TypeDescription.Generic serDeTypeArg = serdeClass.getInterfaces().stream()
+ .filter(i -> i.asErasure().isAssignableTo(SerDe.class))
+ .findFirst()
+ .map(i -> i.getTypeArguments().get(0))
+ .orElseThrow(() -> new IllegalArgumentException(
+ String.format("%s does not implement %s", inputSerializer, SerDe.class.getName())));
if (deser) {
- if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
- throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (!serDeTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ + serDeTypeArg.getActualName());
}
} else {
- if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
- throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (!serDeTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
+ throw new IllegalArgumentException("Serializer type mismatch " + typeArg.getActualName() + " vs "
+ + serDeTypeArg.getActualName());
}
}
}
- private static void validateSchemaType(String schemaClassName, Class> typeArg, ClassLoader clsLoader,
+ private static void validateSchemaType(TypeDefinition schema, TypeDefinition typeArg, TypePool typePool,
boolean input) {
- Schema> schema = (Schema>) Reflections.createInstance(schemaClassName, clsLoader);
- if (schema == null) {
- throw new IllegalArgumentException(String.format("The Schema class %s does not exist",
- schemaClassName));
- }
- Class>[] schemaTypes = TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
- // type inheritance information seems to be lost in generic type
- // load the actual type class for verification
- Class> fnInputClass;
- Class> schemaInputClass;
- try {
- fnInputClass = Class.forName(typeArg.getName(), true, clsLoader);
- schemaInputClass = Class.forName(schemaTypes[0].getName(), true, clsLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException("Failed to load type class", e);
- }
+ TypeDescription.Generic schemaTypeArg = schema.getInterfaces().stream()
+ .filter(i -> i.asErasure().isAssignableTo(Schema.class))
+ .findFirst()
+ .map(i -> i.getTypeArguments().get(0))
+ .orElse(null);
if (input) {
- if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+ if (!schemaTypeArg.asErasure().isAssignableTo(typeArg.asErasure())) {
throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
+ "Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
} else {
- if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+ if (!schemaTypeArg.asErasure().isAssignableFrom(typeArg.asErasure())) {
throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]);
- }
- }
- }
-
-
- public static void validateFunctionClassTypes(ClassLoader classLoader,
- Function.FunctionDetails.Builder functionDetailsBuilder) {
-
- // validate only if classLoader is provided
- if (classLoader == null) {
- return;
- }
-
- if (isBlank(functionDetailsBuilder.getClassName())) {
- throw new IllegalArgumentException("Function class-name can't be empty");
- }
-
- // validate function class-type
- Class functionClass;
- try {
- functionClass = classLoader.loadClass(functionDetailsBuilder.getClassName());
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionDetailsBuilder.getClassName()), e);
- }
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(functionClass, false);
-
- if (!(org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass))
- && !(java.util.function.Function.class.isAssignableFrom(functionClass))) {
- throw new RuntimeException("User class must either be Function or java.util.Function");
- }
-
- if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null
- && isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
- try {
- String sourceClassName = functionDetailsBuilder.getSource().getClassName();
- String argClassName = FunctionCommon.getTypeArg(sourceClassName, Source.class, classLoader).getName();
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
-
- // if sink-class not present then set same arg as source
- if (!functionDetailsBuilder.hasSink() || isBlank(functionDetailsBuilder.getSink().getClassName())) {
- functionDetailsBuilder
- .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
- }
-
- } catch (IllegalArgumentException ie) {
- throw ie;
- } catch (Exception e) {
- log.error("Failed to validate source class", e);
- throw new IllegalArgumentException("Failed to validate source class-name", e);
- }
- } else if (isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
- // if function-src-class is not present then set function-src type-class according to function class
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
- }
-
- if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null
- && isNotBlank(functionDetailsBuilder.getSink().getClassName())) {
- try {
- String sinkClassName = functionDetailsBuilder.getSink().getClassName();
- String argClassName = FunctionCommon.getTypeArg(sinkClassName, Sink.class, classLoader).getName();
- functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
-
- // if source-class not present then set same arg as sink
- if (!functionDetailsBuilder.hasSource() || isBlank(functionDetailsBuilder.getSource().getClassName())) {
- functionDetailsBuilder
- .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
- }
-
- } catch (IllegalArgumentException ie) {
- throw ie;
- } catch (Exception e) {
- log.error("Failed to validate sink class", e);
- throw new IllegalArgumentException("Failed to validate sink class-name", e);
+ "Schema type mismatch " + typeArg.getActualName() + " vs " + schemaTypeArg.getActualName());
}
- } else if (isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
- // if function-sink-class is not present then set function-sink type-class according to function class
- functionDetailsBuilder
- .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
}
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
index 8d621cc965f80..54f3da7a448c5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionArchive.java
@@ -19,14 +19,50 @@
package org.apache.pulsar.functions.utils.functions;
import java.nio.file.Path;
-import lombok.Builder;
-import lombok.Data;
import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.FunctionFilePackage;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
-@Builder
-@Data
-public class FunctionArchive {
- private Path archivePath;
- private ClassLoader classLoader;
- private FunctionDefinition functionDefinition;
+public class FunctionArchive implements AutoCloseable {
+ private final Path archivePath;
+ private final FunctionDefinition functionDefinition;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+ private ValidatableFunctionPackage functionPackage;
+ private boolean closed;
+
+ public FunctionArchive(Path archivePath, FunctionDefinition functionDefinition, String narExtractionDirectory,
+ boolean enableClassloading) {
+ this.archivePath = archivePath;
+ this.functionDefinition = functionDefinition;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ }
+
+ public Path getArchivePath() {
+ return archivePath;
+ }
+
+ public synchronized ValidatableFunctionPackage getFunctionPackage() {
+ if (closed) {
+ throw new IllegalStateException("FunctionArchive is already closed");
+ }
+ if (functionPackage == null) {
+ functionPackage = new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
+ FunctionDefinition.class);
+ }
+ return functionPackage;
+ }
+
+ public FunctionDefinition getFunctionDefinition() {
+ return functionDefinition;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ closed = true;
+ if (functionPackage instanceof AutoCloseable) {
+ ((AutoCloseable) functionPackage).close();
+ }
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
index 8b70a4b650814..e09f3f1f12f7d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.functions.utils.functions;
import java.io.File;
@@ -30,10 +31,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.utils.Exceptions;
+import org.zeroturnaround.zip.ZipUtil;
@UtilityClass
@@ -45,43 +44,40 @@ public class FunctionUtils {
/**
* Extract the Pulsar Function class from a function or archive.
*/
- public static String getFunctionClass(ClassLoader classLoader) throws IOException {
- NarClassLoader ncl = (NarClassLoader) classLoader;
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
- FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
- FunctionDefinition.class);
- if (StringUtils.isEmpty(conf.getFunctionClass())) {
- throw new IOException(
- String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
- }
+ public static String getFunctionClass(File narFile) throws IOException {
+ return getFunctionDefinition(narFile).getFunctionClass();
+ }
- try {
- // Try to load source class and check it implements Function interface
- Class functionClass = ncl.loadClass(conf.getFunctionClass());
- if (!(Function.class.isAssignableFrom(functionClass))) {
- throw new IOException(
- "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class
- .getName());
- }
- } catch (Throwable t) {
- Exceptions.rethrowIOException(t);
+ public static FunctionDefinition getFunctionDefinition(File narFile) throws IOException {
+ return getPulsarIOServiceConfig(narFile, FunctionDefinition.class);
+ }
+
+ public static T getPulsarIOServiceConfig(File narFile, Class valueType) throws IOException {
+ String filename = "META-INF/services/" + PULSAR_IO_SERVICE_NAME;
+ byte[] configEntry = ZipUtil.unpackEntry(narFile, filename);
+ if (configEntry != null) {
+ return ObjectMapperFactory.getThreadLocalYaml().reader().readValue(configEntry, valueType);
+ } else {
+ return null;
}
+ }
- return conf.getFunctionClass();
+ public static String getFunctionClass(NarClassLoader narClassLoader) throws IOException {
+ return getFunctionDefinition(narClassLoader).getFunctionClass();
}
public static FunctionDefinition getFunctionDefinition(NarClassLoader narClassLoader) throws IOException {
- String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
- return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
+ return getPulsarIOServiceConfig(narClassLoader, FunctionDefinition.class);
}
- public static TreeMap searchForFunctions(String functionsDirectory) throws IOException {
- return searchForFunctions(functionsDirectory, false);
+ public static T getPulsarIOServiceConfig(NarClassLoader narClassLoader, Class valueType) throws IOException {
+ return ObjectMapperFactory.getThreadLocalYaml().reader()
+ .readValue(narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME), valueType);
}
public static TreeMap searchForFunctions(String functionsDirectory,
- boolean alwaysPopulatePath) throws IOException {
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
Path path = Paths.get(functionsDirectory).toAbsolutePath();
log.info("Searching for functions in {}", path);
@@ -95,22 +91,12 @@ public static TreeMap searchForFunctions(String functio
try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .build();
-
- FunctionArchive.FunctionArchiveBuilder functionArchiveBuilder = FunctionArchive.builder();
- FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(ncl);
+ FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toFile());
log.info("Found function {} from {}", cntDef, archive);
-
- functionArchiveBuilder.archivePath(archive);
-
- functionArchiveBuilder.classLoader(ncl);
- functionArchiveBuilder.functionDefinition(cntDef);
-
- if (alwaysPopulatePath || !StringUtils.isEmpty(cntDef.getFunctionClass())) {
- functions.put(cntDef.getName(), functionArchiveBuilder.build());
+ if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
+ FunctionArchive functionArchive =
+ new FunctionArchive(archive, cntDef, narExtractionDirectory, enableClassloading);
+ functions.put(cntDef.getName(), functionArchive);
}
} catch (Throwable t) {
log.warn("Failed to load function from {}", archive, t);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
index e2e6e39e273ff..b78c5c45fbd8b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
@@ -20,17 +20,79 @@
import java.nio.file.Path;
import java.util.List;
-import lombok.Builder;
-import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.utils.FunctionFilePackage;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
-@Builder
-@Data
-public class Connector {
- private Path archivePath;
+public class Connector implements AutoCloseable {
+ private final Path archivePath;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+ private ValidatableFunctionPackage connectorFunctionPackage;
private List sourceConfigFieldDefinitions;
private List sinkConfigFieldDefinitions;
- private ClassLoader classLoader;
private ConnectorDefinition connectorDefinition;
+ private boolean closed;
+
+ public Connector(Path archivePath, ConnectorDefinition connectorDefinition, String narExtractionDirectory,
+ boolean enableClassloading) {
+ this.archivePath = archivePath;
+ this.connectorDefinition = connectorDefinition;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ }
+
+ public Path getArchivePath() {
+ return archivePath;
+ }
+
+ public synchronized ValidatableFunctionPackage getConnectorFunctionPackage() {
+ checkState();
+ if (connectorFunctionPackage == null) {
+ connectorFunctionPackage =
+ new FunctionFilePackage(archivePath.toFile(), narExtractionDirectory, enableClassloading,
+ ConnectorDefinition.class);
+ }
+ return connectorFunctionPackage;
+ }
+
+ private void checkState() {
+ if (closed) {
+ throw new IllegalStateException("Connector is already closed");
+ }
+ }
+
+ public synchronized List getSourceConfigFieldDefinitions() {
+ checkState();
+ if (sourceConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSourceClass())
+ && !StringUtils.isEmpty(connectorDefinition.getSourceConfigClass())) {
+ sourceConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
+ connectorDefinition.getSourceConfigClass());
+ }
+ return sourceConfigFieldDefinitions;
+ }
+
+ public synchronized List getSinkConfigFieldDefinitions() {
+ checkState();
+ if (sinkConfigFieldDefinitions == null && !StringUtils.isEmpty(connectorDefinition.getSinkClass())
+ && !StringUtils.isEmpty(connectorDefinition.getSinkConfigClass())) {
+ sinkConfigFieldDefinitions = ConnectorUtils.getConnectorConfigDefinition(getConnectorFunctionPackage(),
+ connectorDefinition.getSinkConfigClass());
+ }
+ return sinkConfigFieldDefinitions;
+ }
+
+ public ConnectorDefinition getConnectorDefinition() {
+ return connectorDefinition;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ closed = true;
+ if (connectorFunctionPackage instanceof AutoCloseable) {
+ ((AutoCloseable) connectorFunctionPackage).close();
+ }
+ }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index a4c74d0cdfdd0..24d4f8fbf68de 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -20,9 +20,6 @@
import java.io.File;
import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -34,14 +31,18 @@
import java.util.TreeMap;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.annotation.AnnotationDescription;
+import net.bytebuddy.description.annotation.AnnotationValue;
+import net.bytebuddy.description.field.FieldDescription;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.description.type.TypeDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -68,7 +69,7 @@ public static String getIOSourceClass(NarClassLoader narClassLoader) throws IOEx
Class sourceClass = narClassLoader.loadClass(conf.getSourceClass());
if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) {
throw new IOException(String.format("Class %s does not implement interface %s or %s",
- conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
+ conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
}
} catch (Throwable t) {
Exceptions.rethrowIOException(t);
@@ -101,32 +102,36 @@ public static String getIOSinkClass(NarClassLoader narClassLoader) throws IOExce
return conf.getSinkClass();
}
- public static ConnectorDefinition getConnectorDefinition(NarClassLoader narClassLoader) throws IOException {
- String configStr = narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ public static ConnectorDefinition getConnectorDefinition(File narFile) throws IOException {
+ return FunctionUtils.getPulsarIOServiceConfig(narFile, ConnectorDefinition.class);
+ }
- return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, ConnectorDefinition.class);
+ public static ConnectorDefinition getConnectorDefinition(NarClassLoader narClassLoader) throws IOException {
+ return FunctionUtils.getPulsarIOServiceConfig(narClassLoader, ConnectorDefinition.class);
}
- public static List getConnectorConfigDefinition(ClassLoader classLoader,
- String configClassName) throws Exception {
+ public static List getConnectorConfigDefinition(
+ ValidatableFunctionPackage connectorFunctionPackage,
+ String configClassName) {
List retval = new LinkedList<>();
- Class configClass = classLoader.loadClass(configClassName);
- for (Field field : Reflections.getAllFields(configClass)) {
- if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
- // We dont want static fields
+ TypeDefinition configClass = connectorFunctionPackage.resolveType(configClassName);
+
+ for (FieldDescription field : getAllFields(configClass)) {
+ if (field.isStatic()) {
+ // We don't want static fields
continue;
}
- field.setAccessible(true);
ConfigFieldDefinition configFieldDefinition = new ConfigFieldDefinition();
configFieldDefinition.setFieldName(field.getName());
- configFieldDefinition.setTypeName(field.getType().getName());
+ configFieldDefinition.setTypeName(field.getType().getActualName());
Map attributes = new HashMap<>();
- for (Annotation annotation : field.getAnnotations()) {
- if (annotation.annotationType().equals(FieldDoc.class)) {
- FieldDoc fieldDoc = (FieldDoc) annotation;
- for (Method method : FieldDoc.class.getDeclaredMethods()) {
- Object value = method.invoke(fieldDoc);
- attributes.put(method.getName(), value == null ? "" : value.toString());
+ for (AnnotationDescription annotation : field.getDeclaredAnnotations()) {
+ if (annotation.getAnnotationType().represents(FieldDoc.class)) {
+ for (MethodDescription.InDefinedShape method : annotation.getAnnotationType()
+ .getDeclaredMethods()) {
+ AnnotationValue, ?> value = annotation.getValue(method.getName());
+ attributes.put(method.getName(),
+ value == null || value.resolve() == null ? "" : value.resolve().toString());
}
}
}
@@ -137,12 +142,25 @@ public static List getConnectorConfigDefinition(ClassLoad
return retval;
}
+ private static List getAllFields(TypeDefinition type) {
+ List fields = new LinkedList<>();
+ fields.addAll(type.getDeclaredFields());
+
+ if (type.getSuperClass() != null) {
+ fields.addAll(getAllFields(type.getSuperClass()));
+ }
+
+ return fields;
+ }
+
public static TreeMap searchForConnectors(String connectorsDirectory,
- String narExtractionDirectory) throws IOException {
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);
TreeMap connectors = new TreeMap<>();
+
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
return connectors;
@@ -151,40 +169,15 @@ public static TreeMap searchForConnectors(String connectorsDi
try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
-
- NarClassLoader ncl = NarClassLoaderBuilder.builder()
- .narFile(new File(archive.toString()))
- .extractionDirectory(narExtractionDirectory)
- .build();
-
- Connector.ConnectorBuilder connectorBuilder = Connector.builder();
- ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
+ ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile());
log.info("Found connector {} from {}", cntDef, archive);
-
- connectorBuilder.archivePath(archive);
- if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
- connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
- .getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
- }
- }
-
- if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
- connectorBuilder.sinkConfigFieldDefinitions(
- ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
- }
- }
-
- connectorBuilder.classLoader(ncl);
- connectorBuilder.connectorDefinition(cntDef);
- connectors.put(cntDef.getName(), connectorBuilder.build());
+ Connector connector = new Connector(archive, cntDef, narExtractionDirectory, enableClassloading);
+ connectors.put(cntDef.getName(), connector);
} catch (Throwable t) {
log.warn("Failed to load connector from {}", archive, t);
}
}
-
- return connectors;
}
+ return connectors;
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index b991f8b4f473c..98c8dacb25c12 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -19,9 +19,16 @@
package org.apache.pulsar.functions.utils;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.io.File;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
@@ -31,52 +38,10 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.io.File;
-import java.util.UUID;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
/**
* Unit test of {@link Exceptions}.
*/
public class FunctionCommonTest {
-
- @Test
- public void testValidateLocalFileUrl() throws Exception {
- String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- try {
- // eg: fileLocation : /dir/fileName.jar (invalid)
- FunctionCommon.extractClassLoader(fileLocation);
- Assert.fail("should fail with invalid url: without protocol");
- } catch (IllegalArgumentException ie) {
- // Ok.. expected exception
- }
- String fileLocationWithProtocol = "file://" + fileLocation;
- // eg: fileLocation : file:///dir/fileName.jar (valid)
- FunctionCommon.extractClassLoader(fileLocationWithProtocol);
- // eg: fileLocation : file:/dir/fileName.jar (valid)
- fileLocationWithProtocol = "file:" + fileLocation;
- FunctionCommon.extractClassLoader(fileLocationWithProtocol);
- }
-
- @Test
- public void testValidateHttpFileUrl() throws Exception {
-
- String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
- FunctionCommon.extractClassLoader(jarHttpUrl);
-
- jarHttpUrl = "http://_invalidurl_.com";
- try {
- // eg: fileLocation : /dir/fileName.jar (invalid)
- FunctionCommon.extractClassLoader(jarHttpUrl);
- Assert.fail("should fail with invalid url: without protocol");
- } catch (Exception ie) {
- // Ok.. expected exception
- }
- }
-
@Test
public void testDownloadFile() throws Exception {
String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
@@ -128,6 +93,14 @@ public Record process(String input, Context context) throws Exception {
}
}, false
},
+ {
+ new Function>() {
+ @Override
+ public CompletableFuture process(String input, Context context) throws Exception {
+ return null;
+ }
+ }, false
+ },
{
new java.util.function.Function() {
@Override
@@ -144,6 +117,14 @@ public Record apply(String s) {
}
}, false
},
+ {
+ new java.util.function.Function>() {
+ @Override
+ public CompletableFuture apply(String s) {
+ return null;
+ }
+ }, false
+ },
{
new WindowFunction() {
@Override
@@ -160,6 +141,14 @@ public Record process(Collection> input, WindowContext c
}
}, true
},
+ {
+ new WindowFunction>() {
+ @Override
+ public CompletableFuture process(Collection> input, WindowContext context) throws Exception {
+ return null;
+ }
+ }, true
+ },
{
new java.util.function.Function, Integer>() {
@Override
@@ -175,15 +164,26 @@ public Record apply(Collection strings) {
return null;
}
}, true
+ },
+ {
+ new java.util.function.Function, CompletableFuture>() {
+ @Override
+ public CompletableFuture apply(Collection strings) {
+ return null;
+ }
+ }, true
}
};
}
@Test(dataProvider = "function")
public void testGetFunctionTypes(Object function, boolean isWindowConfigPresent) {
- Class>[] types = FunctionCommon.getFunctionTypes(function.getClass(), isWindowConfigPresent);
+ TypePool typePool = TypePool.Default.of(function.getClass().getClassLoader());
+ TypeDefinition[] types =
+ FunctionCommon.getFunctionTypes(typePool.describe(function.getClass().getName()).resolve(),
+ isWindowConfigPresent);
assertEquals(types.length, 2);
- assertEquals(types[0], String.class);
- assertEquals(types[1], Integer.class);
+ assertEquals(types[0].asErasure().getTypeName(), String.class.getName());
+ assertEquals(types[1].asErasure().getTypeName(), Integer.class.getName());
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index d2b387cdf9938..85d481c252422 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -18,12 +18,23 @@
*/
package org.apache.pulsar.functions.utils;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
import com.google.gson.Gson;
-
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -31,28 +42,29 @@
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
/**
* Unit test of {@link Reflections}.
*/
@Slf4j
public class FunctionConfigUtilsTest {
+ public static class WordCountWindowFunction implements WindowFunction {
+ @Override
+ public Void process(Collection> inputs, WindowContext context) throws Exception {
+ for (Record input : inputs) {
+ Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
+ }
+ return null;
+ }
+ }
+
@Test
public void testAutoAckConvertFailed() {
@@ -62,7 +74,7 @@ public void testAutoAckConvertFailed() {
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
assertThrows(IllegalArgumentException.class, () -> {
- FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ FunctionConfigUtils.convert(functionConfig);
});
}
@@ -97,7 +109,7 @@ public void testConvertBackFidelity() {
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("DEFAULT");
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// add default resources
@@ -117,7 +129,7 @@ public void testConvertWindow() {
functionConfig.setNamespace("test-namespace");
functionConfig.setName("test-function");
functionConfig.setParallelism(1);
- functionConfig.setClassName(IdentityFunction.class.getName());
+ functionConfig.setClassName(WordCountWindowFunction.class.getName());
Map inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
functionConfig.setInputSpecs(inputSpecs);
@@ -138,7 +150,7 @@ public void testConvertWindow() {
producerConfig.setUseThreadLocalProducers(true);
producerConfig.setBatchBuilder("KEY_BASED");
functionConfig.setProducerConfig(producerConfig);
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
// WindowsFunction guarantees convert to FunctionGuarantees.
@@ -160,7 +172,7 @@ public void testConvertBatchBuilder() {
FunctionConfig functionConfig = createFunctionConfig();
functionConfig.setBatchBuilder("KEY_BASED");
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -515,7 +527,6 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000L);
- functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
functionConfig.setCleanupSubscription(true);
functionConfig.setRuntimeFlags("-Dfoo=bar");
return functionConfig;
@@ -549,7 +560,7 @@ public void testDisableForwardSourceMessageProperty() throws InvalidProtocolBuff
config.setForwardSourceMessageProperty(true);
FunctionConfigUtils.inferMissingArguments(config, false);
assertNull(config.getForwardSourceMessageProperty());
- FunctionDetails details = FunctionConfigUtils.convert(config, FunctionConfigUtilsTest.class.getClassLoader());
+ FunctionDetails details = FunctionConfigUtils.convert(config);
assertFalse(details.getSink().getForwardSourceMessageProperty());
String detailsJson = "'" + JsonFormat.printer().omittingInsignificantWhitespace().print(details) + "'";
log.info("Function details : {}", detailsJson);
@@ -636,7 +647,7 @@ public void testMergeDifferentOutputSchemaTypes() {
@Test
public void testPoolMessages() {
FunctionConfig functionConfig = createFunctionConfig();
- Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig);
assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -646,7 +657,7 @@ public void testPoolMessages() {
.poolMessages(true).build());
functionConfig.setInputSpecs(inputSpecs);
- functionDetails = FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+ functionDetails = FunctionConfigUtils.convert(functionConfig);
assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 701e852fa7d8c..cb55eebbe50b3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -18,36 +18,38 @@
*/
package org.apache.pulsar.functions.utils;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATMOST_ONCE;
+import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.config.validation.ConfigValidationAnnotations;
-import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.ATMOST_ONCE;
-import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-
/**
* Unit test of {@link SinkConfigUtilsTest}.
*/
@@ -61,6 +63,27 @@ public static class TestSinkConfig {
private String configParameter;
}
+
+ public static class NopSink implements Sink