Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #191 #192

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,61 @@ public static class ThreadConfig {

private Set<String> excludeTaskPrefixes = new HashSet<>();

/**
* Checks if the given executor type is excluded.
*
* @param type The class type of the executor to check.
* @return {@code true} if the executor type is excluded, {@code false} otherwise.
*/
public boolean isExcludedExecutor(Class<?> type) {
return type != null && isExcludedExecutor(type.getName());
}

/**
* Checks if the given executor name is excluded.
*
* @param name The name of the executor to check.
* @return {@code true} if the executor name is excluded, {@code false} otherwise.
*/
public boolean isExcludedExecutor(String name) {
return name == null || excludeExecutors.contains(name);
return name != null && excludeExecutors.contains(name);
}

/**
* Checks if the given task type is excluded.
*
* @param type The class type of the task to check.
* @return {@code true} if the task type is excluded, {@code false} otherwise.
*/
public boolean isExcludedTask(Class<?> type) {
return type != null && isExcludedTask(type.getName());
}

/**
* Checks if the given task name is excluded.
*
* @param name The name of the task to check.
* @return {@code true} if the task name is excluded or matches any excluded prefix, {@code false} otherwise.
*/
public boolean isExcludedTask(String name) {
return name == null || excludeTasks.contains(name) || isExcludedTaskPrefix(name);
return name != null && (excludeTasks.contains(name) || isExcludedTaskPrefix(name));
}

protected boolean isExcludedTaskPrefix(String name) {
if (name == null) {
return false;
}
/**
* Checks if the given task name matches any excluded prefix.
*
* @param name The name of the task to check.
* @return {@code true} if the task name starts with any excluded prefix, {@code false} otherwise.
*/
private boolean isExcludedTaskPrefix(String name) {
for (String prefix : excludeTaskPrefixes) {
if (name.startsWith(prefix)) {
return true;
}
}
return false;
}

}

}
Expand Down
34 changes: 13 additions & 21 deletions joylive-package/src/main/assembly/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,15 @@ agent:
- x-service-
thread:
excludeExecutors:
- org.apache.dubbo.common.threadpool.ThreadlessExecutor
- org.apache.tomcat.util.threads.ThreadPoolExecutor
- org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor
- org.apache.tomcat.util.threads.InlineExecutorService
- javax.management.NotificationBroadcasterSupport$1
- io.grpc.stub.ClientCalls.ThreadlessExecutor
- io.grpc.SynchronizationContext
- io.netty.channel.MultithreadEventLoopGroup
- io.netty.channel.nio.NioEventLoop
- io.netty.channel.SingleThreadEventLoop
- io.netty.channel.kqueue.KQueueEventLoopGroup
- io.netty.channel.kqueue.KQueueEventLoop
- io.netty.util.concurrent.MultithreadEventExecutorGroup
- io.netty.util.concurrent.AbstractEventExecutorGroup
- io.netty.util.concurrent.ThreadPerTaskExecutor
- io.netty.util.concurrent.GlobalEventExecutor
- io.netty.util.concurrent.AbstractScheduledEventExecutor
- io.netty.util.concurrent.AbstractEventExecutor
- io.netty.util.concurrent.DefaultEventExecutor
- io.netty.util.concurrent.SingleThreadEventExecutor
- io.netty.util.internal.ThreadExecutorMap$1
- reactor.core.scheduler.BoundedElasticScheduler$BoundedScheduledExecutorService
- reactor.netty.resources.ColocatedEventLoopGroup
- org.apache.tomcat.util.threads.ThreadPoolExecutor
- org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor
- org.apache.tomcat.util.threads.InlineExecutorService
- javax.management.NotificationBroadcasterSupport$1
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.MultithreadEventLoopGroup
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.MultithreadEventExecutorGroup
Expand All @@ -261,6 +247,7 @@ agent:
- com.alibaba.nacos.shaded.io.grpc.SynchronizationContext
- com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor
excludeTasks:
- com.alibaba.nacos.common.executor.NameThreadFactory
- com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve
- com.alibaba.nacos.client.naming.backups.FailoverReactor$DiskFileWriter
- com.alibaba.nacos.client.naming.backups.FailoverReactor.SwitchRefresher
Expand All @@ -269,6 +256,7 @@ agent:
- com.alibaba.nacos.shaded.io.grpc.internal.DelayedClientTransport$5
- com.alibaba.nacos.shaded.io.grpc.internal.SerializingExecutor
- com.alibaba.nacos.shaded.io.grpc.internal.LogExceptionRunnable
- com.alibaba.nacos.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder$1
- com.jd.live.agent.core.thread.NamedThreadFactory
- com.jd.jr.sgm.client.disruptor.LogEventFactory
- com.jd.jr.sgm.client.util.AgentThreadFactory
Expand All @@ -282,6 +270,11 @@ agent:
- io.sermant.implement.service.xds.handler.XdsHandler.NamedThreadFactory
- io.sermant.discovery.factory.RealmServiceThreadFactory
- org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory
- sun.rmi.runtime.RuntimeUtil$1
- sun.rmi.transport.tcp.TCPTransport$1
- sun.rmi.transport.DGCImpl$1
- sun.rmi.transport.DGCAckHandler$1
- org.apache.tomcat.util.threads.TaskThreadFactory
excludeTaskPrefixes:
- reactor.core.scheduler.BoundedElasticScheduler$$Lambda
- org.springframework.cloud.commons.util.InetUtils$$Lambda$
Expand All @@ -290,9 +283,8 @@ agent:
- com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate$$Lambda$
- com.alibaba.nacos.common.remote.client.RpcClient$$Lambda$
- com.alibaba.nacos.common.utils.ThreadFactoryBuilder$$Lambda$
- sun.rmi.transport.tcp.TCPTransport$
- sun.rmi.transport.DGCImpl$
- sun.rmi.transport.DGCAckHandler$
- org.apache.catalina.core.ContainerBase$
- org.apache.catalina.core.StandardServer$$Lambda$
counter:
gateway: true
service: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.jd.live.agent.plugin.registry.dubbo.v2_7.condition;

