From 60ecce261f8795dd9f828227ac06bc62a64631e9 Mon Sep 17 00:00:00 2001 From: hexiaofeng Date: Thu, 16 May 2024 14:13:28 +0800 Subject: [PATCH] 1.Enhance the thread pool to filter out specified jobs 2.Fix the issue where jobs submitted to the thread pool are enhanced twice. --- .../src/main/assembly/config/config.yaml | 4 +- .../thread/config/ThreadConfig.java | 20 +++++-- .../thread/definition/ExecutorDefinition.java | 4 +- .../ScheduledExecutorServiceDefinition.java | 4 +- .../interceptor/ExecutorInterceptor.java | 53 +++++++++++++++++-- 5 files changed, 73 insertions(+), 12 deletions(-) diff --git a/joylive-package/src/main/assembly/config/config.yaml b/joylive-package/src/main/assembly/config/config.yaml index ea59adeb3..b2dcfc38b 100644 --- a/joylive-package/src/main/assembly/config/config.yaml +++ b/joylive-package/src/main/assembly/config/config.yaml @@ -118,7 +118,7 @@ agent: groupExpression: ${unit}-${cell}-${group} transmission: thread: - excludes: + excludeExecutors: - org.apache.dubbo.common.threadpool.ThreadlessExecutor - org.apache.tomcat.util.threads.ThreadPoolExecutor - org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor @@ -161,6 +161,8 @@ agent: - com.alibaba.nacos.shaded.io.grpc.stub.ClientCalls.ThreadlessExecutor - com.alibaba.nacos.shaded.io.grpc.SynchronizationContext - com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor + excludeTasks: + - com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve counter: gateway: true service: true diff --git a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/config/ThreadConfig.java b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/config/ThreadConfig.java index dcb1cfe53..38b11c1db 100644 --- a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/config/ThreadConfig.java +++ b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/config/ThreadConfig.java @@ -40,6 +40,8 @@ public class ThreadConfig { "io.netty.channel.MultithreadEventLoopGroup", "io.netty.channel.nio.NioEventLoop", "io.netty.channel.SingleThreadEventLoop", + "io.netty.channel.kqueue.KQueueEventLoopGroup", + "io.netty.channel.kqueue.KQueueEventLoop", "io.netty.util.concurrent.MultithreadEventExecutorGroup", "io.netty.util.concurrent.AbstractEventExecutorGroup", "io.netty.util.concurrent.ThreadPerTaskExecutor", @@ -49,6 +51,8 @@ public class ThreadConfig { "io.netty.util.concurrent.SingleThreadEventExecutor", "io.netty.util.concurrent.DefaultEventExecutor", "io.netty.util.internal.ThreadExecutorMap$1", + "reactor.core.scheduler.BoundedElasticScheduler$BoundedScheduledExecutorService", + "reactor.netty.resources.ColocatedEventLoopGroup", "com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup", "com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.MultithreadEventLoopGroup", "com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.MultithreadEventExecutorGroup", @@ -70,11 +74,19 @@ public class ThreadConfig { "com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor" }; - private Set excludes = new HashSet<>(Arrays.asList(EXCLUDE_EXECUTOR_CLASSES)); + private static final String[] EXCLUDE_TASK_CLASSES = new String[]{ + "com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve", + }; - public boolean exclude(String name) { - return name == null || excludes.contains(name); - } + private Set excludeExecutors = new HashSet<>(Arrays.asList(EXCLUDE_EXECUTOR_CLASSES)); + private Set excludeTasks = new HashSet<>(Arrays.asList(EXCLUDE_TASK_CLASSES)); + public boolean isExcludedExecutor(String name) { + return name == null || excludeExecutors.contains(name); + } + + public boolean isExcludedTask(String name) { + return name == null || excludeTasks.contains(name); + } } diff --git a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ExecutorDefinition.java b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ExecutorDefinition.java index ed759b9a8..137cc478c 100644 --- a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ExecutorDefinition.java +++ b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ExecutorDefinition.java @@ -65,9 +65,9 @@ public class ExecutorDefinition extends PluginDefinitionAdapter { public ExecutorDefinition() { this.matcher = () -> MatcherBuilder.isImplement(TYPE_EXECUTOR). - and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludes()))); + and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludeExecutors()))); this.interceptors = new InterceptorDefinition[]{ new InterceptorDefinitionAdapter(MatcherBuilder.in(METHODS).and(MatcherBuilder.isPublic()), - () -> new ExecutorInterceptor(handlers))}; + () -> new ExecutorInterceptor(handlers, threadConfig))}; } } \ No newline at end of file diff --git a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ScheduledExecutorServiceDefinition.java b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ScheduledExecutorServiceDefinition.java index 5192c2419..51f825349 100644 --- a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ScheduledExecutorServiceDefinition.java +++ b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/definition/ScheduledExecutorServiceDefinition.java @@ -68,10 +68,10 @@ public class ScheduledExecutorServiceDefinition extends PluginDefinitionAdapter public ScheduledExecutorServiceDefinition() { this.matcher = () -> MatcherBuilder.isImplement(TYPE_SCHEDULED_EXECUTOR_SERVICE). - and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludes()))); + and(MatcherBuilder.not(MatcherBuilder.in(threadConfig.getExcludeExecutors()))); this.interceptors = new InterceptorDefinition[]{ new InterceptorDefinitionAdapter(MatcherBuilder.in(METHODS).and(MatcherBuilder.isPublic()), - () -> new ExecutorInterceptor(handlers))}; + () -> new ExecutorInterceptor(handlers, threadConfig))}; } } \ No newline at end of file diff --git a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/interceptor/ExecutorInterceptor.java b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/interceptor/ExecutorInterceptor.java index bafb5238b..a5c98d8d8 100644 --- a/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/interceptor/ExecutorInterceptor.java +++ b/joylive-plugin/joylive-transmission/joylive-transmission-thread/src/main/java/com/jd/live/agent/plugin/transmission/thread/interceptor/ExecutorInterceptor.java @@ -23,19 +23,37 @@ import com.jd.live.agent.plugin.transmission.thread.adapter.CallableAdapter; import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAdapter; import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAndCallableAdapter; +import com.jd.live.agent.plugin.transmission.thread.config.ThreadConfig; +import java.lang.reflect.Field; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; /** * ExecutorInterceptor */ public class ExecutorInterceptor extends InterceptorAdaptor { + private static final String FIELD_CALLABLE = "callable"; + + private static Field CALLABLE_FIELD; + private final Camera[] cameras; - public ExecutorInterceptor(List cameras) { + private final ThreadConfig threadConfig; + + static { + try { + CALLABLE_FIELD = FutureTask.class.getDeclaredField(FIELD_CALLABLE); + CALLABLE_FIELD.setAccessible(true); + } catch (NoSuchFieldException ignore) { + } + } + + public ExecutorInterceptor(List cameras, ThreadConfig threadConfig) { this.cameras = cameras == null ? new Camera[0] : cameras.toArray(new Camera[0]); + this.threadConfig = threadConfig; } @Override @@ -47,7 +65,14 @@ public void onEnter(ExecutableContext ctx) { return; } Object argument = arguments[0]; - if (argument == null || argument instanceof AbstractThreadAdapter) { + if (argument == null) { + return; + } + Object unwrapped = unwrap(argument); + if (unwrapped instanceof AbstractThreadAdapter) { + return; + } + if (threadConfig.isExcludedTask(unwrapped.getClass().getName())) { return; } @@ -55,7 +80,6 @@ public void onEnter(ExecutableContext ctx) { for (int i = 0; i < cameras.length; i++) { snapshots[i] = new Snapshot(cameras[i], cameras[i].snapshot()); } - if (argument instanceof Runnable && argument instanceof Callable) { arguments[0] = new RunnableAndCallableAdapter<>(name, (Runnable) argument, (Callable) argument, snapshots); } else if (argument instanceof Runnable) { @@ -65,4 +89,27 @@ public void onEnter(ExecutableContext ctx) { } } + /** + * Unwraps the provided argument object to retrieve its underlying value. If the argument is an instance + * of {@link AbstractThreadAdapter}, it is returned directly. If the argument is an instance of {@link FutureTask}, + * the method attempts to extract the 'callable' field from it using reflection. If the extraction fails, + * the exception is ignored, and the original argument is returned. + * + * @param argument the object to unwrap, which could be an instance of {@link AbstractThreadAdapter} or + * {@link FutureTask} or any other Object. + * @return the unwrapped object if unwrapping is possible, otherwise the original object. + */ + private Object unwrap(Object argument) { + if (argument instanceof AbstractThreadAdapter) { + return argument; + } + if (argument instanceof FutureTask && CALLABLE_FIELD != null) { + try { + return CALLABLE_FIELD.get(argument); + } catch (Exception ignore) { + } + } + return argument; + } + } \ No newline at end of file