From 00dcf52ec5c7c70ff7677ae499c8511582852e0b Mon Sep 17 00:00:00 2001 From: Mukesh Kumar <65598381+mukesh154@users.noreply.github.com> Date: Mon, 13 Jan 2025 11:33:01 +0000 Subject: [PATCH 1/3] Added logLevel for Function/Source/Sink --- .../common/functions/FunctionConfig.java | 1 + .../apache/pulsar/common/io/SinkConfig.java | 1 + .../apache/pulsar/common/io/SourceConfig.java | 1 + .../apache/pulsar/admin/cli/CmdFunctions.java | 7 +++++++ .../org/apache/pulsar/admin/cli/CmdSinks.java | 5 +++++ .../apache/pulsar/admin/cli/CmdSources.java | 5 +++++ .../proto/src/main/proto/Function.proto | 1 + .../main/resources/java_instance_log4j2.xml | 2 +- .../resources/kubernetes_instance_log4j2.xml | 2 +- .../functions/runtime/RuntimeUtils.java | 5 ++++- .../runtime/kubernetes/KubernetesRuntime.java | 6 ++++++ .../runtime/process/ProcessRuntime.java | 6 ++++++ .../functions/runtime/RuntimeUtilsTest.java | 1 + .../kubernetes/KubernetesRuntimeTest.java | 1 + .../runtime/process/ProcessRuntimeTest.java | 1 + .../functions/utils/FunctionConfigUtils.java | 18 ++++++++++++++++++ .../functions/utils/SinkConfigUtils.java | 19 +++++++++++++++++++ .../functions/utils/SourceConfigUtils.java | 19 +++++++++++++++++++ .../utils/FunctionConfigUtilsTest.java | 3 +++ 19 files changed, 101 insertions(+), 3 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index e304f25d5d373..6a3c7123b6a02 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -88,6 +88,7 @@ public enum Runtime { private String outputSerdeClassName; private String logTopic; + private String logLevel; private ProcessingGuarantees processingGuarantees; // Do we want function instances to process data in the same order as in the input topics // This essentially means that every partition of input topic is consumed by only one instance diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 57e67c0bcee0d..a1ad7edb80387 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -95,4 +95,5 @@ public class SinkConfig { private String transformFunctionClassName; private String transformFunctionConfig; private String logTopic; + private String logLevel; } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 1991957045752..ebfb28c2583c6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -73,4 +73,5 @@ public class SourceConfig { // batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED private String batchBuilder; private String logTopic; + private String logLevel; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 4c7e058af6de1..2e784f8cbaea7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -228,6 +228,10 @@ abstract class FunctionDetailsCommand extends BaseCommand { + " #Java, Python, Go") protected String logTopic; + @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Function are produced" + + " #Java") + protected String logLevel; + @Option(names = {"-st", "--schema-type"}, description = "The builtin schema type or " + "custom schema class name to be used for messages output by the function #Java") protected String schemaType = ""; @@ -506,6 +510,9 @@ void processArguments() throws Exception { if (null != logTopic) { functionConfig.setLogTopic(logTopic); } + if (null != logLevel) { + functionConfig.setLogLevel(logLevel); + } if (null != className) { functionConfig.setClassName(className); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index a4fb047550dcb..787455cdd78fe 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand { protected String transformFunctionConfig; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; + @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Sink are produced") + protected String logLevel; @Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" + " (for process & Kubernetes runtime only).") protected String runtimeFlags; @@ -611,6 +613,9 @@ void processArguments() throws Exception { if (null != logTopic) { sinkConfig.setLogTopic(logTopic); } + if (null != logLevel) { + sinkConfig.setLogLevel(logLevel); + } if (null != runtimeFlags) { sinkConfig.setRuntimeFlags(runtimeFlags); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index c8af7ddd954b1..4e647fd302104 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -364,6 +364,8 @@ abstract class SourceDetailsCommand extends BaseCommand { protected String secretsString; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; + @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Source are produced") + protected String logLevel; @Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" + " (for process & Kubernetes runtime only).") protected String runtimeFlags; @@ -505,6 +507,9 @@ void processArguments() throws Exception { if (null != logTopic) { sourceConfig.setLogTopic(logTopic); } + if (null != logLevel) { + sourceConfig.setLogLevel(logLevel); + } if (null != runtimeFlags) { sourceConfig.setRuntimeFlags(runtimeFlags); } diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index de3f03a39008c..7985bfbbb6f29 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -98,6 +98,7 @@ message FunctionDetails { bool retainOrdering = 21; bool retainKeyOrdering = 22; SubscriptionPosition subscriptionPosition = 23; + string logLevel = 24; } message ConsumerSpec { diff --git a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml index 190d9be92940b..bb78c80ea9374 100644 --- a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml +++ b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml @@ -120,7 +120,7 @@ - info + ${sys:pulsar.log.level} ${sys:pulsar.log.appender} ${sys:pulsar.log.level} diff --git a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml index f86d03e41793f..c5c596b6840c0 100644 --- a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml +++ b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml @@ -50,7 +50,7 @@ - info + ${sys:pulsar.log.level} Console ${sys:pulsar.log.level} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 49a5dd40fa271..0d3d57f60289d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -78,6 +78,7 @@ public static List composeCmd(InstanceConfig instanceConfig, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, + String logLevel, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, @@ -92,7 +93,7 @@ public static List composeCmd(InstanceConfig instanceConfig, cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory, originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl, authConfig, shardId, grpcPort, expectedHealthCheckInterval, - logConfigFile, secretsProviderClassName, secretsProviderConfig, + logConfigFile, logLevel, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository, narExtractionDirectory, functionInstanceClassPath, false, pulsarWebServiceUrl)); @@ -315,6 +316,7 @@ public static List getCmd(InstanceConfig instanceConfig, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, + String logLevel, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, @@ -360,6 +362,7 @@ public static List getCmd(InstanceConfig instanceConfig, } args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath)); } + args.add("-Dpulsar.log.level=" + logLevel); args.add("-Dlog4j.configurationFile=" + logConfigFile); args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig)); args.add("-Dpulsar.function.log.file=" + String.format( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 7a69b822cbd89..9738833fba785 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -74,6 +74,7 @@ import lombok.extern.slf4j.Slf4j; import okhttp3.Response; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -236,6 +237,10 @@ public class KubernetesRuntime implements Runtime { case GO: break; } + String logLevel = instanceConfig.getFunctionDetails().getLogLevel(); + if (StringUtils.isBlank(logLevel)) { + logLevel = "info"; + } this.authConfig = authConfig; @@ -275,6 +280,7 @@ public class KubernetesRuntime implements Runtime { grpcPort, -1L, logConfigFile, + logLevel, secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java index e59fcadc729ed..049b8fe4f09e9 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java @@ -122,6 +122,11 @@ class ProcessRuntime implements Runtime { case GO: break; } + String logLevel = instanceConfig.getFunctionDetails().getLogLevel(); + if (StringUtils.isBlank(logLevel)) { + logLevel = "info"; + } + this.extraDependenciesDir = extraDependenciesDir; this.narExtractionDirectory = narExtractionDirectory; this.processArgs = RuntimeUtils.composeCmd( @@ -142,6 +147,7 @@ class ProcessRuntime implements Runtime { instanceConfig.getPort(), expectedHealthCheckInterval, logConfigFile, + logLevel, secretsProviderClassName, secretsProviderConfig, false, diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java index b19be92e6ba81..4650b3496c9f0 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java @@ -213,6 +213,7 @@ public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Excepti 23, 1234L, "logConfigFile", + "info", "secretsProviderClassName", "secretsProviderConfig", false, diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index bf73f0a9d34a2..f3c6905658e90 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -475,6 +475,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s String expectedArgs = "exec java -cp " + classpath + extraDepsEnv + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*" + + " -Dpulsar.log.level=info" + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml " + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID" diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 365704ea0b4ed..91bfb245a5719 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -324,6 +324,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS String expectedArgs = "java -cp " + classpath + extraDepsEnv + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*" + + " -Dpulsar.log.level=info" + " -Dlog4j.configurationFile=java_instance_log4j2.xml " + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails()) + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 45fb4c1cb1ee7..e6c846aa6432b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -32,6 +32,7 @@ import com.google.gson.reflect.TypeToken; import java.io.File; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -70,6 +71,7 @@ public static class ExtractedFunctionDetails { static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000; static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE; + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create(); @@ -272,6 +274,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu if (functionConfig.getLogTopic() != null) { functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); } + if (functionConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(functionConfig.getLogLevel()); + } if (functionConfig.getRuntime() != null) { functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime())); } @@ -447,6 +452,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) if (!isEmpty(functionDetails.getLogTopic())) { functionConfig.setLogTopic(functionDetails.getLogTopic()); } + if (!isEmpty(functionDetails.getLogLevel())) { + functionConfig.setLogLevel(functionDetails.getLogLevel()); + } if (functionDetails.getSink().getForwardSourceMessageProperty()) { functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty()); } @@ -806,6 +814,13 @@ public static void doCommonChecks(FunctionConfig functionConfig) { } } + if (!isEmpty(functionConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(functionConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", functionConfig.getLogLevel())); + } + } + if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Function parallelism must be a positive number"); } @@ -1027,6 +1042,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct if (!StringUtils.isEmpty(newConfig.getLogTopic())) { mergedConfig.setLogTopic(newConfig.getLogTopic()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees() .equals(existingConfig.getProcessingGuarantees())) { throw new IllegalArgumentException("Processing Guarantees cannot be altered"); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 65b6b97fc6ee9..be45da2330586 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -62,6 +63,8 @@ @Slf4j public class SinkConfigUtils { + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + @Getter @Setter @AllArgsConstructor @@ -90,6 +93,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail if (sinkConfig.getLogTopic() != null) { functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic()); } + if (sinkConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(sinkConfig.getLogLevel()); + } functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); if (sinkConfig.getParallelism() != null) { functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); @@ -327,6 +333,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { if (!isEmpty(functionDetails.getLogTopic())) { sinkConfig.setLogTopic(functionDetails.getLogTopic()); } + if (!isEmpty(functionDetails.getLogLevel())) { + sinkConfig.setLogLevel(functionDetails.getLogLevel()); + } sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); @@ -439,6 +448,13 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf } } + if (!isEmpty(sinkConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(sinkConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", sinkConfig.getLogLevel())); + } + } + if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Sink parallelism must be a positive number"); } @@ -628,6 +644,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (!StringUtils.isEmpty(newConfig.getLogTopic())) { mergedConfig.setLogTopic(newConfig.getLogTopic()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } if (newConfig.getInputs() != null) { newConfig.getInputs().forEach((topicName -> { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 6229bffff5317..799f2f29cf87e 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -28,7 +28,9 @@ import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Getter; @@ -56,6 +58,8 @@ @Slf4j public class SourceConfigUtils { + private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + @Getter @Setter @AllArgsConstructor @@ -83,6 +87,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource if (sourceConfig.getLogTopic() != null) { functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic()); } + if (sourceConfig.getLogLevel() != null) { + functionDetailsBuilder.setLogLevel(sourceConfig.getLogLevel()); + } functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); if (sourceConfig.getParallelism() != null) { functionDetailsBuilder.setParallelism(sourceConfig.getParallelism()); @@ -241,6 +248,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { if (!isEmpty(functionDetails.getLogTopic())) { sourceConfig.setLogTopic(functionDetails.getLogTopic()); } + if (!isEmpty(functionDetails.getLogLevel())) { + sourceConfig.setLogLevel(functionDetails.getLogLevel()); + } if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); @@ -281,6 +291,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic())); } } + if (!isEmpty(sourceConfig.getLogLevel())) { + if (!VALID_LOG_LEVELS.contains(sourceConfig.getLogLevel().toUpperCase())) { + throw new IllegalArgumentException( + String.format("LogLevel %s is invalid", sourceConfig.getLogLevel())); + } + } if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Source parallelism must be a positive number"); } @@ -410,6 +426,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon if (!StringUtils.isEmpty(newConfig.getLogTopic())) { mergedConfig.setLogTopic(newConfig.getLogTopic()); } + if (!StringUtils.isEmpty(newConfig.getLogLevel())) { + mergedConfig.setLogLevel(newConfig.getLogLevel()); + } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees() .equals(existingConfig.getProcessingGuarantees())) { throw new IllegalArgumentException("Processing Guarantees cannot be altered"); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index cf4e7dd92a8f7..b78174b303841 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -596,6 +596,7 @@ public void testFunctionConfigConvertFromDetails() { .build(); boolean autoAck = true; String logTopic = "log-topic1"; + String logLevel = "debug"; Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024 * 20).setRam(1024 * 10).build(); String packageUrl = "http://package.url"; Map secretsMap = new HashMap<>(); @@ -616,6 +617,7 @@ public void testFunctionConfigConvertFromDetails() { .setSource(sourceSpec) .setAutoAck(autoAck) .setLogTopic(logTopic) + .setLogLevel(logLevel) .setResources(resources) .setPackageUrl(packageUrl) .setSecretsMap(new Gson().toJson(secretsMap)) @@ -629,6 +631,7 @@ public void testFunctionConfigConvertFromDetails() { assertEquals(functionConfig.getName(), name); assertEquals(functionConfig.getClassName(), classname); assertEquals(functionConfig.getLogTopic(), logTopic); + assertEquals(functionConfig.getLogLevel(), logLevel); assertEquals((Object) functionConfig.getResources().getCpu(), resources.getCpu()); assertEquals(functionConfig.getResources().getDisk().longValue(), resources.getDisk()); assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam()); From 003a0c77ff17980e81da2daf890e05e2ba2166d0 Mon Sep 17 00:00:00 2001 From: Mukesh Kumar <65598381+mukesh154@users.noreply.github.com> Date: Mon, 13 Jan 2025 12:14:27 +0000 Subject: [PATCH 2/3] Review-comments taken care --- .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java | 2 +- .../org/apache/pulsar/functions/utils/FunctionConfigUtils.java | 2 +- .../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java | 2 +- .../org/apache/pulsar/functions/utils/SourceConfigUtils.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 2e784f8cbaea7..616db9724d51c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -228,7 +228,7 @@ abstract class FunctionDetailsCommand extends BaseCommand { + " #Java, Python, Go") protected String logTopic; - @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Function are produced" + @Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced" + " #Java") protected String logLevel; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 787455cdd78fe..3f21b277bac3d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -410,7 +410,7 @@ abstract class SinkDetailsCommand extends BaseCommand { protected String transformFunctionConfig; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; - @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Sink are produced") + @Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Sink are produced") protected String logLevel; @Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" + " (for process & Kubernetes runtime only).") diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 4e647fd302104..bbae8621bd69b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -364,7 +364,7 @@ abstract class SourceDetailsCommand extends BaseCommand { protected String secretsString; @Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") protected String logTopic; - @Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Source are produced") + @Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Source are produced") protected String logLevel; @Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime" + " (for process & Kubernetes runtime only).") diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index e6c846aa6432b..d984a377ad517 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -71,7 +71,7 @@ public static class ExtractedFunctionDetails { static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000; static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE; - private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + private static final List VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"); private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create(); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index be45da2330586..48f85b5357e9a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -63,7 +63,7 @@ @Slf4j public class SinkConfigUtils { - private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + private static final List VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"); @Getter @Setter diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 799f2f29cf87e..a104e6a92fd9c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -58,7 +58,7 @@ @Slf4j public class SourceConfigUtils { - private static final List VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR"); + private static final List VALID_LOG_LEVELS = Arrays.asList("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"); @Getter @Setter From 000272934bdc755cce313f99eb037b56ef2decb6 Mon Sep 17 00:00:00 2001 From: Mukesh Kumar <65598381+mukesh154@users.noreply.github.com> Date: Fri, 17 Jan 2025 05:16:09 +0000 Subject: [PATCH 3/3] Added functionality for python runtime --- .../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 3 +-- .../instance/src/main/python/Function_pb2.py | 9 ++++++++- .../apache/pulsar/functions/runtime/RuntimeUtils.java | 2 ++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 616db9724d51c..c053a91844701 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -228,8 +228,7 @@ abstract class FunctionDetailsCommand extends BaseCommand { + " #Java, Python, Go") protected String logTopic; - @Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced" - + " #Java") + @Option(names = "--log-level", description = "Log level at which the logs of a Pulsar Function are produced") protected String logLevel; @Option(names = {"-st", "--schema-type"}, description = "The builtin schema type or " diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index 118a6a1cd8967..8f031c4659fad 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -39,7 +39,7 @@ syntax='proto3', serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function', create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xa6\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12\x12\n\nsecretsMap\x18\x10 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x13\n\x07\x61utoAck\x18\t \x01(\x08\x42\x02\x18\x01\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e \x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\x12\x14\n\x0cruntimeFlags\x18\x11 \x01(\t\x12;\n\rcomponentType\x18\x12 \x01(\x0e\x32$.proto.FunctionDetails.ComponentType\x12\x1c\n\x14\x63ustomRuntimeOptions\x18\x13 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x14 \x01(\t\x12\x16\n\x0eretainOrdering\x18\x15 \x01(\x08\x12\x19\n\x11retainKeyOrdering\x18\x16 \x01(\x08\x12\x39\n\x14subscriptionPosition\x18\x17 \x01(\x0e\x32\x1b.proto.SubscriptionPosition\"\'\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\x12\x06\n\x02GO\x10\x03\"@\n\rComponentType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0c\n\x08\x46UNCTION\x10\x01\x12\n\n\x06SOURCE\x10\x02\x12\x08\n\x04SINK\x10\x03\"\xf7\x03\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01 \x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\x12@\n\x11receiverQueueSize\x18\x04 \x01(\x0b\x32%.proto.ConsumerSpec.ReceiverQueueSize\x12\x43\n\x10schemaProperties\x18\x05 \x03(\x0b\x32).proto.ConsumerSpec.SchemaPropertiesEntry\x12G\n\x12\x63onsumerProperties\x18\x06 \x03(\x0b\x32+.proto.ConsumerSpec.ConsumerPropertiesEntry\x12%\n\ncryptoSpec\x18\x07 \x01(\x0b\x32\x11.proto.CryptoSpec\x12\x14\n\x0cpoolMessages\x18\x08 \x01(\x08\x1a\"\n\x11ReceiverQueueSize\x12\r\n\x05value\x18\x01 \x01(\x05\x1a\x37\n\x15SchemaPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x39\n\x17\x43onsumerPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe5\x01\n\x0cProducerSpec\x12\x1a\n\x12maxPendingMessages\x18\x01 \x01(\x05\x12*\n\"maxPendingMessagesAcrossPartitions\x18\x02 \x01(\x05\x12\x1f\n\x17useThreadLocalProducers\x18\x03 \x01(\x08\x12%\n\ncryptoSpec\x18\x04 \x01(\x0b\x32\x11.proto.CryptoSpec\x12\x14\n\x0c\x62\x61tchBuilder\x18\x05 \x01(\t\x12/\n\x0f\x63ompressionType\x18\x06 \x01(\x0e\x32\x16.proto.CompressionType\"\xbb\x02\n\nCryptoSpec\x12 \n\x18\x63ryptoKeyReaderClassName\x18\x01 \x01(\t\x12\x1d\n\x15\x63ryptoKeyReaderConfig\x18\x02 \x01(\t\x12!\n\x19producerEncryptionKeyName\x18\x03 \x03(\t\x12\x44\n\x1bproducerCryptoFailureAction\x18\x04 \x01(\x0e\x32\x1f.proto.CryptoSpec.FailureAction\x12\x44\n\x1b\x63onsumerCryptoFailureAction\x18\x05 \x01(\x0e\x32\x1f.proto.CryptoSpec.FailureAction\"=\n\rFailureAction\x12\x08\n\x04\x46\x41IL\x10\x00\x12\x0b\n\x07\x44ISCARD\x10\x01\x12\x0b\n\x07\x43ONSUME\x10\x02\x12\x08\n\x04SEND\x10\n\"\xe2\x04\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n \x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07 \x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t \x01(\t\x12\x1b\n\x13\x63leanupSubscription\x18\x0b \x01(\x08\x12\x39\n\x14subscriptionPosition\x18\x0c \x01(\x0e\x32\x1b.proto.SubscriptionPosition\x12$\n\x1cnegativeAckRedeliveryDelayMs\x18\r \x01(\x04\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\"\n\x05value\x18\x02 \x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\xdc\x03\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12)\n\x0cproducerSpec\x18\x0b \x01(\x0b\x32\x13.proto.ProducerSpec\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07 \x01(\t\x12$\n\x1c\x66orwardSourceMessageProperty\x18\x08 \x01(\x08\x12?\n\x10schemaProperties\x18\t \x03(\x0b\x32%.proto.SinkSpec.SchemaPropertiesEntry\x12\x43\n\x12\x63onsumerProperties\x18\n \x03(\x0b\x32\'.proto.SinkSpec.ConsumerPropertiesEntry\x1a\x37\n\x15SchemaPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x39\n\x17\x43onsumerPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02 \x01(\t\"\xba\x03\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\x12\x43\n\x0einstanceStates\x18\x05 \x03(\x0b\x32+.proto.FunctionMetaData.InstanceStatesEntry\x12;\n\x10\x66unctionAuthSpec\x18\x06 \x01(\x0b\x32!.proto.FunctionAuthenticationSpec\x12H\n transformFunctionPackageLocation\x18\x07 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x1aK\n\x13InstanceStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12#\n\x05value\x18\x02 \x01(\x0e\x32\x14.proto.FunctionState:\x02\x38\x01\"<\n\x1a\x46unctionAuthenticationSpec\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x10\n\x08provider\x18\x02 \x01(\t\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*[\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02\x12\n\n\x06MANUAL\x10\x03*<\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x12\x0e\n\nKEY_SHARED\x10\x02*0\n\x14SubscriptionPosition\x12\n\n\x06LATEST\x10\x00\x12\x0c\n\x08\x45\x41RLIEST\x10\x01*D\n\x0f\x43ompressionType\x12\x07\n\x03LZ4\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x08\n\x04ZLIB\x10\x02\x12\x08\n\x04ZSTD\x10\x03\x12\n\n\x06SNAPPY\x10\x04*)\n\rFunctionState\x12\x0b\n\x07RUNNING\x10\x00\x12\x0b\n\x07STOPPED\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3' + serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xb8\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12\x12\n\nsecretsMap\x18\x10 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x13\n\x07\x61utoAck\x18\t \x01(\x08\x42\x02\x18\x01\x12\x13\n\x0bparallelism\x18\n \x01(\x05\x12!\n\x06source\x18\x0b \x01(\x0b\x32\x11.proto.SourceSpec\x12\x1d\n\x04sink\x18\x0c \x01(\x0b\x32\x0f.proto.SinkSpec\x12#\n\tresources\x18\r \x01(\x0b\x32\x10.proto.Resources\x12\x12\n\npackageUrl\x18\x0e \x01(\t\x12)\n\x0cretryDetails\x18\x0f \x01(\x0b\x32\x13.proto.RetryDetails\x12\x14\n\x0cruntimeFlags\x18\x11 \x01(\t\x12;\n\rcomponentType\x18\x12 \x01(\x0e\x32$.proto.FunctionDetails.ComponentType\x12\x1c\n\x14\x63ustomRuntimeOptions\x18\x13 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x14 \x01(\t\x12\x16\n\x0eretainOrdering\x18\x15 \x01(\x08\x12\x19\n\x11retainKeyOrdering\x18\x16 \x01(\x08\x12\x39\n\x14subscriptionPosition\x18\x17 \x01(\x0e\x32\x1b.proto.SubscriptionPosition\x12\x10\n\x08logLevel\x18\x18 \x01(\t\"\'\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\x12\x06\n\x02GO\x10\x03\"@\n\rComponentType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0c\n\x08\x46UNCTION\x10\x01\x12\n\n\x06SOURCE\x10\x02\x12\x08\n\x04SINK\x10\x03\"\xf7\x03\n\x0c\x43onsumerSpec\x12\x12\n\nschemaType\x18\x01 \x01(\t\x12\x16\n\x0eserdeClassName\x18\x02 \x01(\t\x12\x16\n\x0eisRegexPattern\x18\x03 \x01(\x08\x12@\n\x11receiverQueueSize\x18\x04 \x01(\x0b\x32%.proto.ConsumerSpec.ReceiverQueueSize\x12\x43\n\x10schemaProperties\x18\x05 \x03(\x0b\x32).proto.ConsumerSpec.SchemaPropertiesEntry\x12G\n\x12\x63onsumerProperties\x18\x06 \x03(\x0b\x32+.proto.ConsumerSpec.ConsumerPropertiesEntry\x12%\n\ncryptoSpec\x18\x07 \x01(\x0b\x32\x11.proto.CryptoSpec\x12\x14\n\x0cpoolMessages\x18\x08 \x01(\x08\x1a\"\n\x11ReceiverQueueSize\x12\r\n\x05value\x18\x01 \x01(\x05\x1a\x37\n\x15SchemaPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x39\n\x17\x43onsumerPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe5\x01\n\x0cProducerSpec\x12\x1a\n\x12maxPendingMessages\x18\x01 \x01(\x05\x12*\n\"maxPendingMessagesAcrossPartitions\x18\x02 \x01(\x05\x12\x1f\n\x17useThreadLocalProducers\x18\x03 \x01(\x08\x12%\n\ncryptoSpec\x18\x04 \x01(\x0b\x32\x11.proto.CryptoSpec\x12\x14\n\x0c\x62\x61tchBuilder\x18\x05 \x01(\t\x12/\n\x0f\x63ompressionType\x18\x06 \x01(\x0e\x32\x16.proto.CompressionType\"\xbb\x02\n\nCryptoSpec\x12 \n\x18\x63ryptoKeyReaderClassName\x18\x01 \x01(\t\x12\x1d\n\x15\x63ryptoKeyReaderConfig\x18\x02 \x01(\t\x12!\n\x19producerEncryptionKeyName\x18\x03 \x03(\t\x12\x44\n\x1bproducerCryptoFailureAction\x18\x04 \x01(\x0e\x32\x1f.proto.CryptoSpec.FailureAction\x12\x44\n\x1b\x63onsumerCryptoFailureAction\x18\x05 \x01(\x0e\x32\x1f.proto.CryptoSpec.FailureAction\"=\n\rFailureAction\x12\x08\n\x04\x46\x41IL\x10\x00\x12\x0b\n\x07\x44ISCARD\x10\x01\x12\x0b\n\x07\x43ONSUME\x10\x02\x12\x08\n\x04SEND\x10\n\"\xf8\x04\n\nSourceSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\x31\n\x10subscriptionType\x18\x03 \x01(\x0e\x32\x17.proto.SubscriptionType\x12Q\n\x16topicsToSerDeClassName\x18\x04 \x03(\x0b\x32-.proto.SourceSpec.TopicsToSerDeClassNameEntryB\x02\x18\x01\x12\x35\n\ninputSpecs\x18\n \x03(\x0b\x32!.proto.SourceSpec.InputSpecsEntry\x12\x11\n\ttimeoutMs\x18\x06 \x01(\x04\x12\x19\n\rtopicsPattern\x18\x07 \x01(\tB\x02\x18\x01\x12\x0f\n\x07\x62uiltin\x18\x08 \x01(\t\x12\x18\n\x10subscriptionName\x18\t \x01(\t\x12\x1b\n\x13\x63leanupSubscription\x18\x0b \x01(\x08\x12\x39\n\x14subscriptionPosition\x18\x0c \x01(\x0e\x32\x1b.proto.SubscriptionPosition\x12$\n\x1cnegativeAckRedeliveryDelayMs\x18\r \x01(\x04\x12\x14\n\x0cskipToLatest\x18\x0e \x01(\x08\x1a=\n\x1bTopicsToSerDeClassNameEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x46\n\x0fInputSpecsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\"\n\x05value\x18\x02 \x01(\x0b\x32\x13.proto.ConsumerSpec:\x02\x38\x01\"\xdc\x03\n\x08SinkSpec\x12\x11\n\tclassName\x18\x01 \x01(\t\x12\x0f\n\x07\x63onfigs\x18\x02 \x01(\t\x12\x15\n\rtypeClassName\x18\x05 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12)\n\x0cproducerSpec\x18\x0b \x01(\x0b\x32\x13.proto.ProducerSpec\x12\x16\n\x0eserDeClassName\x18\x04 \x01(\t\x12\x0f\n\x07\x62uiltin\x18\x06 \x01(\t\x12\x12\n\nschemaType\x18\x07 \x01(\t\x12$\n\x1c\x66orwardSourceMessageProperty\x18\x08 \x01(\x08\x12?\n\x10schemaProperties\x18\t \x03(\x0b\x32%.proto.SinkSpec.SchemaPropertiesEntry\x12\x43\n\x12\x63onsumerProperties\x18\n \x03(\x0b\x32\'.proto.SinkSpec.ConsumerPropertiesEntry\x1a\x37\n\x15SchemaPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x39\n\x17\x43onsumerPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"H\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\x12\x18\n\x10originalFileName\x18\x02 \x01(\t\"\xba\x03\n\x10\x46unctionMetaData\x12/\n\x0f\x66unctionDetails\x18\x01 \x01(\x0b\x32\x16.proto.FunctionDetails\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\x12\x43\n\x0einstanceStates\x18\x05 \x03(\x0b\x32+.proto.FunctionMetaData.InstanceStatesEntry\x12;\n\x10\x66unctionAuthSpec\x18\x06 \x01(\x0b\x32!.proto.FunctionAuthenticationSpec\x12H\n transformFunctionPackageLocation\x18\x07 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x1aK\n\x13InstanceStatesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x05\x12#\n\x05value\x18\x02 \x01(\x0e\x32\x14.proto.FunctionState:\x02\x38\x01\"<\n\x1a\x46unctionAuthenticationSpec\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x10\n\x08provider\x18\x02 \x01(\t\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\t*[\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02\x12\n\n\x06MANUAL\x10\x03*<\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\x0c\n\x08\x46\x41ILOVER\x10\x01\x12\x0e\n\nKEY_SHARED\x10\x02*0\n\x14SubscriptionPosition\x12\n\n\x06LATEST\x10\x00\x12\x0c\n\x08\x45\x41RLIEST\x10\x01*D\n\x0f\x43ompressionType\x12\x07\n\x03LZ4\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x08\n\x04ZLIB\x10\x02\x12\x08\n\x04ZSTD\x10\x03\x12\n\n\x06SNAPPY\x10\x04*)\n\rFunctionState\x12\x0b\n\x07RUNNING\x10\x00\x12\x0b\n\x07STOPPED\x10\x01\x42-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3' ) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -575,6 +575,13 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='logLevel', full_name='proto.FunctionDetails.logLevel', index=23, + number=24, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 0d3d57f60289d..ed68831de846b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -430,6 +430,8 @@ public static List getCmd(InstanceConfig instanceConfig, // set logging config file args.add("--logging_config_file"); args.add(logConfigFile); + args.add("--logging_level"); + args.add(logLevel); // `installUserCodeDependencies` is only valid for python runtime if (installUserCodeDependencies != null && installUserCodeDependencies) { args.add("--install_usercode_dependencies");