import com.jd.live.agent.core.extension.annotation.ConditionalComposite;
import com.jd.live.agent.core.extension.annotation.ConditionalOnMissingClass;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.governance.annotation.ConditionalOnDubboEnabled;

import java.lang.annotation.*;
Expand All @@ -25,7 +25,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnDubboEnabled
@ConditionalOnMissingClass(ConditionalOnDubbo27Enabled.TYPE_PROTOCOL_FILTER_WRAPPER)
@ConditionalOnClass(ConditionalOnDubbo27Enabled.TYPE_PROTOCOL_FILTER_WRAPPER)
@ConditionalComposite
public @interface ConditionalOnDubbo27Enabled {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.jd.live.agent.plugin.registry.dubbo.v3.condition;

import com.jd.live.agent.core.extension.annotation.ConditionalComposite;
import com.jd.live.agent.core.extension.annotation.ConditionalOnMissingClass;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.governance.annotation.ConditionalOnDubboEnabled;

import java.lang.annotation.*;
Expand All @@ -25,7 +25,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnDubboEnabled
@ConditionalOnMissingClass(ConditionalOnDubbo3Enabled.TYPE_CONSUMER_CONTEXT_FILTER)
@ConditionalOnClass(ConditionalOnDubbo3Enabled.TYPE_CONSUMER_CONTEXT_FILTER)
@ConditionalComposite
public @interface ConditionalOnDubbo3Enabled {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package com.jd.live.agent.plugin.transmission.thread.interceptor;

import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.core.thread.Camera;
import com.jd.live.agent.core.thread.Snapshot;
import com.jd.live.agent.governance.config.TransmitConfig;
import com.jd.live.agent.governance.config.TransmitConfig.ThreadConfig;
import com.jd.live.agent.plugin.transmission.thread.adapter.AbstractThreadAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.CallableAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAdapter;
Expand All @@ -28,56 +30,42 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;

/**
* ExecutorInterceptor
*/
public class ExecutorInterceptor extends InterceptorAdaptor {

private static final Logger logger = LoggerFactory.getLogger(ExecutorInterceptor.class);

private static final String FIELD_CALLABLE = "callable";

private final Field callableField;

private final Camera[] cameras;

private final TransmitConfig.ThreadConfig threadConfig;
private final ThreadConfig threadConfig;

private final Map<Class<?>, Boolean> excludes = new ConcurrentHashMap<>();
private final Map<Class<?>, Boolean> excludes = new ConcurrentHashMap<>(128);

public ExecutorInterceptor(List<Camera> cameras, TransmitConfig.ThreadConfig threadConfig) {
public ExecutorInterceptor(List<Camera> cameras, ThreadConfig threadConfig) {
this.cameras = cameras == null ? new Camera[0] : cameras.toArray(new Camera[0]);
this.threadConfig = threadConfig;
this.callableField = getCallableField();
}

private Field getCallableField() {
Field result = null;
try {
result = FutureTask.class.getDeclaredField(FIELD_CALLABLE);
result.setAccessible(true);
} catch (NoSuchFieldException ignore) {
}
return result;
}

private boolean isExcluded(Object task) {
return task != null && excludes.computeIfAbsent(task.getClass(), c -> threadConfig.isExcludedTask(c.getName()));
}

@Override
public void onEnter(ExecutableContext ctx) {
Object target = ctx.getTarget();
String name = target.getClass().getSimpleName();
Object[] arguments = ctx.getArguments();
if (arguments == null || arguments.length == 0 || cameras.length == 0) {
return;
} else if (target instanceof ThreadPoolExecutor
&& isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
// filter sgm thread pool before unwrap
if (arguments == null
|| arguments.length == 0
|| cameras.length == 0
|| isExcludedExecutor(target)
|| target instanceof ThreadPoolExecutor
&& isExcludedThreadFactory(((ThreadPoolExecutor) target).getThreadFactory())) {
return;
}
Object argument = arguments[0];
Expand All @@ -86,7 +74,7 @@ && isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
return;
} else if (unwrapped instanceof AbstractThreadAdapter) {
return;
} else if (isExcluded(unwrapped)) {
} else if (isExcludedTask(unwrapped)) {
return;
}

Expand All @@ -103,6 +91,88 @@ && isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
}
}

/**
* Checks if the given thread factory is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param factory The thread factory object to check.
* @return {@code true} if the thread factory is excluded, {@code false} otherwise.
*/
private boolean isExcludedThreadFactory(ThreadFactory factory) {
return factory != null && excludes.computeIfAbsent(factory.getClass(), this::isExcludedThreadFactoryType);
}

/**
* Checks if the given thread factory type is excluded.
* If the thread factory is excluded, logs an informational message.
*
* @param type The class type of the thread factory to check.
* @return {@code true} if the thread factory type is excluded, {@code false} otherwise.
*/
private boolean isExcludedThreadFactoryType(Class<?> type) {
if (threadConfig.isExcludedTask(type)) {
logger.info("Disable transmission in threads of factory " + type.getName());
return true;
}
logger.info("Enable transmission in threads of factory " + type.getName());
return false;
}


/**
* Checks if the given task is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param task The task object to check.
* @return {@code true} if the task is excluded, {@code false} otherwise.
*/
private boolean isExcludedTask(Object task) {
return task != null && excludes.computeIfAbsent(task.getClass(), this::isExcludeTaskType);
}

/**
* Checks if the given task type is excluded.
* If the task is excluded, logs an informational message.
*
* @param type The class type of the task to check.
* @return {@code true} if the task type is excluded, {@code false} otherwise.
*/
private boolean isExcludeTaskType(Class<?> type) {
if (threadConfig.isExcludedTask(type)) {
logger.info("Disable transmission in task " + type.getName());
return true;
}
logger.info("Enable transmission in task " + type.getName());
return false;
}

/**
* Checks if the given executor is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param executor The executor object to check.
* @return {@code true} if the executor is excluded, {@code false} otherwise.
*/
private boolean isExcludedExecutor(Object executor) {
return executor != null && excludes.computeIfAbsent(executor.getClass(), this::isExcludeExecutorType);
}

/**
* Checks if the given executor type is excluded.
* If the executor is excluded, logs an informational message.
*
* @param type The class type of the executor to check.
* @return {@code true} if the executor type is excluded, {@code false} otherwise.
*/
private boolean isExcludeExecutorType(Class<?> type) {
if (threadConfig.isExcludedExecutor(type)) {
logger.info("Disable transmission in executor " + type.getName());
return true;
}
logger.info("Enable transmission in executor " + type.getName());
return false;
}

/**
* Unwraps the provided argument object to retrieve its underlying value. If the argument is an instance
* of {@link AbstractThreadAdapter}, it is returned directly. If the argument is an instance of {@link FutureTask},
Expand All @@ -127,4 +197,20 @@ private Object unwrap(Object argument) {
return argument;
}

/**
* Retrieves the {@link Field} object representing the {@code callable} field in the {@link FutureTask} class.
* This method uses reflection to access the private field and makes it accessible.
*
* @return The {@link Field} object representing the {@code callable} field, or {@code null} if the field is not found.
*/
private static Field getCallableField() {
Field result = null;
try {
result = FutureTask.class.getDeclaredField(FIELD_CALLABLE);
result.setAccessible(true);
} catch (NoSuchFieldException ignore) {
}
return result;
}

}
Loading