diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 739adeff52257..152ac763520c8 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -42,6 +42,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300
+# Specifies if the function worker should use classloading for validating submissions for built-in
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfBuiltinFiles: false
+
+# Specifies if the function worker should use classloading for validating submissions for external
+# connectors and functions. This is required for validateConnectorConfig to take effect.
+# Default is false.
+enableClassloadingOfExternalFiles: false
+
################################
# Function package management
################################
@@ -390,7 +400,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions
-# Should connector config be validated during submission
+# Enables extended validation for connector config with fine-grain annotation based validation
+# during submission. Classloading with either enableClassloadingOfExternalFiles or
+# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
+# Default is false.
validateConnectorConfig: false
# Whether to initialize distributed log metadata by runtime.
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index ca7e61bbd2e77..91e8c279f656a 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -465,6 +465,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
+ * Byte Buddy
+ - net.bytebuddy-byte-buddy-1.14.12.jar
+ * zt-zip
+ - org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.10.2.jar
- org.apache.avro-avro-protobuf-1.10.2.jar
diff --git a/pom.xml b/pom.xml
index 80f3df9c1eeab..5595fd741f368 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,8 @@ flexible messaging model and an intuitive client API.
8.37
1.4.13
0.5.0
+ 1.14.12
+ 1.17
3.19.6
${protobuf3.version}
1.45.1
@@ -966,6 +968,18 @@ flexible messaging model and an intuitive client API.
${typetools.version}
+
+ net.bytebuddy
+ byte-buddy
+ ${byte-buddy.version}
+
+
+
+ org.zeroturnaround
+ zt-zip
+ ${zt-zip.version}
+
+
io.grpc
grpc-bom
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 198ee514f6e8e..f2351599cc6c9 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -447,6 +447,24 @@
+
+ ${project.groupId}
+ pulsar-io-cassandra
+ ${project.version}
+ jar
+ true
+ ${project.build.directory}
+ pulsar-io-cassandra.nar
+
+
+ ${project.groupId}
+ pulsar-io-twitter
+ ${project.version}
+ jar
+ true
+ ${project.build.directory}
+ pulsar-io-twitter.nar
+
${project.groupId}
pulsar-io-data-generator
@@ -483,6 +501,15 @@
${project.build.directory}
pulsar-functions-api-examples.nar
+
+ ${project.groupId}
+ pulsar-functions-api-examples
+ ${project.version}
+ jar
+ true
+ ${project.build.directory}
+ pulsar-functions-api-examples.nar
+
@@ -498,6 +525,8 @@
${project.build.directory}/pulsar-functions-api-examples.jar
${project.build.directory}/pulsar-functions-api-examples.nar
${project.build.directory}/pulsar-io-batch-data-generator.nar
+ ${project.build.directory}/pulsar-io-cassandra.nar
+ ${project.build.directory}/pulsar-io-twitter.nar
org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver
org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 9f5a0ee6bfaa1..fb549ad7d5de9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}
+ public static List getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
+ File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
+ return getClassPathEntries(unpacked);
+ }
+
private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
@@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
- * @throws IllegalArgumentException
- * if the NAR is missing the Java Services API file for FlowFileProcessor implementations.
- * @throws ClassNotFoundException
- * if any of the FlowFileProcessor implementations defined by the Java Services API cannot be
- * loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set additionalJars, ClassLoader parent)
- throws ClassNotFoundException, IOException {
+ throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;
@@ -238,22 +238,31 @@ public List getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
- addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
+ getClassPathEntries(root).forEach(f -> {
+ try {
+ addURL(f.toURI().toURL());
+ } catch (IOException e) {
+ log.error("Failed to add entry to classpath: {}", f, e);
+ }
+ });
+ }
+ static List getClassPathEntries(File root) {
+ List classPathEntries = new ArrayList<>();
+ classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
- log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
+ log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
- addURL(dependencies.toURI().toURL());
+ classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
- for (File libJar : jarFiles) {
- addURL(libJar.toURI().toURL());
- }
+ classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
+ return classPathEntries;
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index b2c0a37a65036..1554db5693fd8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -33,13 +33,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;
/**
@@ -114,16 +115,22 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
- try (JarFile jarFile = new JarFile(nar)) {
- Enumeration jarEntries = jarFile.entries();
- while (jarEntries.hasMoreElements()) {
- JarEntry jarEntry = jarEntries.nextElement();
- String name = jarEntry.getName();
- File f = new File(workingDirectory, name);
- if (jarEntry.isDirectory()) {
+ Path workingDirectoryPath = workingDirectory.toPath().normalize();
+ try (ZipFile zipFile = new ZipFile(nar)) {
+ Enumeration extends ZipEntry> zipEntries = zipFile.entries();
+ while (zipEntries.hasMoreElements()) {
+ ZipEntry zipEntry = zipEntries.nextElement();
+ String name = zipEntry.getName();
+ Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
+ if (!targetFilePath.startsWith(workingDirectoryPath)) {
+ log.error("Invalid zip file with entry '{}'", name);
+ throw new IOException("Invalid zip file. Aborting unpacking.");
+ }
+ File f = targetFilePath.toFile();
+ if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
- makeFile(jarFile.getInputStream(jarEntry), f);
+ makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 15add6decb70c..12606198ebff1 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -20,13 +20,13 @@
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
-
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
+import io.prometheus.client.exporter.HTTPServer;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -34,7 +34,6 @@
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -48,19 +47,19 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.prometheus.client.exporter.HTTPServer;
import lombok.Builder;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
@@ -77,8 +76,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
+import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@@ -97,7 +99,6 @@ public class LocalRunner implements AutoCloseable {
private final Thread shutdownHook;
private final int instanceLivenessCheck;
private UserCodeClassLoader userCodeClassLoader;
- private UserCodeClassLoader transformFunctionCodeClassLoader;
private RuntimeFactory runtimeFactory;
private HTTPServer metricsServer;
@@ -152,43 +153,56 @@ public RuntimeEnv convert(String value) {
}
}
- @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
+ @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig",
+ hidden = true, converter = FunctionConfigConverter.class)
protected FunctionConfig functionConfig;
- @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
+ @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig",
+ hidden = true, converter = SourceConfigConverter.class)
protected SourceConfig sourceConfig;
- @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
+ @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig",
+ hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
- @Parameter(names = "--stateStorageImplClass", description = "The implemenatation class state storage service (by default Apache BookKeeper)", hidden = true, required = false)
+ @Parameter(names = "--stateStorageImplClass", description = "The implemenatation class "
+ + "state storage service (by default Apache BookKeeper)", hidden = true, required = false)
protected String stateStorageImplClass;
- @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
+ @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service "
+ + "(by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String brokerServiceUrl;
@Parameter(names = "--webServiceUrl", description = "The URL for the Pulsar web service", hidden = true)
protected String webServiceUrl = null;
- @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
+ @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which "
+ + "function-process can connect to broker", hidden = true)
protected String clientAuthPlugin;
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String clientAuthParams;
@Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
protected boolean useTls;
- @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
+ @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n",
+ hidden = true, arity = 1)
protected boolean tlsAllowInsecureConnection;
- @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
+ @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true
+ , arity = 1)
protected boolean tlsHostNameVerificationEnabled;
@Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
protected String tlsTrustCertFilePath;
@Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
protected int instanceIdOffset = 0;
- @Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
+ @Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true,
+ converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;
- @Parameter(names = "--secretsProviderClassName", description = "Whats the classname of secrets provider", hidden = true)
+ @Parameter(names = "--secretsProviderClassName",
+ description = "Whats the classname of secrets provider", hidden = true)
protected String secretsProviderClassName;
- @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
+ @Parameter(names = "--secretsProviderConfig",
+ description = "Whats the config for the secrets provider", hidden = true)
protected String secretsProviderConfig;
- @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
+ @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running "
+ + "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected Integer metricsPortStart;
- @Parameter(names = "--exitOnError", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
+ @Parameter(names = "--exitOnError", description = "The starting port range for metrics server. When running "
+ + "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected boolean exitOnError;
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
@@ -256,11 +270,13 @@ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, Sin
}
private static String getPulsarDirectory(String directory) {
- String pulsarHome = System.getenv("PULSAR_HOME");
- if (pulsarHome == null) {
- pulsarHome = Paths.get("").toAbsolutePath().toString();
+ final File directoryPath;
+ if (System.getenv("PULSAR_HOME") != null) {
+ directoryPath = new File(System.getenv("PULSAR_HOME"), directory);
+ } else {
+ directoryPath = new File(directory);
}
- return Paths.get(pulsarHome, directory).toString();
+ return directoryPath.getAbsolutePath();
}
private static File createNarExtractionTempDirectory() {
@@ -309,8 +325,6 @@ public synchronized void stop() {
closeClassLoaderIfneeded(userCodeClassLoader);
userCodeClassLoader = null;
- closeClassLoaderIfneeded(transformFunctionCodeClassLoader);
- transformFunctionCodeClassLoader = null;
}
}
@@ -344,9 +358,12 @@ public void start(boolean blocking) throws Exception {
userCodeFile = functionConfig.getJar();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(
functionConfig,
- FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
+ FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -356,7 +373,10 @@ public void start(boolean blocking) throws Exception {
}
if (functionDetails == null) {
- functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
+ FunctionDefinition.class);
+ functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
@@ -364,9 +384,10 @@ public void start(boolean blocking) throws Exception {
parallelism = sourceConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
- functionDetails = SourceConfigUtils.convert(
- sourceConfig,
- SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
+ functionDetails = SourceConfigUtils.convert(sourceConfig,
+ SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
@@ -374,24 +395,13 @@ public void start(boolean blocking) throws Exception {
parallelism = sinkConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
- if (isNotEmpty(sinkConfig.getTransformFunction())) {
- transformFunctionCodeClassLoader = extractClassLoader(
- sinkConfig.getTransformFunction(),
- ComponentType.FUNCTION,
- sinkConfig.getTransformFunctionClassName());
- }
-
- ClassLoader functionClassLoader = null;
- if (transformFunctionCodeClassLoader != null) {
- functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
- ? Thread.currentThread().getContextClassLoader()
- : transformFunctionCodeClassLoader.getClassLoader();
- }
+ ValidatableFunctionPackage validatableFunctionPackage =
+ new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
functionDetails = SinkConfigUtils.convert(
sinkConfig,
- SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
- functionClassLoader, true));
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
+ true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
@@ -416,7 +426,8 @@ public void start(boolean blocking) throws Exception {
webServiceUrl = DEFAULT_WEB_SERVICE_URL;
}
- if ((sourceConfig != null || sinkConfig != null || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA)
+ if ((sourceConfig != null || sinkConfig != null
+ || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA)
&& (runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
// By default run java functions as threads
startThreadedMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
@@ -435,7 +446,7 @@ public void start(boolean blocking) throws Exception {
log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
}
close();
- } else {
+ } else {
synchronized (this) {
while (running.get()) {
this.wait();
@@ -458,7 +469,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- classLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else if (userCodeFile != null) {
@@ -480,7 +491,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
- classLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else {
@@ -547,7 +558,8 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
- instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
+ instanceConfig
+ .setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
}
@@ -601,10 +613,11 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
SecretsProvider secretsProvider;
if (secretsProviderClassName != null) {
- secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+ secretsProvider = (SecretsProvider) Reflections
+ .createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
Map config = null;
if (secretsProviderConfig != null) {
- config = (Map)new Gson().fromJson(secretsProviderConfig, Map.class);
+ config = (Map) new Gson().fromJson(secretsProviderConfig, Map.class);
}
secretsProvider.init(config);
} else {
@@ -652,7 +665,8 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
- instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
+ instanceConfig
+ .setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
}
@@ -696,7 +710,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
- return function.getClassLoader();
+ return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -710,7 +724,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
@@ -724,18 +738,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
- return connector.getClassLoader();
+ return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}
private TreeMap getFunctions() throws IOException {
- return FunctionUtils.searchForFunctions(functionsDir);
+ return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}
private TreeMap getConnectors() throws IOException {
- return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
+ return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}
private SecretsProviderConfigurator getSecretsProviderConfigurator() {
@@ -743,9 +757,10 @@ private SecretsProviderConfigurator getSecretsProviderConfigurator() {
if (secretsProviderClassName != null) {
Map config = null;
if (secretsProviderConfig != null) {
- config = (Map)new Gson().fromJson(secretsProviderConfig, Map.class);
+ config = (Map) new Gson().fromJson(secretsProviderConfig, Map.class);
}
- secretsProviderConfigurator = new NameAndConfigBasedSecretsProviderConfigurator(secretsProviderClassName, config);
+ secretsProviderConfigurator =
+ new NameAndConfigBasedSecretsProviderConfigurator(secretsProviderClassName, config);
} else {
secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 64db38deac0b9..e99a3607cc303 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -39,6 +39,9 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -316,7 +319,8 @@ public void close() {
}
private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
- ClassLoader classLoader) throws ClassNotFoundException {
+ ClassLoader classLoader) {
+ TypePool typePool = TypePool.Default.of(ClassFileLocator.ForClassLoader.of(classLoader));
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
@@ -335,14 +339,13 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}
-
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(typePool.describe(className).resolve(),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
- sourceBuilder.setTypeClassName(typeArgs[0].getName());
+ sourceBuilder.setTypeClassName(typeArgs[0].asErasure().getTypeName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}
@@ -350,7 +353,7 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
- sinkBuilder.setTypeClassName(typeArgs[1].getName());
+ sinkBuilder.setTypeClassName(typeArgs[1].asErasure().getTypeName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
@@ -359,7 +362,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg =
- getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
+ getSinkType(functionDetailsBuilder.getSink().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -378,7 +382,8 @@ private void inferringMissingTypeClassName(Function.FunctionDetails.Builder func
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg =
- getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
+ getSourceType(functionDetailsBuilder.getSource().getClassName(), typePool).asErasure()
+ .getTypeName();
Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 17b6cd2072491..aee4ba89e88d3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -26,7 +26,6 @@
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
@@ -36,6 +35,7 @@
import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -44,7 +44,6 @@
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
@@ -119,24 +118,24 @@ private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
String narExtractionDirectory,
FunctionCacheManager fnCache,
Optional connectorsManager,
- Optional functionsManager,
- Function.FunctionDetails.ComponentType componentType)
- throws Exception {
- if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails(), componentType)) {
+ Optional functionsManager) throws Exception {
+ if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())) {
+ Function.FunctionDetails.ComponentType componentType =
+ InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
return functionsManager.get()
.getFunction(instanceConfig.getFunctionDetails().getBuiltin())
- .getClassLoader();
+ .getFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SOURCE && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSource().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
if (componentType == Function.FunctionDetails.ComponentType.SINK && connectorsManager.isPresent()) {
return connectorsManager.get()
.getConnector(instanceConfig.getFunctionDetails().getSink().getBuiltin())
- .getClassLoader();
+ .getConnectorFunctionPackage().getClassLoader();
}
}
return loadJars(jarFile, instanceConfig, functionId, instanceConfig.getFunctionDetails().getName(),
@@ -179,9 +178,8 @@ public static ClassLoader loadJars(String jarFile,
Collections.emptyList());
}
- log.info(
- "Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
- functionName, fnCache.getClassLoader(functionId));
+ log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+ functionName, fnCache.getClassLoader(instanceConfig.getFunctionId()));
fnClassLoader = fnCache.getClassLoader(functionId);
if (null == fnClassLoader) {
@@ -199,13 +197,12 @@ public void start() throws Exception {
// extract class loader for function
ClassLoader functionClassLoader =
- getFunctionClassLoader(instanceConfig, instanceConfig.getFunctionId(), jarFile, narExtractionDirectory,
- fnCache, connectorsManager, functionsManager,
- InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()));
+ getFunctionClassLoader(instanceConfig, instanceConfig.getFunctionId(), jarFile, narExtractionDirectory, fnCache, connectorsManager,
+ functionsManager);
ClassLoader transformFunctionClassLoader = transformFunctionFile == null ? null : getFunctionClassLoader(
instanceConfig, instanceConfig.getTransformFunctionId(), transformFunctionFile, narExtractionDirectory,
- fnCache, connectorsManager, functionsManager, Function.FunctionDetails.ComponentType.FUNCTION);
+ fnCache, connectorsManager, functionsManager);
// re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
@@ -254,7 +251,9 @@ public void stop() {
// kill the thread
fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0);
if (fnThread.isAlive()) {
- log.warn("The function instance thread is still alive after {} milliseconds. Giving up waiting and moving forward to close function.", THREAD_SHUTDOWN_TIMEOUT_MILLIS);
+ log.warn("The function instance thread is still alive after {} milliseconds. "
+ + "Giving up waiting and moving forward to close function.",
+ THREAD_SHUTDOWN_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
// ignore this
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index bb346e7bd82a3..1c8ec96026898 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import io.netty.util.internal.PlatformDependent;
+import java.util.Optional;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +29,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
@@ -39,15 +41,12 @@
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
-import java.util.Optional;
-
/**
* Thread based function container factory implementation.
*/
@@ -179,13 +178,15 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
+ FunctionsManager functionsManagerNew = new FunctionsManager(workerConfig);
+
initialize(factoryConfig.getThreadGroupName(), Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()),
workerConfig.getPulsarServiceUrl(), authenticationConfig,
workerConfig.getStateStorageProviderImplementation(),
workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null,
workerConfig.isExposeAdminClientEnabled(),
- workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManager),
+ workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManagerNew),
null);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index 0afffa52c70c4..c78bb73428548 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -18,27 +18,45 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-
@Slf4j
-public class ConnectorsManager {
+public class ConnectorsManager implements AutoCloseable {
@Getter
private volatile TreeMap connectors;
+ @VisibleForTesting
+ public ConnectorsManager() {
+ this.connectors = new TreeMap<>();
+ }
+
public ConnectorsManager(WorkerConfig workerConfig) throws IOException {
- this.connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ this.connectors = createConnectors(workerConfig);
+ }
+
+ private static TreeMap createConnectors(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
+ workerConfig.getNarExtractionDirectory(), enableClassloading);
+ }
+
+ @VisibleForTesting
+ public void addConnector(String connectorType, Connector connector) {
+ connectors.put(connectorType, connector);
}
public Connector getConnector(String connectorType) {
@@ -50,7 +68,8 @@ public ConnectorDefinition getConnectorDefinition(String connectorType) {
}
public List getConnectorDefinitions() {
- return connectors.values().stream().map(connector -> connector.getConnectorDefinition()).collect(Collectors.toList());
+ return connectors.values().stream().map(connector -> connector.getConnectorDefinition())
+ .collect(Collectors.toList());
}
public Path getSourceArchive(String sourceType) {
@@ -70,6 +89,25 @@ public Path getSinkArchive(String sinkType) {
}
public void reloadConnectors(WorkerConfig workerConfig) throws IOException {
- connectors = ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory());
+ TreeMap oldConnectors = connectors;
+ this.connectors = createConnectors(workerConfig);
+ closeConnectors(oldConnectors);
}
+
+ @Override
+ public void close() {
+ closeConnectors(connectors);
+ }
+
+ private void closeConnectors(TreeMap connectorMap) {
+ connectorMap.values().forEach(connector -> {
+ try {
+ connector.close();
+ } catch (Exception e) {
+ log.warn("Failed to close connector", e);
+ }
+ });
+ connectorMap.clear();
+ }
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index 5937d09eaa0b8..af038204ad961 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -18,23 +18,34 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@Slf4j
-public class FunctionsManager {
-
+public class FunctionsManager implements AutoCloseable {
private TreeMap functions;
+ @VisibleForTesting
+ public FunctionsManager() {
+ this.functions = new TreeMap<>();
+ }
+
public FunctionsManager(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ this.functions = createFunctions(workerConfig);
+ }
+
+ public void addFunction(String functionType, FunctionArchive functionArchive) {
+ functions.put(functionType, functionArchive);
}
public FunctionArchive getFunction(String functionType) {
@@ -51,6 +62,32 @@ public List getFunctionDefinitions() {
}
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
- this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ TreeMap oldFunctions = functions;
+ this.functions = createFunctions(workerConfig);
+ closeFunctions(oldFunctions);
+ }
+
+ private static TreeMap createFunctions(WorkerConfig workerConfig) throws IOException {
+ boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles()
+ || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ return FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory(),
+ workerConfig.getNarExtractionDirectory(),
+ enableClassloading);
+ }
+
+ @Override
+ public void close() {
+ closeFunctions(functions);
+ }
+
+ private void closeFunctions(TreeMap functionMap) {
+ functionMap.values().forEach(functionArchive -> {
+ try {
+ functionArchive.close();
+ } catch (Exception e) {
+ log.warn("Failed to close function archive", e);
+ }
+ });
+ functionMap.clear();
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 8025dbf48e2c6..c3fc6dd9a39ce 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -210,6 +210,21 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private boolean zooKeeperAllowReadOnlyOperations;
+ @FieldContext(
+ doc = "Specifies if the function worker should use classloading for validating submissions for built-in "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfBuiltinFiles = false;
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Specifies if the function worker should use classloading for validating submissions for external "
+ + "connectors and functions. This is required for validateConnectorConfig to take effect. "
+ + "Default is false."
+ )
+ private Boolean enableClassloadingOfExternalFiles = false;
+
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
@@ -222,7 +237,10 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
@FieldContext(
category = CATEGORY_CONNECTORS,
- doc = "Should we validate connector config during submission"
+ doc = "Enables extended validation for connector config with fine-grain annotation based validation "
+ + "during submission. Classloading with either enableClassloadingOfExternalFiles or "
+ + "enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect. "
+ + "Default is false."
)
private Boolean validateConnectorConfig = false;
@FieldContext(
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 897c77982dd0d..03858fe039455 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -87,6 +87,17 @@
typetools
+
+ net.bytebuddy
+ byte-buddy
+
+
+
+ org.zeroturnaround
+ zt-zip
+ 1.17
+
+
${project.groupId}
pulsar-client-original
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 52a7c6b43202c..ff1129ff96700 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -22,27 +22,27 @@
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URISyntaxException;
import java.net.URL;
+import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
-import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
+import net.bytebuddy.pool.TypePool;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
@@ -51,16 +51,12 @@
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
-import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -83,61 +79,85 @@ public static void mergeJson(String json, Builder builder) throws IOException {
public static int findAvailablePort() {
// The logic here is a little flaky. There is no guarantee that this
// port returned will be available later on when the instance starts
- // TODO(sanjeev):- Fix this
+ // TODO:- Fix this.
try {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
return port;
- } catch (IOException ex){
+ } catch (IOException ex) {
throw new RuntimeException("No free port found", ex);
}
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader)
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypePool typePool)
throws ClassNotFoundException {
- return getFunctionTypes(functionConfig, classLoader.loadClass(functionConfig.getClassName()));
+ return getFunctionTypes(functionConfig, typePool.describe(functionConfig.getClassName()).resolve());
}
- public static Class>[] getFunctionTypes(FunctionConfig functionConfig, Class functionClass)
- throws ClassNotFoundException {
+ public static TypeDefinition[] getFunctionTypes(FunctionConfig functionConfig, TypeDefinition functionClass) {
boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
- public static Class>[] getFunctionTypes(Class> userClass, boolean isWindowConfigPresent) {
+ public static TypeDefinition[] getFunctionTypes(TypeDefinition userClass, boolean isWindowConfigPresent) {
Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
- Class>[] typeArgs = TypeResolver.resolveRawArguments(classParent, userClass);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
// if window function
if (isWindowConfigPresent) {
if (classParent.equals(java.util.function.Function.class)) {
- if (!typeArgs[0].equals(Collection.class)) {
+ if (!typeArgs[0].asErasure().isAssignableTo(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
- typeArgs[0] = (Class>) unwrapType(classParent, userClass, 0);
+ typeArgs[0] = typeArgs[0].getTypeArguments().get(0);
}
}
- if (typeArgs[1].equals(Record.class)) {
- typeArgs[1] = (Class>) unwrapType(classParent, userClass, 1);
+ if (typeArgs[1].asErasure().isAssignableTo(Record.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
+ }
+ if (typeArgs[1].asErasure().isAssignableTo(CompletableFuture.class)) {
+ typeArgs[1] = typeArgs[1].getTypeArguments().get(0);
}
-
return typeArgs;
}
- public static Class>[] getRawFunctionTypes(Class> userClass, boolean isWindowConfigPresent) {
+ private static TypeList.Generic resolveInterfaceTypeArguments(TypeDefinition userClass, Class> interfaceClass) {
+ if (!interfaceClass.isInterface()) {
+ throw new IllegalArgumentException("interfaceClass must be an interface");
+ }
+ for (TypeDescription.Generic interfaze : userClass.getInterfaces()) {
+ if (interfaze.asErasure().isAssignableTo(interfaceClass)) {
+ return interfaze.getTypeArguments();
+ }
+ }
+ if (userClass.getSuperClass() != null) {
+ return resolveInterfaceTypeArguments(userClass.getSuperClass(), interfaceClass);
+ }
+ return null;
+ }
+
+ public static TypeDescription.Generic[] getRawFunctionTypes(TypeDefinition userClass,
+ boolean isWindowConfigPresent) {
Class> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
- return TypeResolver.resolveRawArguments(classParent, userClass);
+ TypeList.Generic typeArgsList = resolveInterfaceTypeArguments(userClass, classParent);
+ TypeDescription.Generic[] typeArgs = new TypeDescription.Generic[2];
+ typeArgs[0] = typeArgsList.get(0);
+ typeArgs[1] = typeArgsList.get(1);
+ return typeArgs;
}
- public static Class> getFunctionClassParent(Class> userClass, boolean isWindowConfigPresent) {
+ public static Class> getFunctionClassParent(TypeDefinition userClass, boolean isWindowConfigPresent) {
if (isWindowConfigPresent) {
- if (WindowFunction.class.isAssignableFrom(userClass)) {
+ if (userClass.asErasure().isAssignableTo(WindowFunction.class)) {
return WindowFunction.class;
} else {
return java.util.function.Function.class;
}
} else {
- if (Function.class.isAssignableFrom(userClass)) {
+ if (userClass.asErasure().isAssignableTo(Function.class)) {
return Function.class;
} else {
return java.util.function.Function.class;
@@ -151,35 +171,6 @@ private static Type unwrapType(Class> type, Class> subType, int position) {
return ((ParameterizedType) argType).getActualTypeArguments()[0];
}
- public static Object createInstance(String userClassName, ClassLoader classLoader) {
- Class> theCls;
- try {
- theCls = Class.forName(userClassName);
- } catch (ClassNotFoundException | NoClassDefFoundError cnfe) {
- try {
- theCls = Class.forName(userClassName, true, classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new RuntimeException("User class must be in class path", cnfe);
- }
- }
- Object result;
- try {
- Constructor> meth = theCls.getDeclaredConstructor();
- meth.setAccessible(true);
- result = meth.newInstance();
- } catch (InstantiationException ie) {
- throw new RuntimeException("User class must be concrete", ie);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("User class doesn't have such method", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("User class must have a no-arg constructor", e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException("User class constructor throws exception", e);
- }
- return result;
-
- }
-
public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
for (Runtime type : Runtime.values()) {
if (type.name().equals(runtime.name())) {
@@ -200,7 +191,9 @@ public static FunctionConfig.Runtime convertRuntime(Runtime runtime) {
public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees convertProcessingGuarantee(
FunctionConfig.ProcessingGuarantees processingGuarantees) {
- for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type : org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.values()) {
+ for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type :
+ org.apache.pulsar.functions.proto.Function.ProcessingGuarantees
+ .values()) {
if (type.name().equals(processingGuarantees.name())) {
return type;
}
@@ -218,50 +211,46 @@ public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
}
- public static Class> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSourceType(classLoader.loadClass(className));
+ public static TypeDefinition getSourceType(String className, TypePool typePool) {
+ return getSourceType(typePool.describe(className).resolve());
}
- public static Class> getSourceType(Class sourceClass) {
-
- if (Source.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(Source.class, sourceClass);
- } else if (BatchSource.class.isAssignableFrom(sourceClass)) {
- return TypeResolver.resolveRawArgument(BatchSource.class, sourceClass);
+ public static TypeDefinition getSourceType(TypeDefinition sourceClass) {
+ if (sourceClass.asErasure().isAssignableTo(Source.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, Source.class).get(0);
+ } else if (sourceClass.asErasure().isAssignableTo(BatchSource.class)) {
+ return resolveInterfaceTypeArguments(sourceClass, BatchSource.class).get(0);
} else {
throw new IllegalArgumentException(
String.format("Source class %s does not implement the correct interface",
- sourceClass.getName()));
+ sourceClass.getActualName()));
}
}
- public static Class> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
- return getSinkType(classLoader.loadClass(className));
+ public static TypeDefinition getSinkType(String className, TypePool typePool) {
+ return getSinkType(typePool.describe(className).resolve());
}
- public static Class> getSinkType(Class sinkClass) {
- return TypeResolver.resolveRawArgument(Sink.class, sinkClass);
+ public static TypeDefinition getSinkType(TypeDefinition sinkClass) {
+ if (sinkClass.asErasure().isAssignableTo(Sink.class)) {
+ return resolveInterfaceTypeArguments(sinkClass, Sink.class).get(0);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Sink class %s does not implement the correct interface",
+ sinkClass.getActualName()));
+ }
}
public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
- URL website = new URL(destPkgUrl);
- try (InputStream in = website.openStream()) {
+ final URL url = new URL(destPkgUrl);
+ final URLConnection connection = url.openConnection();
+ try (InputStream in = connection.getInputStream()) {
log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
}
- public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
- File file = extractFileFromPkgURL(destPkgUrl);
- try {
- return ClassLoaderUtils.loadJar(file);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(
- "Corrupt User PackageFile " + file + " with error " + e.getMessage());
- }
- }
-
public static File createPkgTempFile() throws IOException {
return File.createTempFile("functions", ".tmp");
}
@@ -280,23 +269,9 @@ public static File extractFileFromPkgURL(String destPkgUrl) throws IOException,
downloadFromHttpUrl(destPkgUrl, tempFile);
return tempFile;
} else {
- throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
- }
- }
-
- public static NarClassLoader extractNarClassLoader(File packageFile,
- String narExtractionDirectory) {
- if (packageFile != null) {
- try {
- return NarClassLoaderBuilder.builder()
- .narFile(packageFile)
- .extractionDirectory(narExtractionDirectory)
- .build();
- } catch (IOException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
+ throw new IllegalArgumentException("Unsupported url protocol "
+ + destPkgUrl + ", supported url protocols: [file/http/https]");
}
- return null;
}
public static String getFullyQualifiedInstanceId(org.apache.pulsar.functions.proto.Function.Instance instance) {
@@ -334,17 +309,6 @@ public static final MessageId getMessageId(long sequenceId) {
return new MessageIdImpl(ledgerId, entryId, -1);
}
- public static byte[] toByteArray(Object obj) throws IOException {
- byte[] bytes = null;
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos)) {
- oos.writeObject(obj);
- oos.flush();
- bytes = bos.toByteArray();
- }
- return bytes;
- }
-
public static String getUniquePackageName(String packageName) {
return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
}
@@ -358,11 +322,13 @@ public static String getUniquePackageName(String packageName) {
*/
public static String getStateNamespace(String tenant, String namespace) {
return String.format("%s_%s", tenant, namespace)
- .replace("-", "_");
+ .replace("-", "_");
}
- public static String getFullyQualifiedName(org.apache.pulsar.functions.proto.Function.FunctionDetails FunctionDetails) {
- return getFullyQualifiedName(FunctionDetails.getTenant(), FunctionDetails.getNamespace(), FunctionDetails.getName());
+ public static String getFullyQualifiedName(
+ org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails) {
+ return getFullyQualifiedName(functionDetails.getTenant(), functionDetails.getNamespace(),
+ functionDetails.getName());
}
@@ -390,142 +356,11 @@ private static String extractFromFullyQualifiedName(String fqfn, int index) {
throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
}
- public static Class> getTypeArg(String className, Class> funClass, ClassLoader classLoader)
- throws ClassNotFoundException {
- Class> loadedClass = classLoader.loadClass(className);
- if (!funClass.isAssignableFrom(loadedClass)) {
- throw new IllegalArgumentException(
- String.format("class %s is not type of %s", className, funClass.getName()));
- }
- return TypeResolver.resolveRawArgument(funClass, loadedClass);
- }
-
public static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
- public static ClassLoader getClassLoaderFromPackage(
- ComponentType componentType,
- String className,
- File packageFile,
- String narExtractionDirectory) {
- String connectorClassName = className;
- ClassLoader jarClassLoader = null;
- boolean keepJarClassLoader = false;
- ClassLoader narClassLoader = null;
- boolean keepNarClassLoader = false;
-
- Exception jarClassLoaderException = null;
- Exception narClassLoaderException = null;
-
- try {
- try {
- jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
- } catch (Exception e) {
- jarClassLoaderException = e;
- }
- try {
- narClassLoader = FunctionCommon.extractNarClassLoader(packageFile, narExtractionDirectory);
- } catch (Exception e) {
- narClassLoaderException = e;
- }
-
- // if connector class name is not provided, we can only try to load archive as a NAR
- if (isEmpty(connectorClassName)) {
- if (narClassLoader == null) {
- throw new IllegalArgumentException(String.format("%s package does not have the correct format. " +
- "Pulsar cannot determine if the package is a NAR package or JAR package. " +
- "%s classname is not provided and attempts to load it as a NAR package produced " +
- "the following error.",
- capFirstLetter(componentType), capFirstLetter(componentType)),
- narClassLoaderException);
- }
- try {
- if (componentType == ComponentType.FUNCTION) {
- connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
- } else if (componentType == ComponentType.SOURCE) {
- connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
- } else {
- connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
- componentType.toString().toLowerCase()), e);
- }
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
-
- } else {
- // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
- if (jarClassLoader != null) {
- try {
- jarClassLoader.loadClass(connectorClassName);
- keepJarClassLoader = true;
- return jarClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- // class not found in JAR try loading as a NAR and searching for the class
- if (narClassLoader != null) {
-
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path", capFirstLetter(componentType),
- connectorClassName), e);
- }
- }
- } else if (narClassLoader != null) {
- try {
- narClassLoader.loadClass(connectorClassName);
- keepNarClassLoader = true;
- return narClassLoader;
- } catch (ClassNotFoundException | NoClassDefFoundError e1) {
- throw new IllegalArgumentException(
- String.format("%s class %s must be in class path",
- capFirstLetter(componentType), connectorClassName), e1);
- }
- } else {
- StringBuilder errorMsg = new StringBuilder(capFirstLetter(componentType)
- + " package does not have the correct format."
- + " Pulsar cannot determine if the package is a NAR package or JAR package.");
-
- if (jarClassLoaderException != null) {
- errorMsg.append(" Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
- }
-
- if (narClassLoaderException != null) {
- errorMsg.append(" Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
- }
-
- throw new IllegalArgumentException(errorMsg.toString());
- }
- }
- } finally {
- if (!keepJarClassLoader) {
- ClassLoaderUtils.closeClassLoader(jarClassLoader);
- }
- if (!keepNarClassLoader) {
- ClassLoaderUtils.closeClassLoader(narClassLoader);
- }
- }
- }
-
public static String capFirstLetter(Enum en) {
return StringUtils.capitalize(en.toString().toLowerCase());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 9e2be93ae3568..911de7848a0b2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -16,22 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.functions.utils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.pulsar.common.functions.Utils.BUILTIN;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
+import java.io.File;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
@@ -41,24 +55,6 @@
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import java.io.IOException;
-import java.io.File;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
-import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
-
@Slf4j
public class FunctionConfigUtils {
@@ -71,8 +67,8 @@ public static class ExtractedFunctionDetails {
private String typeArg1;
}
- static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
- static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
+ static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
+ static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
/*
Enable Function subscription naming workaround when activated
@@ -91,26 +87,21 @@ to the Functions Worker (or to the Pulsar Broker when there isn't a separate Fun
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();
- public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
- throws IllegalArgumentException {
+ public static FunctionDetails convert(FunctionConfig functionConfig) {
+ return convert(functionConfig, (ValidatableFunctionPackage) null);
+ }
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- if (classLoader != null) {
- try {
- Class>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
- return convert(
- functionConfig,
- new ExtractedFunctionDetails(
- functionConfig.getClassName(),
- typeArgs[0].getName(),
- typeArgs[1].getName()));
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
- }
+ public static FunctionDetails convert(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage)
+ throws IllegalArgumentException {
+ if (functionConfig == null) {
+ throw new IllegalArgumentException("Function config is not provided");
+ }
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA && validatableFunctionPackage != null) {
+ return convert(functionConfig, doJavaChecks(functionConfig, validatableFunctionPackage));
+ } else {
+ return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
- return convert(functionConfig, new ExtractedFunctionDetails(functionConfig.getClassName(), null, null));
}
public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
@@ -158,7 +149,8 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
.setIsRegexPattern(false)
.build());
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException(String.format("Incorrect custom schema inputs,Topic %s ", topicName));
+ throw new IllegalArgumentException(
+ String.format("Incorrect custom schema inputs,Topic %s ", topicName));
}
});
}
@@ -190,7 +182,8 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
// Set subscription type
Function.SubscriptionType subType;
if ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
- || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) {
+ || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE
+ .equals(functionConfig.getProcessingGuarantees())) {
subType = Function.SubscriptionType.FAILOVER;
} else if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
subType = Function.SubscriptionType.KEY_SHARED;
@@ -255,7 +248,8 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
}
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException(String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
+ throw new IllegalArgumentException(
+ String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
}
}
if (extractedDetails.getTypeArg1() != null) {
@@ -281,6 +275,12 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
}
sinkSpecBuilder.setProducerSpec(pbldr.build());
}
+ if (functionConfig.getBatchBuilder() != null) {
+ Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
+ ? sinkSpecBuilder.getProducerSpec().toBuilder()
+ : Function.ProducerSpec.newBuilder();
+ sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(functionConfig.getBatchBuilder()).build());
+ }
functionDetailsBuilder.setSink(sinkSpecBuilder);
if (functionConfig.getTenant() != null) {
@@ -330,7 +330,6 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
// set class name to window function executor
functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
-
} else {
if (extractedDetails.getFunctionClassName() != null) {
functionDetailsBuilder.setClassName(extractedDetails.getFunctionClassName());
@@ -339,7 +338,6 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
if (!configs.isEmpty()) {
functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
}
-
if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
}
@@ -379,18 +377,36 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
functionDetailsBuilder.setBuiltin(builtin);
}
- return functionDetailsBuilder.build();
+ return validateFunctionDetails(functionDetailsBuilder.build());
+ }
+
+ public static FunctionDetails validateFunctionDetails(FunctionDetails functionDetails)
+ throws IllegalArgumentException {
+ if (!functionDetails.getAutoAck() && functionDetails.getProcessingGuarantees()
+ == Function.ProcessingGuarantees.ATMOST_ONCE) {
+ throw new IllegalArgumentException("When Guarantees == ATMOST_ONCE, autoAck must be equal to true."
+ + " This is a contradictory configuration, autoAck will be removed later."
+ + " Please refer to PIP: https://github.com/apache/pulsar/issues/15560");
+ }
+ if (!functionDetails.getAutoAck()) {
+ log.warn("The autoAck configuration will be deprecated in the future."
+ + " If you want not to automatically ack, please configure the processing guarantees as MANUAL.");
+ }
+ return functionDetails;
}
public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) {
+ functionDetails = validateFunctionDetails(functionDetails);
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(functionDetails.getTenant());
functionConfig.setNamespace(functionDetails.getNamespace());
functionConfig.setName(functionDetails.getName());
functionConfig.setParallelism(functionDetails.getParallelism());
- functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ functionConfig.setProcessingGuarantees(
+ FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Map consumerConfigMap = new HashMap<>();
- for (Map.Entry input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+ for (Map.Entry input : functionDetails.getSource().getInputSpecsMap()
+ .entrySet()) {
ConsumerConfig consumerConfig = new ConsumerConfig();
if (isNotEmpty(input.getValue().getSerdeClassName())) {
consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
@@ -531,7 +547,7 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
}
if (functionConfig.getMaxPendingAsyncRequests() == null) {
- functionConfig.setMaxPendingAsyncRequests(MAX_PENDING_ASYNC_REQUESTS_DEFAULT);
+ functionConfig.setMaxPendingAsyncRequests(MAX_PENDING_ASYNC_REQUESTS_DEFAULT);
}
if (forwardSourceMessagePropertyEnabled) {
@@ -557,48 +573,49 @@ public static void inferMissingArguments(FunctionConfig functionConfig,
}
}
- public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig,
+ ValidatableFunctionPackage validatableFunctionPackage) {
- String functionClassName = functionConfig.getClassName();
- Class functionClass;
+ String functionClassName = StringUtils.trimToNull(functionConfig.getClassName());
+ TypeDefinition functionClass;
try {
// if class name in function config is not set, this should be a built-in function
// thus we should try to find its class name in the NAR service definition
if (functionClassName == null) {
- try {
- functionClassName = FunctionUtils.getFunctionClass(clsLoader);
- } catch (IOException e) {
- throw new IllegalArgumentException("Failed to extract source class from archive", e);
+ FunctionDefinition functionDefinition =
+ validatableFunctionPackage.getFunctionMetaData(FunctionDefinition.class);
+ if (functionDefinition == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
+ }
+ functionClassName = functionDefinition.getFunctionClass();
+ if (functionClassName == null) {
+ throw new IllegalArgumentException("Function class name is not provided.");
}
}
- functionClass = clsLoader.loadClass(functionClassName);
+ functionClass = validatableFunctionPackage.resolveType(functionClassName);
- if (!org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass)
- && !java.util.function.Function.class.isAssignableFrom(functionClass)
- && !org.apache.pulsar.functions.api.WindowFunction.class.isAssignableFrom(functionClass)) {
+ if (!functionClass.asErasure().isAssignableTo(org.apache.pulsar.functions.api.Function.class)
+ && !functionClass.asErasure().isAssignableTo(java.util.function.Function.class)
+ && !functionClass.asErasure()
+ .isAssignableTo(org.apache.pulsar.functions.api.WindowFunction.class)) {
throw new IllegalArgumentException(
String.format("Function class %s does not implement the correct interface",
- functionClass.getName()));
+ functionClassName));
}
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ } catch (TypePool.Resolution.NoSuchTypeException e) {
throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
+ String.format("Function class %s must be in class path", functionClassName), e);
}
- Class>[] typeArgs;
- try {
- typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException(
- String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
- }
+ TypeDefinition[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, functionClass);
// inputs use default schema, so there is no check needed there
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> {
- ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(inputSerializer, typeArgs[0], validatableFunctionPackage.getTypePool(),
+ true);
});
}
@@ -610,10 +627,11 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
try {
consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
} catch (JsonProcessingException e) {
- throw new IllegalArgumentException(String.format("Topic %s has an incorrect schema Info", topicName));
+ throw new IllegalArgumentException(
+ String.format("Topic %s has an incorrect schema Info", topicName));
}
- ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0], clsLoader, true);
-
+ ValidatorUtils.validateSchema(consumerConfig.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
});
}
@@ -628,13 +646,16 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
"Only one of schemaType or serdeClassName should be set in inputSpec");
}
if (!isEmpty(conf.getSerdeClassName())) {
- ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSerde(conf.getSerdeClassName(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (!isEmpty(conf.getSchemaType())) {
- ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0], clsLoader, true);
+ ValidatorUtils.validateSchema(conf.getSchemaType(), typeArgs[0],
+ validatableFunctionPackage.getTypePool(), true);
}
if (conf.getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(), clsLoader, false);
+ ValidatorUtils.validateCryptoKeyReader(conf.getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), false);
}
});
}
@@ -642,8 +663,8 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
if (Void.class.equals(typeArgs[1])) {
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
// One and only one of outputSchemaType and outputSerdeClassName should be set
@@ -653,20 +674,25 @@ public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfi
}
if (!isEmpty(functionConfig.getOutputSchemaType())) {
- ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
if (!isEmpty(functionConfig.getOutputSerdeClassName())) {
- ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], clsLoader, false);
+ ValidatorUtils.validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1],
+ validatableFunctionPackage.getTypePool(), false);
}
- if (functionConfig.getProducerConfig() != null && functionConfig.getProducerConfig().getCryptoConfig() != null) {
- ValidatorUtils.validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(), clsLoader, true);
+ if (functionConfig.getProducerConfig() != null
+ && functionConfig.getProducerConfig().getCryptoConfig() != null) {
+ ValidatorUtils
+ .validateCryptoKeyReader(functionConfig.getProducerConfig().getCryptoConfig(),
+ validatableFunctionPackage.getTypePool(), true);
}
return new FunctionConfigUtils.ExtractedFunctionDetails(
functionClassName,
- typeArgs[0].getName(),
- typeArgs[1].getName());
+ typeArgs[0].asErasure().getTypeName(),
+ typeArgs[1].asErasure().getTypeName());
}
private static void doPythonChecks(FunctionConfig functionConfig) {
@@ -705,15 +731,17 @@ private static void doGolangChecks(FunctionConfig functionConfig) {
}
}
- private static void verifyNoTopicClash(Collection inputTopics, String outputTopic) throws IllegalArgumentException {
+ private static void verifyNoTopicClash(Collection inputTopics, String outputTopic)
+ throws IllegalArgumentException {
if (inputTopics.contains(outputTopic)) {
throw new IllegalArgumentException(
- String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
+ String.format(
+ "Output topic %s is also being used as an input topic (topics must be one or the other)",
outputTopic));
}
}
- private static void doCommonChecks(FunctionConfig functionConfig) {
+ public static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
}
@@ -742,19 +770,22 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
if (!isEmpty(functionConfig.getOutput())) {
if (!TopicName.isValid(functionConfig.getOutput())) {
- throw new IllegalArgumentException(String.format("Output topic %s is invalid", functionConfig.getOutput()));
+ throw new IllegalArgumentException(
+ String.format("Output topic %s is invalid", functionConfig.getOutput()));
}
}
if (!isEmpty(functionConfig.getLogTopic())) {
if (!TopicName.isValid(functionConfig.getLogTopic())) {
- throw new IllegalArgumentException(String.format("LogTopic topic %s is invalid", functionConfig.getLogTopic()));
+ throw new IllegalArgumentException(
+ String.format("LogTopic topic %s is invalid", functionConfig.getLogTopic()));
}
}
if (!isEmpty(functionConfig.getDeadLetterTopic())) {
if (!TopicName.isValid(functionConfig.getDeadLetterTopic())) {
- throw new IllegalArgumentException(String.format("DeadLetter topic %s is invalid", functionConfig.getDeadLetterTopic()));
+ throw new IllegalArgumentException(
+ String.format("DeadLetter topic %s is invalid", functionConfig.getDeadLetterTopic()));
}
}
@@ -793,27 +824,31 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
&& functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
}
- if ((functionConfig.getMaxMessageRetries() == null || functionConfig.getMaxMessageRetries() < 0) && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
+ if ((functionConfig.getMaxMessageRetries() == null || functionConfig.getMaxMessageRetries() < 0)
+ && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}
if (functionConfig.getRetainKeyOrdering() != null
&& functionConfig.getRetainKeyOrdering()
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException("When effectively once processing guarantee is specified, retain Key ordering cannot be set");
+ throw new IllegalArgumentException(
+ "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
}
if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()
&& functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering()) {
throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}
- if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
+ if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils
+ .isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
if (!new File(functionConfig.getPy()).exists()) {
throw new IllegalArgumentException("The supplied python file does not exist");
}
}
- if (!isEmpty(functionConfig.getGo()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getGo())
+ if (!isEmpty(functionConfig.getGo()) && !org.apache.pulsar.common.functions.Utils
+ .isFunctionPackageUrlSupported(functionConfig.getGo())
&& functionConfig.getGo().startsWith(BUILTIN)) {
if (!new File(functionConfig.getGo()).exists()) {
throw new IllegalArgumentException("The supplied go file does not exist");
@@ -835,7 +870,8 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
});
}
- if (functionConfig.getProducerConfig() != null && functionConfig.getProducerConfig().getCryptoConfig() != null) {
+ if (functionConfig.getProducerConfig() != null
+ && functionConfig.getProducerConfig().getCryptoConfig() != null) {
if (isBlank(functionConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
throw new IllegalArgumentException("CryptoKeyReader class name required");
}
@@ -847,7 +883,7 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
}
}
- private static Collection collectAllInputTopics(FunctionConfig functionConfig) {
+ public static Collection collectAllInputTopics(FunctionConfig functionConfig) {
List retval = new LinkedList<>();
if (functionConfig.getInputs() != null) {
retval.addAll(functionConfig.getInputs());
@@ -867,47 +903,21 @@ private static Collection collectAllInputTopics(FunctionConfig functionC
return retval;
}
- public static ClassLoader validate(FunctionConfig functionConfig, File functionPackageFile) {
+ public static void validateNonJavaFunction(FunctionConfig functionConfig) {
doCommonChecks(functionConfig);
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- ClassLoader classLoader;
- if (functionPackageFile != null) {
- try {
- classLoader = loadJar(functionPackageFile);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else if (!isEmpty(functionConfig.getJar())) {
- File jarFile = new File(functionConfig.getJar());
- if (!jarFile.exists()) {
- throw new IllegalArgumentException("Jar file does not exist");
- }
- try {
- classLoader = loadJar(jarFile);
- } catch (Exception e) {
- throw new IllegalArgumentException("Corrupted Jar File", e);
- }
- } else {
- throw new IllegalArgumentException("Function Package is not provided");
- }
-
- doJavaChecks(functionConfig, classLoader);
- return classLoader;
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
doGolangChecks(functionConfig);
- return null;
- } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON){
+ } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
doPythonChecks(functionConfig);
- return null;
} else {
throw new IllegalArgumentException("Function language runtime is either not set or cannot be determined");
}
}
public static ExtractedFunctionDetails validateJavaFunction(FunctionConfig functionConfig,
- ClassLoader classLoader) {
+ ValidatableFunctionPackage validatableFunctionPackage) {
doCommonChecks(functionConfig);
- return doJavaChecks(functionConfig, classLoader);
+ return doJavaChecks(functionConfig, validatableFunctionPackage);
}
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
@@ -973,27 +983,33 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
throw new IllegalArgumentException("Input Topics cannot be altered");
}
if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
- throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
+ throw new IllegalArgumentException(
+ "isRegexPattern for input topic " + topicName + " cannot be altered");
}
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
- if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
+ if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName()) && !newConfig.getOutputSerdeClassName()
+ .equals(existingConfig.getOutputSerdeClassName())) {
throw new IllegalArgumentException("Output Serde mismatch");
}
- if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
+ if (!StringUtils.isEmpty(newConfig.getOutputSchemaType()) && !newConfig.getOutputSchemaType()
+ .equals(existingConfig.getOutputSchemaType())) {
throw new IllegalArgumentException("Output Schema mismatch");
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
- if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
+ if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
+ .equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
}
- if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
+ if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering()
+ .equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Ordering cannot be altered");
}
- if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering().equals(existingConfig.getRetainKeyOrdering())) {
+ if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering()
+ .equals(existingConfig.getRetainKeyOrdering())) {
throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
}
if (!StringUtils.isEmpty(newConfig.getOutput())) {
@@ -1017,14 +1033,16 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) {
mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic());
}
- if (!StringUtils.isEmpty(newConfig.getSubName()) && !newConfig.getSubName().equals(existingConfig.getSubName())) {
+ if (!StringUtils.isEmpty(newConfig.getSubName()) && !newConfig.getSubName()
+ .equals(existingConfig.getSubName())) {
throw new IllegalArgumentException("Subscription Name cannot be altered");
}
if (newConfig.getParallelism() != null) {
mergedConfig.setParallelism(newConfig.getParallelism());
}
if (newConfig.getResources() != null) {
- mergedConfig.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
+ mergedConfig
+ .setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
}
if (newConfig.getWindowConfig() != null) {
mergedConfig.setWindowConfig(newConfig.getWindowConfig());
@@ -1041,6 +1059,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
+ if (newConfig.getProducerConfig() != null) {
+ mergedConfig.setProducerConfig(newConfig.getProducerConfig());
+ }
return mergedConfig;
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
new file mode 100644
index 0000000000000..80b0bc05b3a1f
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionFilePackage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.zeroturnaround.zip.ZipUtil;
+
+/**
+ * FunctionFilePackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading.
+ */
+public class FunctionFilePackage implements AutoCloseable, ValidatableFunctionPackage {
+ private final File file;
+ private final ClassFileLocator.Compound classFileLocator;
+ private final TypePool typePool;
+ private final boolean isNar;
+ private final String narExtractionDirectory;
+ private final boolean enableClassloading;
+
+ private ClassLoader classLoader;
+
+ private final Object configMetadata;
+
+ public FunctionFilePackage(File file, String narExtractionDirectory, boolean enableClassloading,
+ Class> configClass) {
+ this.file = file;
+ boolean nonZeroFile = file.isFile() && file.length() > 0;
+ this.isNar = nonZeroFile ? ZipUtil.containsAnyEntry(file,
+ new String[] {"META-INF/services/pulsar-io.yaml", "META-INF/bundled-dependencies"}) : false;
+ this.narExtractionDirectory = narExtractionDirectory;
+ this.enableClassloading = enableClassloading;
+ if (isNar) {
+ List classpathFromArchive = null;
+ try {
+ classpathFromArchive = NarClassLoader.getClasspathFromArchive(file, narExtractionDirectory);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ List classFileLocators = new ArrayList<>();
+ classFileLocators.add(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ for (File classpath : classpathFromArchive) {
+ if (classpath.exists()) {
+ try {
+ ClassFileLocator locator;
+ if (classpath.isDirectory()) {
+ locator = new ClassFileLocator.ForFolder(classpath);
+ } else {
+ locator = ClassFileLocator.ForJarFile.of(classpath);
+ }
+ classFileLocators.add(locator);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ this.classFileLocator = new ClassFileLocator.Compound(classFileLocators);
+ this.typePool = TypePool.Default.of(classFileLocator);
+ try {
+ this.configMetadata = FunctionUtils.getPulsarIOServiceConfig(file, configClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ this.classFileLocator = nonZeroFile
+ ? new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader(),
+ ClassFileLocator.ForJarFile.of(file)) :
+ new ClassFileLocator.Compound(ClassFileLocator.ForClassLoader.ofSystemLoader());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.typePool =
+ TypePool.Default.of(classFileLocator);
+ this.configMetadata = null;
+ }
+ }
+
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ public boolean isNar() {
+ return isNar;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ classFileLocator.close();
+ if (classLoader instanceof Closeable) {
+ ((Closeable) classLoader).close();
+ }
+ }
+
+ public boolean isEnableClassloading() {
+ return enableClassloading;
+ }
+
+ public synchronized ClassLoader getClassLoader() {
+ if (classLoader == null) {
+ classLoader = createClassLoader();
+ }
+ return classLoader;
+ }
+
+ private ClassLoader createClassLoader() {
+ if (enableClassloading) {
+ if (isNar) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(file)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ try {
+ return new URLClassLoader(new java.net.URL[] {file.toURI().toURL()},
+ NarClassLoader.class.getClassLoader());
+ } catch (MalformedURLException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ } else {
+ throw new IllegalStateException("Classloading is not enabled");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "FunctionFilePackage{"
+ + "file=" + file
+ + ", isNar=" + isNar
+ + '}';
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
new file mode 100644
index 0000000000000..948b1f1905a00
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionRuntimeCommon.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import java.io.File;
+import java.io.IOException;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+
+public class FunctionRuntimeCommon {
+ public static NarClassLoader extractNarClassLoader(File packageFile,
+ String narExtractionDirectory) {
+ if (packageFile != null) {
+ try {
+ return NarClassLoaderBuilder.builder()
+ .narFile(packageFile)
+ .extractionDirectory(narExtractionDirectory)
+ .build();
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ return null;
+ }
+
+ public static ClassLoader getClassLoaderFromPackage(
+ Function.FunctionDetails.ComponentType componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader jarClassLoader = null;
+ boolean keepJarClassLoader = false;
+ NarClassLoader narClassLoader = null;
+ boolean keepNarClassLoader = false;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ try {
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = extractNarClassLoader(packageFile, narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if connector class name is not provided, we can only try to load archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package does not have the correct format. "
+ + "Pulsar cannot determine if the package is a NAR package or JAR package. "
+ + "%s classname is not provided and attempts to load it as a NAR package produced "
+ + "the following error.",
+ FunctionCommon.capFirstLetter(componentType), FunctionCommon.capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
+ try {
+ if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
+ connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
+ } else if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
+ connectorClassName = ConnectorUtils.getIOSourceClass(narClassLoader);
+ } else {
+ connectorClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
+ }
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+
+ } else {
+ // if connector class name is provided, we need to try to load it as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ jarClassLoader.loadClass(connectorClassName);
+ keepJarClassLoader = true;
+ return jarClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // class not found in JAR try loading as a NAR and searching for the class
+ if (narClassLoader != null) {
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ keepNarClassLoader = true;
+ return narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ FunctionCommon.capFirstLetter(componentType), connectorClassName), e1);
+ }
+ } else {
+ StringBuilder errorMsg = new StringBuilder(FunctionCommon.capFirstLetter(componentType)
+ + " package does not have the correct format."
+ + " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a JAR package produced error: " + jarClassLoaderException
+ .getMessage());
+ }
+
+ if (narClassLoaderException != null) {
+ errorMsg.append(
+ " Attempts to load it as a NAR package produced error: " + narClassLoaderException
+ .getMessage());
+ }
+
+ throw new IllegalArgumentException(errorMsg.toString());
+ }
+ }
+ } finally {
+ if (!keepJarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(jarClassLoader);
+ }
+ if (!keepNarClassLoader) {
+ ClassLoaderUtils.closeClassLoader(narClassLoader);
+ }
+ }
+ }
+
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
new file mode 100644
index 0000000000000..3f885a7cd2a3d
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/LoadedFunctionPackage.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.pool.TypePool;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+
+/**
+ * LoadedFunctionPackage is a class that represents a function package and
+ * implements the ValidatableFunctionPackage interface which decouples the
+ * function package from classloading. This implementation is backed by
+ * a ClassLoader, and it is used when the function package is already loaded
+ * by a ClassLoader. This is the case in the LocalRunner and in some of
+ * the unit tests.
+ */
+public class LoadedFunctionPackage implements ValidatableFunctionPackage {
+ private final ClassLoader classLoader;
+ private final Object configMetadata;
+ private final TypePool typePool;
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class configMetadataClass, T configMetadata) {
+ this.classLoader = classLoader;
+ this.configMetadata = configMetadata;
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ public LoadedFunctionPackage(ClassLoader classLoader, Class> configMetadataClass) {
+ this.classLoader = classLoader;
+ if (classLoader instanceof NarClassLoader) {
+ try {
+ configMetadata = FunctionUtils.getPulsarIOServiceConfig((NarClassLoader) classLoader,
+ configMetadataClass);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ } else {
+ configMetadata = null;
+ }
+ typePool = TypePool.Default.of(
+ ClassFileLocator.ForClassLoader.of(classLoader));
+ }
+
+ @Override
+ public TypeDescription resolveType(String className) {
+ return typePool.describe(className).resolve();
+ }
+
+ @Override
+ public TypePool getTypePool() {
+ return typePool;
+ }
+
+ @Override
+ public T getFunctionMetaData(Class clazz) {
+ return configMetadata != null ? clazz.cast(configMetadata) : null;
+ }
+
+ @Override
+ public boolean isEnableClassloading() {
+ return true;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index e943cfabc033c..7f961cc363e0a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -19,14 +19,29 @@
package org.apache.pulsar.functions.utils;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import net.bytebuddy.description.type.TypeDefinition;
+import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -35,7 +50,6 @@
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.config.validation.ConfigValidation;
import org.apache.pulsar.functions.api.Record;
@@ -43,24 +57,6 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
-import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getFunctionTypes;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getRawFunctionTypes;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
@Slf4j
public class SinkConfigUtils {
@@ -77,7 +73,9 @@ public static class ExtractedSinkDetails {
public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
+ boolean isBuiltin =
+ !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive()
+ .startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
if (sinkConfig.getTenant() != null) {
functionDetailsBuilder.setTenant(sinkConfig.getTenant());
@@ -192,6 +190,7 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
} else {
functionDetailsBuilder.setAutoAck(true);
}
+
if (sinkConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
}
@@ -204,12 +203,6 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
}
- if (sinkConfig.getCleanupSubscription() != null) {
- sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
- } else {
- sourceSpecBuilder.setCleanupSubscription(true);
- }
-
if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
} else {
@@ -218,6 +211,13 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
functionDetailsBuilder.setSource(sourceSpecBuilder);
+ if (sinkConfig.getRetainKeyOrdering() != null) {
+ functionDetailsBuilder.setRetainKeyOrdering(sinkConfig.getRetainKeyOrdering());
+ }
+ if (sinkConfig.getRetainOrdering() != null) {
+ functionDetailsBuilder.setRetainOrdering(sinkConfig.getRetainOrdering());
+ }
+
if (sinkConfig.getMaxMessageRetries() != null && sinkConfig.getMaxMessageRetries() > 0) {
Function.RetryDetails.Builder retryDetails = Function.RetryDetails.newBuilder();
retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
@@ -273,7 +273,7 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
functionDetailsBuilder.setCustomRuntimeOptions(sinkConfig.getCustomRuntimeOptions());
}
- return functionDetailsBuilder.build();
+ return FunctionConfigUtils.validateFunctionDetails(functionDetailsBuilder.build());
}
public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
@@ -282,10 +282,12 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setNamespace(functionDetails.getNamespace());
sinkConfig.setName(functionDetails.getName());
sinkConfig.setParallelism(functionDetails.getParallelism());
- sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+ sinkConfig.setProcessingGuarantees(
+ FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
Map consumerConfigMap = new HashMap<>();
List inputs = new ArrayList<>();
- for (Map.Entry input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
+ for (Map.Entry input : functionDetails.getSource().getInputSpecsMap()
+ .entrySet()) {
ConsumerConfig consumerConfig = new ConsumerConfig();
if (!isEmpty(input.getValue().getSerdeClassName())) {
consumerConfig.setSerdeClassName(input.getValue().getSerdeClassName());
@@ -331,7 +333,6 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
// Set subscription position
sinkConfig.setSourceSubscriptionPosition(
convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
- sinkConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
@@ -346,19 +347,23 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setArchive("builtin://" + functionDetails.getSink().getBuiltin());
}
if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getSink().getConfigs())) {
- TypeReference> typeRef
- = new TypeReference>() {};
+ TypeReference> typeRef =
+ new TypeReference>() {
+ };
Map configMap;
try {
- configMap = ObjectMapperFactory.getThreadLocal().readValue(functionDetails.getSink().getConfigs(), typeRef);
+ configMap =
+ ObjectMapperFactory.getThreadLocal().readValue(functionDetails.getSink().getConfigs(), typeRef);
} catch (IOException e) {
- log.error("Failed to read configs for sink {}", FunctionCommon.getFullyQualifiedName(functionDetails), e);
+ log.error("Failed to read configs for sink {}", FunctionCommon.getFullyQualifiedName(functionDetails),
+ e);
throw new RuntimeException(e);
}
sinkConfig.setConfigs(configMap);
}
if (!isEmpty(functionDetails.getSecretsMap())) {
- Type type = new TypeToken