diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 20a2f25e3b2ad..5310044a97466 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -398,7 +398,15 @@ saslJaasServerRoleTokenSignerSecretPath: ######################## connectorsDirectory: ./connectors +# Whether to enable referencing connectors directory files by file url in connector (sink/source) creation +enableReferencingConnectorDirectoryFiles: true +# Regex patterns for enabling creation of connectors by referencing packages in matching http/https urls +additionalEnabledConnectorUrlPatterns: [] functionsDirectory: ./functions +# Whether to enable referencing functions directory files by file url in functions creation +enableReferencingFunctionsDirectoryFiles: true +# Regex patterns for enabling creation of functions by referencing packages in matching http/https urls +additionalEnabledFunctionsUrlPatterns: [] # Enables extended validation for connector config with fine-grain annotation based validation # during submission. Classloading with either enableClassloadingOfExternalFiles or diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 173794a2cfb58..921ee8f5d6443 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -27,14 +27,15 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.jsonwebtoken.SignatureAlgorithm; import java.lang.reflect.Method; import java.net.URL; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -265,6 +266,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled()); workerConfig.setAuthorizationProvider(config.getAuthorizationProvider()); + List urlPatterns = + Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*"); + workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); + workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); + PulsarWorkerService workerService = new PulsarWorkerService(); workerService.init(workerConfig, null, false); return workerService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index d985241e2903d..7df84459322ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -25,13 +25,13 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.io.File; import java.lang.reflect.Method; import java.net.URI; import java.net.URL; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -260,6 +260,10 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); + List urlPatterns = Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*"); + workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); + workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); + PulsarWorkerService workerService = new PulsarWorkerService(); workerService.init(workerConfig, null, false); return workerService; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index f4a27506c2e10..28b0b2856786d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -26,9 +26,11 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -71,6 +73,7 @@ public class PulsarFunctionTlsTest { protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; protected PulsarService leaderPulsar; protected PulsarAdmin leaderAdmin; + protected WorkerService[] fnWorkerServices = new WorkerService[BROKER_COUNT]; protected String testCluster = "my-cluster"; protected String testTenant = "my-tenant"; protected String testNamespace = testTenant + "/my-ns"; @@ -136,12 +139,18 @@ void setup() throws Exception { workerConfig.setBrokerClientAuthenticationEnabled(true); workerConfig.setTlsEnabled(true); workerConfig.setUseTls(true); - WorkerService fnWorkerService = WorkerServiceLoader.load(workerConfig); + File packagePath = new File( + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParentFile(); + List urlPatterns = + Arrays.asList(packagePath.toURI() + ".*"); + workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); + workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); + fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig); configurations[i] = config; pulsarServices[i] = new PulsarService( - config, workerConfig, Optional.of(fnWorkerService), code -> {}); + config, workerConfig, Optional.of(fnWorkerServices[i]), code -> {}); pulsarServices[i].start(); // Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily. @@ -180,6 +189,9 @@ void tearDown() throws Exception { if (pulsarAdmins[i] != null) { pulsarAdmins[i].close(); } + if (fnWorkerServices[i] != null) { + fnWorkerServices[i].stop(); + } if (pulsarServices[i] != null) { pulsarServices[i].close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java index 9e1edea2f8029..96521094b9f71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java @@ -25,7 +25,7 @@ import static org.apache.pulsar.functions.worker.PulsarFunctionLocalRunTest.getPulsarIODataGeneratorNar; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertTrue; - +import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; @@ -34,10 +34,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; - import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -70,8 +70,6 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import com.google.common.collect.Sets; - public abstract class AbstractPulsarE2ETest { public static final Logger log = LoggerFactory.getLogger(AbstractPulsarE2ETest.class); @@ -288,6 +286,11 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); + List urlPatterns = + Arrays.asList(getPulsarApiExamplesJar().getParentFile().toURI() + ".*", "http://127\\.0\\.0\\.1:.*"); + workerConfig.setAdditionalEnabledConnectorUrlPatterns(urlPatterns); + workerConfig.setAdditionalEnabledFunctionsUrlPatterns(urlPatterns); + PulsarWorkerService workerService = new PulsarWorkerService(); workerService.init(workerConfig, null, false); return workerService; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index cd4c6c3da7fc1..9be6272aa3746 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -237,6 +237,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The directory where nar packages are extractors" ) private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @FieldContext( + category = CATEGORY_CONNECTORS, + doc = "Whether to enable referencing connectors directory files by file url in connector (sink/source) " + + "creation. Default is true." + ) + private Boolean enableReferencingConnectorDirectoryFiles = true; + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Regex patterns for enabling creation of connectors by referencing packages in matching http/https " + + "urls." + ) + private List additionalEnabledConnectorUrlPatterns = new ArrayList<>(); @FieldContext( category = CATEGORY_CONNECTORS, doc = "Enables extended validation for connector config with fine-grain annotation based validation " @@ -255,6 +267,18 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The path to the location to locate builtin functions" ) private String functionsDirectory = "./functions"; + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Whether to enable referencing functions directory files by file url in functions creation. " + + "Default is true." + ) + private Boolean enableReferencingFunctionsDirectoryFiles = true; + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "Regex patterns for enabling creation of functions by referencing packages in matching http/https " + + "urls." + ) + private List additionalEnabledFunctionsUrlPatterns = new ArrayList<>(); @FieldContext( category = CATEGORY_FUNC_METADATA_MNG, doc = "The Pulsar topic used for storing function metadata" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index a982604b05627..a592a1726a7bb 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -29,7 +29,22 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; - +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -57,23 +72,6 @@ import org.apache.pulsar.functions.utils.ValidatableFunctionPackage; import org.apache.pulsar.functions.utils.io.Connector; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URL; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.Collectors; - @Data @Slf4j public class FunctionActioner { @@ -84,17 +82,21 @@ public class FunctionActioner { private final ConnectorsManager connectorsManager; private final FunctionsManager functionsManager; private final PulsarAdmin pulsarAdmin; + private final PackageUrlValidator packageUrlValidator; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, - ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) { + ConnectorsManager connectorsManager, + FunctionsManager functionsManager, PulsarAdmin pulsarAdmin, + PackageUrlValidator packageUrlValidator) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.connectorsManager = connectorsManager; this.functionsManager = functionsManager; this.pulsarAdmin = pulsarAdmin; + this.packageUrlValidator = packageUrlValidator; } @@ -109,29 +111,13 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) { String packageFile; - String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); - boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); + Function.PackageLocationMetaData pkgLocation = functionMetaData.getPackageLocation(); if (runtimeFactory.externallyManaged()) { - packageFile = pkgLocation; + packageFile = pkgLocation.getPackagePath(); } else { - if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { - URL url = new URL(pkgLocation); - File pkgFile = new File(url.toURI()); - packageFile = pkgFile.getAbsolutePath(); - } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) { - File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())); - packageFile = pkgFile.getAbsolutePath(); - } else { - File pkgDir = new File(workerConfig.getDownloadDirectory(), - getDownloadPackagePath(functionMetaData, instanceId)); - pkgDir.mkdirs(); - File pkgFile = new File( - pkgDir, - new File(getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName()); - downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); - packageFile = pkgFile.getAbsolutePath(); - } + packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation, + InstanceUtils.calculateSubjectType(functionDetails)); } // Setup for batch sources if necessary @@ -150,11 +136,45 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) { } } + private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails functionDetails, int instanceId, + Function.PackageLocationMetaData pkgLocation, + FunctionDetails.ComponentType componentType) + throws URISyntaxException, IOException, ClassNotFoundException, PulsarAdminException { + String packagePath = pkgLocation.getPackagePath(); + boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath); + String packageFile; + if (isPkgUrlProvided && packagePath.startsWith(FILE)) { + if (!packageUrlValidator.isValidPackageUrl(componentType, packagePath)) { + throw new IllegalArgumentException("Package URL " + packagePath + " is not valid"); + } + URL url = new URL(packagePath); + File pkgFile = new File(url.toURI()); + packageFile = pkgFile.getAbsolutePath(); + } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails, componentType)) { + File pkgFile = getBuiltinArchive( + componentType, + FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())); + packageFile = pkgFile.getAbsolutePath(); + } else { + File pkgDir = new File(workerConfig.getDownloadDirectory(), + getDownloadPackagePath(functionMetaData, instanceId)); + pkgDir.mkdirs(); + File pkgFile = new File( + pkgDir, + new File(getDownloadFileName(functionMetaData.getFunctionDetails(), + pkgLocation)).getName()); + downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType); + packageFile = pkgFile.getAbsolutePath(); + } + return packageFile; + } + RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) { FunctionMetaData functionMetaData = instance.getFunctionMetaData(); int instanceId = instance.getInstanceId(); - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()); + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails + .newBuilder(functionMetaData.getFunctionDetails()); // check to make sure functionAuthenticationSpec has any data and authentication is enabled. // If not set to null, since for protobuf, @@ -197,14 +217,15 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu } private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, - int instanceId) throws FileNotFoundException, IOException, PulsarAdminException { + int instanceId, Function.PackageLocationMetaData pkgLocation, + FunctionDetails.ComponentType componentType) + throws IOException, PulsarAdminException { FunctionDetails details = functionMetaData.getFunctionDetails(); File pkgDir = pkgFile.getParentFile(); if (pkgFile.exists()) { - log.warn("Function package exists already {} deleting it", - pkgFile); + log.warn("Function package exists already {} deleting it", pkgFile); pkgFile.delete(); } @@ -212,16 +233,19 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa do { tempPkgFile = new File( pkgDir, - pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString()); + pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID()); } while (tempPkgFile.exists() || !tempPkgFile.createNewFile()); - String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath(); + String pkgLocationPath = pkgLocation.getPackagePath(); boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP); boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath); log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(), details.getNamespace(), details.getName(), - downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation()); + downloadFromHttp ? pkgLocationPath : pkgLocation); - if(downloadFromHttp) { + if (downloadFromHttp) { + if (!packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) { + throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid"); + } FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile); } else if (downloadFromPackageManagementService) { getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath()); @@ -248,13 +272,13 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa } catch (FileAlreadyExistsException faee) { // file already exists log.warn("Function package has been downloaded from {} and saved at {}", - functionMetaData.getPackageLocation(), pkgFile); + pkgLocation, pkgFile); } } finally { tempPkgFile.delete(); } - if(details.getRuntime() == Function.FunctionDetails.Runtime.GO && !pkgFile.canExecute()) { + if (details.getRuntime() == Function.FunctionDetails.Runtime.GO && !pkgFile.canExecute()) { pkgFile.setExecutable(true); log.info("Golang function package file {} is set to executable", pkgFile); } @@ -296,7 +320,7 @@ public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails(); String fqfn = FunctionCommon.getFullyQualifiedName(details); - log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId()); + log.info("{}-{} Terminating function...", fqfn, functionRuntimeInfo.getFunctionInstance().getInstanceId()); if (functionRuntimeInfo.getRuntimeSpawner() != null) { functionRuntimeInfo.getRuntimeSpawner().close(); @@ -306,13 +330,15 @@ public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { functionRuntimeInfo.getRuntimeSpawner() .getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> { try { - log.info("{}-{} Cleaning up authentication data for function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId()); + log.info("{}-{} Cleaning up authentication data for function...", fqfn, + functionRuntimeInfo.getFunctionInstance().getInstanceId()); functionAuthProvider .cleanUpAuthData( details, Optional.ofNullable(getFunctionAuthData( Optional.ofNullable( - functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec())))); + functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig() + .getFunctionAuthenticationSpec())))); } catch (Exception e) { log.error("Failed to cleanup auth data for function: {}", fqfn, e); @@ -334,11 +360,15 @@ public void accept(Map.Entry stringConsumerSpecEn Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue(); String topic = stringConsumerSpecEntry.getKey(); - String subscriptionName = isBlank(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName()) - ? InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails()) - : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName(); + String subscriptionName = isBlank(functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName()) + ? InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo + .getFunctionInstance().getFunctionMetaData().getFunctionDetails()) + : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails().getSource().getSubscriptionName(); - deleteSubscription(topic, consumerSpec, subscriptionName, String.format("Cleaning up subscriptions for function %s", fqfn)); + deleteSubscription(topic, consumerSpec, subscriptionName, + String.format("Cleaning up subscriptions for function %s", fqfn)); } }); } @@ -347,7 +377,8 @@ public void accept(Map.Entry stringConsumerSpecEn cleanupBatchSource(details); } - private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String msg) { + private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, + String subscriptionName, String msg) { try { Actions.newBuilder() .addAction( @@ -479,8 +510,9 @@ private String getDownloadPackagePath(FunctionMetaData functionMetaData, int ins File.separatorChar); } - private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException { - if (functionDetails.hasSource()) { + private File getBuiltinArchive(FunctionDetails.ComponentType componentType, FunctionDetails.Builder functionDetails) + throws IOException, ClassNotFoundException { + if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) { SourceSpec sourceSpec = functionDetails.getSource(); if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { Connector connector = connectorsManager.getConnector(sourceSpec.getBuiltin()); @@ -495,7 +527,7 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I } } - if (functionDetails.hasSink()) { + if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) { SinkSpec sinkSpec = functionDetails.getSink(); if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { Connector connector = connectorsManager.getConnector(sinkSpec.getBuiltin()); @@ -511,7 +543,8 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I } } - if (!StringUtils.isEmpty(functionDetails.getBuiltin())) { + if (componentType == FunctionDetails.ComponentType.FUNCTION + && !StringUtils.isEmpty(functionDetails.getBuiltin())) { return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile(); } @@ -550,21 +583,21 @@ private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, } } - private static String getDownloadFileName(FunctionDetails FunctionDetails, + private static String getDownloadFileName(FunctionDetails functionDetails, Function.PackageLocationMetaData packageLocation) { if (!org.apache.commons.lang.StringUtils.isEmpty(packageLocation.getOriginalFileName())) { return packageLocation.getOriginalFileName(); } - String[] hierarchy = FunctionDetails.getClassName().split("\\."); + String[] hierarchy = functionDetails.getClassName().split("\\."); String fileName; if (hierarchy.length <= 0) { - fileName = FunctionDetails.getClassName(); + fileName = functionDetails.getClassName(); } else if (hierarchy.length == 1) { fileName = hierarchy[0]; } else { fileName = hierarchy[hierarchy.length - 2]; } - switch (FunctionDetails.getRuntime()) { + switch (functionDetails.getRuntime()) { case JAVA: return fileName + ".jar"; case PYTHON: @@ -572,7 +605,7 @@ private static String getDownloadFileName(FunctionDetails FunctionDetails, case GO: return fileName + ".go"; default: - throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime()); + throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime()); } } @@ -590,13 +623,15 @@ private void setupBatchSource(Function.FunctionDetails functionDetails) { Actions.newBuilder() .addAction( Actions.Action.builder() - .actionName(String.format("Creating intermediate topic %s with subscription %s for Batch Source %s", + .actionName(String.format("Creating intermediate topic" + + "%s with subscription %s for Batch Source %s", intermediateTopicName, intermediateTopicSubscription, fqfn)) .numRetries(10) .sleepBetweenInvocationsMs(1000) .supplier(() -> { try { - pulsarAdmin.topics().createSubscription(intermediateTopicName, intermediateTopicSubscription, MessageId.latest); + pulsarAdmin.topics().createSubscription(intermediateTopicName, + intermediateTopicSubscription, MessageId.latest); return Actions.ActionResult.builder() .success(true) .build(); @@ -624,7 +659,8 @@ private void setupBatchSource(Function.FunctionDetails functionDetails) { private void cleanupBatchSource(Function.FunctionDetails functionDetails) { if (isBatchSource(functionDetails)) { // clean up intermediate topic - String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(functionDetails.getTenant(), + String intermediateTopicName = SourceConfigUtils + .computeBatchSourceIntermediateTopicName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()).toString(); String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName( functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 9d1d1233b3024..5c90b5e845753 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -210,7 +210,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarWorkerService wor functionAuthProvider, runtimeCustomizer); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin()); + dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin(), + workerService.getPackageUrlValidator()); this.membershipManager = membershipManager; this.functionMetaDataManager = functionMetaDataManager; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java new file mode 100644 index 0000000000000..43c90c7b2b201 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PackageUrlValidator.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import java.net.URI; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.pulsar.functions.proto.Function; + +/** + * Validates package URLs for functions and connectors. + * Validates that the package URL is either a file in the connectors or functions directory + * when referencing connector or function files is enabled, or matches one of the additional url patterns. + */ +public class PackageUrlValidator { + private final Path connectionsDirectory; + private final Path functionsDirectory; + private final List additionalConnectionsPatterns; + private final List additionalFunctionsPatterns; + + public PackageUrlValidator(WorkerConfig workerConfig) { + this.connectionsDirectory = resolveDirectory(workerConfig.getEnableReferencingConnectorDirectoryFiles(), + workerConfig.getConnectorsDirectory()); + this.functionsDirectory = resolveDirectory(workerConfig.getEnableReferencingFunctionsDirectoryFiles(), + workerConfig.getFunctionsDirectory()); + this.additionalConnectionsPatterns = + compilePatterns(workerConfig.getAdditionalEnabledConnectorUrlPatterns()); + this.additionalFunctionsPatterns = + compilePatterns(workerConfig.getAdditionalEnabledFunctionsUrlPatterns()); + } + + private static Path resolveDirectory(Boolean enabled, String directory) { + return enabled != null && enabled + ? FileSystems.getDefault().getPath(directory).normalize().toAbsolutePath() : null; + } + + private static List compilePatterns(List additionalPatterns) { + return additionalPatterns != null ? additionalPatterns.stream().map(Pattern::compile).collect( + Collectors.toList()) : Collections.emptyList(); + } + + boolean isValidFunctionsPackageUrl(URI functionPkgUrl) { + return doesMatch(functionPkgUrl, functionsDirectory, additionalFunctionsPatterns); + } + + boolean isValidConnectionsPackageUrl(URI functionPkgUrl) { + return doesMatch(functionPkgUrl, connectionsDirectory, additionalConnectionsPatterns); + } + + private boolean doesMatch(URI functionPkgUrl, Path directory, List patterns) { + if (directory != null && "file".equals(functionPkgUrl.getScheme())) { + Path filePath = FileSystems.getDefault().getPath(functionPkgUrl.getPath()).normalize().toAbsolutePath(); + if (filePath.startsWith(directory)) { + return true; + } + } + String functionPkgUrlString = functionPkgUrl.normalize().toString(); + for (Pattern pattern : patterns) { + if (pattern.matcher(functionPkgUrlString).matches()) { + return true; + } + } + return false; + } + + public boolean isValidPackageUrl(Function.FunctionDetails.ComponentType componentType, String functionPkgUrl) { + URI uri = URI.create(functionPkgUrl); + if (componentType == null) { + // if component type is not specified, we need to check both functions and connections + return isValidFunctionsPackageUrl(uri) || isValidConnectionsPackageUrl(uri); + } + switch (componentType) { + case FUNCTION: + return isValidFunctionsPackageUrl(uri); + case SINK: + case SOURCE: + return isValidConnectionsPackageUrl(uri); + default: + throw new IllegalArgumentException("Unknown component type: " + componentType); + } + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index 9a1f258de720d..d6bffc84c0fef 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -119,7 +119,8 @@ public interface PulsarClientCreator { private Sinks sinks; private Sources sources; private Workers workers; - + @Getter + private PackageUrlValidator packageUrlValidator; private final PulsarClientCreator clientCreator; public PulsarWorkerService() { @@ -201,6 +202,7 @@ public void init(WorkerConfig workerConfig, this.sinks = new SinksImpl(() -> PulsarWorkerService.this); this.sources = new SourcesImpl(() -> PulsarWorkerService.this); this.workers = new WorkerImpl(() -> PulsarWorkerService.this); + this.packageUrlValidator = new PackageUrlValidator(workerConfig); } @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index fa2d55aef4352..66484d606d2d0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1301,11 +1301,17 @@ private StreamingOutput getStreamingOutput(String pkgPath) { private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { return output -> { if (pkgPath.startsWith(Utils.HTTP)) { + if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) { + throw new IllegalArgumentException("Invalid package url: " + pkgPath); + } URL url = URI.create(pkgPath).toURL(); try (InputStream inputStream = url.openStream()) { IOUtils.copy(inputStream, output); } } else if (pkgPath.startsWith(Utils.FILE)) { + if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) { + throw new IllegalArgumentException("Invalid package url: " + pkgPath); + } URI url = URI.create(pkgPath); File file = new File(url.getPath()); Files.copy(file.toPath(), output); @@ -1715,12 +1721,17 @@ static File downloadPackageFile(PulsarWorkerService worker, String packageName) return file; } - protected File getPackageFile(String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream) + protected File getPackageFile(FunctionDetails.ComponentType componentType, String functionPkgUrl, + String existingPackagePath, InputStream uploadedInputStream) throws IOException, PulsarAdminException { File componentPackageFile = null; if (isNotBlank(functionPkgUrl)) { - componentPackageFile = getPackageFile(functionPkgUrl); + componentPackageFile = getPackageFile(componentType, functionPkgUrl); } else if (existingPackagePath.startsWith(Utils.FILE) || existingPackagePath.startsWith(Utils.HTTP)) { + if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, functionPkgUrl)) { + throw new IllegalArgumentException("Function Package url is not valid." + + "supported url (http/https/file)"); + } try { componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath); } catch (Exception e) { @@ -1729,7 +1740,7 @@ protected File getPackageFile(String functionPkgUrl, String existingPackagePath, ComponentTypeUtils.toString(componentType), functionPkgUrl)); } } else if (Utils.hasPackageTypePrefix(existingPackagePath)) { - componentPackageFile = getPackageFile(existingPackagePath); + componentPackageFile = getPackageFile(componentType, existingPackagePath); } else if (uploadedInputStream != null) { componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream); } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) { @@ -1741,15 +1752,12 @@ protected File getPackageFile(String functionPkgUrl, String existingPackagePath, return componentPackageFile; } - protected File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { - return downloadPackageFile(worker(), packageName); - } - - protected File getPackageFile(String functionPkgUrl) throws IOException, PulsarAdminException { + protected File getPackageFile(FunctionDetails.ComponentType componentType, String functionPkgUrl) + throws IOException, PulsarAdminException { if (Utils.hasPackageTypePrefix(functionPkgUrl)) { - return downloadPackageFile(functionPkgUrl); + return downloadPackageFile(worker(), functionPkgUrl); } else { - if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { + if (!worker().getPackageUrlValidator().isValidPackageUrl(componentType, functionPkgUrl)) { throw new IllegalArgumentException("Function Package url is not valid." + "supported url (http/https/file)"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index c94964897dc9a..68d4cc7cb0020 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -53,7 +53,6 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.utils.ComponentTypeUtils; -import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionFilePackage; import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; @@ -139,29 +138,13 @@ public void registerFunction(final String tenant, } Function.FunctionDetails functionDetails; - boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); File componentPackageFile = null; try { // validate parameters try { - if (isPkgUrlProvided) { - if (Utils.hasPackageTypePrefix(functionPkgUrl)) { - componentPackageFile = downloadPackageFile(functionPkgUrl); - } else { - if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { - throw new IllegalArgumentException("Function Package url is not valid." - + "supported url (http/https/file)"); - } - try { - - componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl); - } catch (Exception e) { - throw new IllegalArgumentException(String.format("Encountered error \"%s\" " - + "when getting %s package from %s", e.getMessage(), - ComponentTypeUtils.toString(componentType), functionPkgUrl), e); - } - } + if (isNotBlank(functionPkgUrl)) { + componentPackageFile = getPackageFile(componentType, functionPkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, functionConfig, componentPackageFile); } else { @@ -320,54 +303,18 @@ public void updateFunction(final String tenant, // validate parameters try { - if (isNotBlank(functionPkgUrl)) { - if (Utils.hasPackageTypePrefix(functionPkgUrl)) { - componentPackageFile = downloadPackageFile(functionPkgUrl); - } else { - try { - componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl); - } catch (Exception e) { - throw new IllegalArgumentException(String.format("Encountered error \"%s\" " - + "when getting %s package from %s", e.getMessage(), - ComponentTypeUtils.toString(componentType), functionPkgUrl)); - } - } - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - mergedConfig, componentPackageFile); - - } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE) - || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) { - try { - componentPackageFile = FunctionCommon.extractFileFromPkgURL( - existingComponent.getPackageLocation().getPackagePath()); - } catch (Exception e) { - throw new IllegalArgumentException(String.format("Encountered error \"%s\" " - + "when getting %s package from %s", e.getMessage(), - ComponentTypeUtils.toString(componentType), functionPkgUrl)); - } - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - mergedConfig, componentPackageFile); - } else if (uploadedInputStream != null) { - - componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream); - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - mergedConfig, componentPackageFile); - - } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) { - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - mergedConfig, componentPackageFile); - if (!isFunctionCodeBuiltin(functionDetails) - && (componentPackageFile == null || fileDetail == null)) { - throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) - + " Package is not provided"); - } - } else { - componentPackageFile = FunctionCommon.createPkgTempFile(); - componentPackageFile.deleteOnExit(); - WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), - componentPackageFile, existingComponent.getPackageLocation().getPackagePath()); - functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, - mergedConfig, componentPackageFile); + componentPackageFile = getPackageFile( + componentType, + functionPkgUrl, + existingComponent.getPackageLocation().getPackagePath(), + uploadedInputStream); + functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, + mergedConfig, componentPackageFile); + if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN) + && !isFunctionCodeBuiltin(functionDetails) + && (componentPackageFile == null || fileDetail == null)) { + throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + + " Package is not provided"); } } catch (Exception e) { log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), @@ -772,7 +719,7 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { archive = archive.replaceFirst("^builtin://", ""); - FunctionsManager functionsManager = this.worker().getFunctionsManager(); + FunctionsManager functionsManager = worker().getFunctionsManager(); FunctionArchive function = functionsManager.getFunction(archive); // check if builtin function exists diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 8cd020b6b333b..69998ced75f4a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -143,7 +143,7 @@ public void registerSink(final String tenant, // validate parameters try { if (isNotBlank(sinkPkgUrl)) { - componentPackageFile = getPackageFile(sinkPkgUrl); + componentPackageFile = getPackageFile(componentType, sinkPkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName, sinkConfig, componentPackageFile); } else { @@ -305,6 +305,7 @@ public void updateSink(final String tenant, // validate parameters try { componentPackageFile = getPackageFile( + componentType, sinkPkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 59bc5888a3403..4acd673b58467 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -143,7 +143,7 @@ public void registerSource(final String tenant, // validate parameters try { if (isPkgUrlProvided) { - componentPackageFile = getPackageFile(sourcePkgUrl); + componentPackageFile = getPackageFile(componentType, sourcePkgUrl); functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName, sourceConfig, componentPackageFile); } else { @@ -304,6 +304,7 @@ public void updateSource(final String tenant, // validate parameters try { componentPackageFile = getPackageFile( + componentType, sourcePkgUrl, existingComponent.getPackageLocation().getPackagePath(), uploadedInputStream); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index 7502e247d9908..fa79d06bbd4a7 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.common.functions.Utils.FILE; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -26,7 +27,12 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.AssertJUnit.fail; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.Packages; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -37,18 +43,10 @@ import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; import org.testng.annotations.Test; -import java.util.Map; -import java.util.Optional; - -import static org.apache.pulsar.common.functions.Utils.FILE; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.AssertJUnit.fail; - /** * Unit test of {@link FunctionActioner}. */ @@ -67,7 +65,8 @@ public void testStartFunctionWithDLNamespace() throws Exception { workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); workerConfig.setFunctionRuntimeFactoryConfigs( ObjectMapperFactory.getThreadLocal().convertValue( - new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); @@ -79,8 +78,8 @@ public void testStartFunctionWithDLNamespace() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); - Runtime runtime = mock(Runtime.class); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), + mock(PackageUrlValidator.class)); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") .setNamespace("test-namespace").setName("func-1")) @@ -112,6 +111,8 @@ public void testStartFunctionWithPkgUrl() throws Exception { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setAdditionalEnabledFunctionsUrlPatterns(Arrays.asList("file:///user/.*", "http://invalid/.*")); + workerConfig.setAdditionalEnabledConnectorUrlPatterns(Arrays.asList("file:///user/.*", "http://invalid/.*")); String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); workerConfig.setDownloadDirectory(downloadDir); @@ -125,44 +126,44 @@ public void testStartFunctionWithPkgUrl() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), + new PackageUrlValidator(workerConfig)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner - String pkgPathLocation = FILE + ":/user/my-file.jar"; - Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() - .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") - .setNamespace("test-namespace").setName("func-1")) - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build()) - .build(); - Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) - .build(); - FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); - doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); - - actioner.startFunction(functionRuntimeInfo); + String pkgPathLocation = FILE + ":///user/my-file.jar"; + startFunction(actioner, pkgPathLocation); verify(runtime, times(1)).start(); // (2) test with http-url, downloading file from http should fail with UnknownHostException due to invalid url - pkgPathLocation = "http://invalid/my-file.jar"; - function1 = Function.FunctionMetaData.newBuilder() - .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") - .setNamespace("test-namespace").setName("func-1")) - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build()) - .build(); - instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build(); - functionRuntimeInfo = mock(FunctionRuntimeInfo.class); - doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); - doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any()); + String invalidPkgPathLocation = "http://invalid/my-file.jar"; try { - actioner.startFunction(functionRuntimeInfo); + startFunction(actioner, invalidPkgPathLocation); fail(); } catch (IllegalStateException ex) { assertEquals(ex.getMessage(), "StartupException"); } } + private void startFunction(FunctionActioner actioner, String pkgPathLocation) { + PackageLocationMetaData packageLocation = PackageLocationMetaData.newBuilder() + .setPackagePath(pkgPathLocation) + .build(); + Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") + .setNamespace("test-namespace").setName("func-1")) + .setPackageLocation(packageLocation) + .build(); + Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function).setInstanceId(0) + .build(); + FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); + doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); + doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any()); + + actioner.startFunction(functionRuntimeInfo); + } + @Test public void testFunctionAuthDisabled() throws Exception { WorkerConfig workerConfig = new WorkerConfig(); @@ -187,7 +188,8 @@ public void testFunctionAuthDisabled() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class), + mock(PackageUrlValidator.class)); String pkgPathLocation = "http://invalid/my-file.jar"; @@ -250,22 +252,13 @@ public void testStartFunctionWithPackageUrl() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), pulsarAdmin, + mock(PackageUrlValidator.class)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner String pkgPathLocation = "function://public/default/test-function@latest"; - Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() - .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") - .setNamespace("test-namespace").setName("func-1")) - .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build()) - .build(); - Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) - .build(); - FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); - doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); - - actioner.startFunction(functionRuntimeInfo); + startFunction(actioner, pkgPathLocation); verify(runtime, times(1)).start(); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 0ab0b2728b829..1722546ea486b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -708,7 +708,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception { FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, - kubernetesRuntimeFactory, null, null, null, null)); + kubernetesRuntimeFactory, null, null, null, null, workerService.getPackageUrlValidator())); try (final MockedStatic runtimeFactoryMockedStatic = Mockito.mockStatic(RuntimeFactory.class);) { runtimeFactoryMockedStatic.when(() -> RuntimeFactory.getFuntionRuntimeFactory(anyString())) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java index d6aa13dc2bfb9..827eb2491b4ab 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionApiResourceTest.java @@ -59,7 +59,6 @@ import org.testng.annotations.Test; public abstract class AbstractFunctionApiResourceTest extends AbstractFunctionsResourceTest { - @Test public void testListFunctionsSuccess() { mockInstanceUtils(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java index 9da30128f1c22..893a7e1e06ec0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java @@ -28,10 +28,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -63,6 +65,7 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.LeaderService; +import org.apache.pulsar.functions.worker.PackageUrlValidator; import org.apache.pulsar.functions.worker.PulsarWorkerService; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerUtils; @@ -141,7 +144,7 @@ public static File getPulsarApiExamplesNar() { } @BeforeMethod - public final void setup() throws Exception { + public final void setup(Method method) throws Exception { this.mockedManager = mock(FunctionMetaDataManager.class); this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class); this.mockedRuntimeFactory = mock(RuntimeFactory.class); @@ -181,21 +184,32 @@ public final void setup() throws Exception { }).when(mockedPackages).download(any(), any()); // worker config + List urlPatterns = + Arrays.asList("http://localhost.*", "file:.*", "https://repo1.maven.org/maven2/org/apache/pulsar/.*"); WorkerConfig workerConfig = new WorkerConfig() .setWorkerId("test") .setWorkerPort(8080) .setFunctionMetadataTopicName("pulsar/functions") .setNumFunctionPackageReplicas(3) - .setPulsarServiceUrl("pulsar://localhost:6650/"); + .setPulsarServiceUrl("pulsar://localhost:6650/") + .setAdditionalEnabledFunctionsUrlPatterns(urlPatterns) + .setAdditionalEnabledConnectorUrlPatterns(urlPatterns); + customizeWorkerConfig(workerConfig, method); tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName()); tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig); when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); + PackageUrlValidator packageUrlValidator = new PackageUrlValidator(workerConfig); + when(mockedWorkerService.getPackageUrlValidator()).thenReturn(packageUrlValidator); doSetup(); } + protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { + + } + protected File getDefaultNarFile() { return getPulsarIOTwitterNar(); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index a9927514efd05..64c027d73ca12 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -1380,17 +1380,15 @@ public void testRegisterSinkSuccessK8sNoUpload() throws Exception { SinkConfig sinkConfig = createDefaultSinkConfig(); sinkConfig.setArchive("builtin://cassandra"); - try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) { - resource.registerSink( - tenant, - namespace, - sink, - inputStream, - mockedFormData, - null, - sinkConfig, - null); - } + resource.registerSink( + tenant, + namespace, + sink, + null, + mockedFormData, + null, + sinkConfig, + null); } /* @@ -1420,21 +1418,19 @@ public void testRegisterSinkSuccessK8sWithUpload() throws Exception { SinkConfig sinkConfig = createDefaultSinkConfig(); sinkConfig.setArchive("builtin://cassandra"); - try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) { - try { - resource.registerSink( - tenant, - namespace, - sink, - inputStream, - mockedFormData, - null, - sinkConfig, - null); - Assert.fail(); - } catch (RuntimeException e) { - Assert.assertEquals(e.getMessage(), injectedErrMsg); - } + try { + resource.registerSink( + tenant, + namespace, + sink, + null, + mockedFormData, + null, + sinkConfig, + null); + Assert.fail(); + } catch (RuntimeException e) { + Assert.assertEquals(e.getMessage(), injectedErrMsg); } }