Skip to content

Commit

Permalink
Added logLevel for Function/Source/Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
mukesh154 committed Jan 13, 2025
1 parent ba04a43 commit 00dcf52
Show file tree
Hide file tree
Showing 19 changed files with 101 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum Runtime {

private String outputSerdeClassName;
private String logTopic;
private String logLevel;
private ProcessingGuarantees processingGuarantees;
// Do we want function instances to process data in the same order as in the input topics
// This essentially means that every partition of input topic is consumed by only one instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,5 @@ public class SinkConfig {
private String transformFunctionClassName;
private String transformFunctionConfig;
private String logTopic;
private String logLevel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,5 @@ public class SourceConfig {
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
private String logTopic;
private String logLevel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
+ " #Java, Python, Go")
protected String logTopic;

@Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Function are produced"
+ " #Java")
protected String logLevel;

@Option(names = {"-st", "--schema-type"}, description = "The builtin schema type or "
+ "custom schema class name to be used for messages output by the function #Java")
protected String schemaType = "";
Expand Down Expand Up @@ -506,6 +510,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
functionConfig.setLogLevel(logLevel);
}
if (null != className) {
functionConfig.setClassName(className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand {
protected String transformFunctionConfig;
@Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;
@Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Sink are produced")
protected String logLevel;
@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime"
+ " (for process & Kubernetes runtime only).")
protected String runtimeFlags;
Expand Down Expand Up @@ -611,6 +613,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
sinkConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
sinkConfig.setLogLevel(logLevel);
}
if (null != runtimeFlags) {
sinkConfig.setRuntimeFlags(runtimeFlags);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
protected String secretsString;
@Option(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;
@Option(names = "--logLevel", description = "Log level at which the logs of a Pulsar Source are produced")
protected String logLevel;
@Option(names = "--runtime-flags", description = "Any flags that you want to pass to a runtime"
+ " (for process & Kubernetes runtime only).")
protected String runtimeFlags;
Expand Down Expand Up @@ -505,6 +507,9 @@ void processArguments() throws Exception {
if (null != logTopic) {
sourceConfig.setLogTopic(logTopic);
}
if (null != logLevel) {
sourceConfig.setLogLevel(logLevel);
}
if (null != runtimeFlags) {
sourceConfig.setRuntimeFlags(runtimeFlags);
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message FunctionDetails {
bool retainOrdering = 21;
bool retainKeyOrdering = 22;
SubscriptionPosition subscriptionPosition = 23;
string logLevel = 24;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<level>${sys:pulsar.log.level}</level>
<AppenderRef>
<ref>Console</ref>
<level>${sys:pulsar.log.level}</level>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand All @@ -92,7 +93,7 @@ public static List<String> composeCmd(InstanceConfig instanceConfig,
cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, shardId, grpcPort, expectedHealthCheckInterval,
logConfigFile, secretsProviderClassName, secretsProviderConfig,
logConfigFile, logLevel, secretsProviderClassName, secretsProviderConfig,
installUserCodeDependencies, pythonDependencyRepository,
pythonExtraDependencyRepository, narExtractionDirectory,
functionInstanceClassPath, false, pulsarWebServiceUrl));
Expand Down Expand Up @@ -315,6 +316,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String logLevel,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
Expand Down Expand Up @@ -360,6 +362,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
}
args.add("-Dpulsar.log.level=" + logLevel);
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
Expand Down Expand Up @@ -236,6 +237,10 @@ public class KubernetesRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.authConfig = authConfig;

Expand Down Expand Up @@ -275,6 +280,7 @@ public class KubernetesRuntime implements Runtime {
grpcPort,
-1L,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
installUserCodeDependencies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class ProcessRuntime implements Runtime {
case GO:
break;
}
String logLevel = instanceConfig.getFunctionDetails().getLogLevel();
if (StringUtils.isBlank(logLevel)) {
logLevel = "info";
}

this.extraDependenciesDir = extraDependenciesDir;
this.narExtractionDirectory = narExtractionDirectory;
this.processArgs = RuntimeUtils.composeCmd(
Expand All @@ -142,6 +147,7 @@ class ProcessRuntime implements Runtime {
instanceConfig.getPort(),
expectedHealthCheckInterval,
logConfigFile,
logLevel,
secretsProviderClassName,
secretsProviderConfig,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public void getAdditionalJavaRuntimeArguments(boolean k8sRuntime) throws Excepti
23,
1234L,
"logConfigFile",
"info",
"secretsProviderClassName",
"secretsProviderConfig",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean s
String expectedArgs = "exec java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webS
String expectedArgs = "java -cp " + classpath
+ extraDepsEnv
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dpulsar.log.level=info"
+ " -Dlog4j.configurationFile=java_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -70,6 +71,7 @@ public static class ExtractedFunctionDetails {

static final Integer MAX_PENDING_ASYNC_REQUESTS_DEFAULT = 1000;
static final Boolean FORWARD_SOURCE_MESSAGE_PROPERTY_DEFAULT = Boolean.TRUE;
private static final List<String> VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR");

private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.create();

Expand Down Expand Up @@ -272,6 +274,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
if (functionConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
}
if (functionConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(functionConfig.getLogLevel());
}
if (functionConfig.getRuntime() != null) {
functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
}
Expand Down Expand Up @@ -447,6 +452,9 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
functionConfig.setLogLevel(functionDetails.getLogLevel());
}
if (functionDetails.getSink().getForwardSourceMessageProperty()) {
functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
}
Expand Down Expand Up @@ -806,6 +814,13 @@ public static void doCommonChecks(FunctionConfig functionConfig) {
}
}

if (!isEmpty(functionConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(functionConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", functionConfig.getLogLevel()));
}
}

if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Function parallelism must be a positive number");
}
Expand Down Expand Up @@ -1027,6 +1042,9 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,6 +63,8 @@
@Slf4j
public class SinkConfigUtils {

private static final List<String> VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR");

@Getter
@Setter
@AllArgsConstructor
Expand Down Expand Up @@ -90,6 +93,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
if (sinkConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
}
if (sinkConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(sinkConfig.getLogLevel());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sinkConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
Expand Down Expand Up @@ -327,6 +333,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getLogTopic())) {
sinkConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
sinkConfig.setLogLevel(functionDetails.getLogLevel());
}

sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

Expand Down Expand Up @@ -439,6 +448,13 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
}
}

if (!isEmpty(sinkConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(sinkConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", sinkConfig.getLogLevel()));
}
}

if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Sink parallelism must be a positive number");
}
Expand Down Expand Up @@ -628,6 +644,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}

if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -56,6 +58,8 @@
@Slf4j
public class SourceConfigUtils {

private static final List<String> VALID_LOG_LEVELS = Arrays.asList("INFO", "DEBUG", "WARN", "ERROR");

@Getter
@Setter
@AllArgsConstructor
Expand Down Expand Up @@ -83,6 +87,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
if (sourceConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic());
}
if (sourceConfig.getLogLevel() != null) {
functionDetailsBuilder.setLogLevel(sourceConfig.getLogLevel());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sourceConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
Expand Down Expand Up @@ -241,6 +248,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(functionDetails.getLogTopic())) {
sourceConfig.setLogTopic(functionDetails.getLogTopic());
}
if (!isEmpty(functionDetails.getLogLevel())) {
sourceConfig.setLogLevel(functionDetails.getLogLevel());
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down Expand Up @@ -281,6 +291,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic()));
}
}
if (!isEmpty(sourceConfig.getLogLevel())) {
if (!VALID_LOG_LEVELS.contains(sourceConfig.getLogLevel().toUpperCase())) {
throw new IllegalArgumentException(
String.format("LogLevel %s is invalid", sourceConfig.getLogLevel()));
}
}
if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Source parallelism must be a positive number");
}
Expand Down Expand Up @@ -410,6 +426,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (!StringUtils.isEmpty(newConfig.getLogLevel())) {
mergedConfig.setLogLevel(newConfig.getLogLevel());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Loading

0 comments on commit 00dcf52

Please sign in to comment.