diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/thread/NamedThreadFactory.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/thread/NamedThreadFactory.java new file mode 100644 index 000000000..f4689ed1b --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/thread/NamedThreadFactory.java @@ -0,0 +1,112 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.thread; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A factory class for creating named threads or thread pools. This class implements the {@link ThreadFactory} interface, + * allowing for customization of thread creation. Threads or thread pools created using this factory can be easily identified + * by their names, which is particularly useful for debugging or management purposes. + */ +public class NamedThreadFactory implements ThreadFactory { + + /** + * A global counter for all thread pools created by instances of this class. This counter ensures that each thread pool + * has a unique identifier. + */ + protected static final AtomicInteger POOL_COUNTER = new AtomicInteger(); + + /** + * A counter for the threads created by this particular instance of {@code NamedThreadFactory}. This helps in assigning + * a unique identifier to each thread within the pool. + */ + protected final AtomicInteger threadCounter = new AtomicInteger(0); + + /** + * The thread group to which threads created by this factory will belong. Using a thread group allows for easier management + * of threads, such as setting a maximum priority for all threads in the group. + */ + protected final ThreadGroup group; + + /** + * The prefix for the names of threads created by this factory. This prefix is used to easily identify threads belonging + * to a particular pool or purpose. + */ + protected final String namePrefix; + + /** + * Indicates whether threads created by this factory should be daemon threads. Daemon threads are terminated by the JVM + * when all non-daemon threads finish execution. Setting this to true is useful for background tasks that should not prevent + * the application from exiting. + */ + protected final boolean isDaemon; + + /** + * Constructs a new {@code NamedThreadFactory} instance with the specified prefix for thread names. + * + * @param prefix The prefix to be used in the names of threads created by this factory. + */ + public NamedThreadFactory(String prefix) { + this(null, prefix, true); + } + + /** + * Constructs a new {@code NamedThreadFactory} instance with the specified prefix for thread names and daemon status. + * + * @param prefix The prefix to be used in the names of threads created by this factory. + * @param daemon Indicates whether the threads created by this factory should be daemon threads. + */ + public NamedThreadFactory(String prefix, boolean daemon) { + this(null, prefix, daemon); + } + + /** + * Constructs a new {@code NamedThreadFactory} instance with the specified thread group, prefix for thread names, and + * daemon status. + * + * @param group The thread group to which threads created by this factory will belong. If {@code null}, the factory + * uses the current thread's {@link ThreadGroup}. + * @param prefix The prefix to be used in the names of threads created by this factory. + * @param daemon Indicates whether the threads created by this factory should be daemon threads. + */ + public NamedThreadFactory(ThreadGroup group, String prefix, boolean daemon) { + this.group = group == null ? Thread.currentThread().getThreadGroup() : group; + namePrefix = prefix + "-" + POOL_COUNTER.getAndIncrement() + "-T-"; + isDaemon = daemon; + } + + /** + * Creates a new {@link Thread} with the specified {@link Runnable} task and settings defined by this factory. The thread + * will belong to the thread group, have the name prefix, and be a daemon thread as specified during the factory's + * construction. + * + * @param r The {@link Runnable} task to be executed by the new thread. + * @return A new {@link Thread} configured according to this factory's settings. + */ + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadCounter.incrementAndGet(), 0); + t.setDaemon(isDaemon); + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelayTask.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelayTask.java new file mode 100644 index 000000000..3e842e57a --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelayTask.java @@ -0,0 +1,26 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +/** + * Represents a task that is scheduled to be executed after a certain delay. This interface extends {@link TimeTask}, + * inheriting its methods for retrieving the task's name and its scheduled execution time. The execution time in the context + * of a {@code DelayTask} is understood to be the time at which the delay period ends and the task is eligible for execution. + */ +public interface DelayTask extends TimeTask { + +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelegateTask.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelegateTask.java new file mode 100644 index 000000000..436c03ceb --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/DelegateTask.java @@ -0,0 +1,70 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +/** + * A {@code DelegateTask} is a concrete implementation of the {@code TimeTask} interface, encapsulating a task + * with a specific execution time and a name. It delegates the execution to the {@code Runnable} provided at + * construction time. This class can be used to wrap a {@code Runnable} with additional timing and identification + * properties, making it suitable for scheduled execution in a timing framework. + */ +public class DelegateTask implements TimeTask { + /** + * The name of the task, used for identification purposes. + */ + private final String name; + + /** + * The scheduled execution time for the task, represented as a timestamp. + */ + private final long time; + + /** + * The {@code Runnable} containing the code to be executed when the task runs. + */ + private final Runnable runnable; + + /** + * Constructs a new {@code DelegateTask} with the specified name, time, and executable code. + * + * @param name The name of the task. + * @param time The execution time of the task as a timestamp. + * @param runnable The {@code Runnable} to be executed. + */ + public DelegateTask(final String name, final long time, final Runnable runnable) { + this.name = name; + this.time = time; + this.runnable = runnable; + } + + @Override + public String getName() { + return name; + } + + @Override + public long getTime() { + return time; + } + + @Override + public void run() { + if (runnable != null) { + runnable.run(); + } + } +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeSlot.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeSlot.java new file mode 100644 index 000000000..2580c84ba --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeSlot.java @@ -0,0 +1,127 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Represents a time slot within a timing wheel or similar scheduling system. + * A time slot is responsible for holding and managing {@link TimeWork} tasks that are scheduled to be executed + * at a specific expiration time. This class provides mechanisms to add, remove, and flush tasks when their + * execution time has arrived. + */ +public class TimeSlot implements Delayed { + + /** + * Indicates that the task is being added at the head of the list within the slot. + */ + public static final int HEAD = 1; + + /** + * Indicates that the task is being added at the tail of the list within the slot. + */ + public static final int TAIL = 2; + + /** + * The expiration time of the time slot. Tasks within this slot are due to be executed when the current + * time surpasses this expiration time. + */ + protected long expiration = -1L; + + /** + * The root node of a doubly linked list that holds the tasks. This is a sentinel node to simplify the + * add and remove operations by eliminating the need to check for null. + */ + private final TimeWork root = new TimeWork("root", -1L, null, null, null); + + /** + * Constructs a new {@code TimeSlot} instance. Initializes the doubly linked list with the root node + * pointing to itself, indicating an empty list. + */ + public TimeSlot() { + root.pre = root; + root.next = root; + } + + /** + * Adds a new {@link TimeWork} task to this time slot and sets a new expiration time for the slot if necessary. + * + * @param timeWork The task to be added. + * @param expire The new expiration time for the slot. + * @return An integer indicating whether the task was added at the head ({@link #HEAD}) or tail ({@link #TAIL}) of the list. + */ + protected int add(final TimeWork timeWork, final long expire) { + timeWork.timeSlot = this; + TimeWork tail = root.pre; + timeWork.next = root; + timeWork.pre = tail; + tail.next = timeWork; + root.pre = timeWork; + if (expiration == -1L) { + expiration = expire; + return HEAD; + } + return TAIL; + } + + /** + * Removes a {@link TimeWork} task from this time slot. This operation effectively detaches the task from + * the doubly linked list within the slot. + * + * @param timeWork The task to be removed. + */ + protected void remove(final TimeWork timeWork) { + timeWork.next.pre = timeWork.pre; + timeWork.pre.next = timeWork.next; + timeWork.timeSlot = null; + timeWork.next = null; + timeWork.pre = null; + } + + /** + * Flushes this time slot by executing all tasks within it. This method is called when the time slot has + * expired. Each task is removed from the slot and then passed to the provided consumer for execution. + * + * @param consumer A {@link Consumer} that takes a {@link TimeWork} task and executes it. + */ + protected void flush(final Consumer consumer) { + List ts = new LinkedList<>(); + TimeWork timeWork = root.next; + while (timeWork != root) { + remove(timeWork); + ts.add(timeWork); + timeWork = root.next; + } + expiration = -1L; + ts.forEach(consumer); + } + + @Override + public long getDelay(final TimeUnit unit) { + long delayMs = expiration - System.currentTimeMillis(); + return Math.max(0, unit.convert(delayMs, TimeUnit.MILLISECONDS)); + } + + @Override + public int compareTo(final Delayed o) { + return o instanceof TimeSlot ? Long.compare(expiration, ((TimeSlot) o).expiration) : 0; + } +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeTask.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeTask.java new file mode 100644 index 000000000..14f9a83d7 --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeTask.java @@ -0,0 +1,38 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +/** + * Represents a task that can be executed at a specific time. + */ +public interface TimeTask extends Runnable { + + /** + * Retrieves the name of the task. + * + * @return A {@code String} representing the name of the task. + */ + String getName(); + + /** + * Retrieves the scheduled execution time for the task. The time is expected to be a specific + * point in time, represented as a long value, such as a timestamp. + * + * @return A {@code long} value representing the scheduled execution time of the task. + */ + long getTime(); +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWheel.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWheel.java new file mode 100644 index 000000000..b21d85706 --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWheel.java @@ -0,0 +1,156 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import java.util.concurrent.DelayQueue; + +/** + * A time wheel is a data structure used for managing tasks that are to be executed after a certain delay. + * It consists of a circular list of time slots and provides efficient means to schedule tasks, advance time, + * and trigger task execution. This is particularly useful for implementing timer mechanisms in applications + * like schedulers, task managers, or any system that needs to handle timed events. + */ +public class TimeWheel { + /** + * The duration of a single tick in the time wheel. + */ + protected final long tickTime; + + /** + * The number of ticks in the time wheel. + */ + private final int ticks; + + /** + * The total duration covered by the time wheel. + */ + private final long duration; + + /** + * The current time, which is a multiple of {@code tickTime}. + */ + protected long now; + + /** + * The current position in the time slots array. + */ + private int index; + + /** + * The delay queue used to manage and trigger the execution of tasks when their delay has passed. + */ + private final DelayQueue queue; + + /** + * An array of time slots that hold the tasks. + */ + private final TimeSlot[] timeSlots; + + /** + * A reference to the next layer of the time wheel, which is used for scheduling tasks beyond the current time wheel's duration. + */ + private TimeWheel next; + + /** + * Constructs a new {@code TimeWheel} instance with the specified parameters. + * + * @param tickTime The duration of each tick in milliseconds. + * @param ticks The number of ticks the time wheel should have. + * @param now The current time in milliseconds. + * @param queue The delay queue used for managing time slots. + */ + public TimeWheel(final long tickTime, final int ticks, final long now, final DelayQueue queue) { + this.tickTime = tickTime; + this.ticks = ticks; + this.duration = ticks * tickTime; + this.timeSlots = new TimeSlot[ticks]; + // Align the current time to be a multiple of tickTime + this.now = now - (now % tickTime); + this.queue = queue; + for (int i = 0; i < ticks; i++) { + timeSlots[i] = new TimeSlot(); + } + } + + /** + * Creates or retrieves the next layer of the time wheel. + * + * @return The next layer of the time wheel. + */ + protected TimeWheel getNext() { + if (next == null) { + next = new TimeWheel(duration, ticks, now, queue); + } + return next; + } + + /** + * Calculates the time point at least one tick ahead of the current time. + * + * @param time The reference time. + * @return The time point at least one tick in the future. + */ + public long getLeastOneTick(final long time) { + long result = System.currentTimeMillis() + tickTime; + return Math.max(time, result); + } + + /** + * Adds a task to the appropriate time slot in the time wheel. + * + * @param timeWork The task to be scheduled. + * @return {@code true} if the task was successfully added, {@code false} if the task is already expired. + */ + public boolean add(final TimeWork timeWork) { + long time = timeWork.time - now; + if (time < tickTime) { + // If the task is expired, it should be executed immediately and not added to the time wheel + return false; + } else if (time < duration) { + // If the task falls within this time wheel's duration, add it to the appropriate time slot + int count = (int) (time / tickTime); + TimeSlot timeSlot = timeSlots[(count + index) % ticks]; + // Add the task to the slot + if (timeSlot.add(timeWork, now + count * tickTime) == TimeSlot.HEAD) { + queue.offer(timeSlot); + } + return true; + } else { + // If the task is beyond this time wheel's duration, add it to the next layer + return getNext().add(timeWork); + } + } + + /** + * Advances the time wheel to a new timestamp, potentially triggering the execution of tasks that have reached their expiration. + * + * @param timestamp The new timestamp to advance to. + */ + public void advance(final long timestamp) { + if (timestamp >= now + tickTime) { + now = timestamp - (timestamp % tickTime); + index++; + if (index >= ticks) { + index = 0; + } + if (next != null) { + // Advance the time in the next layer of the time wheel + next.advance(timestamp); + } + } + } +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWork.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWork.java new file mode 100644 index 000000000..e8006d5b5 --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/TimeWork.java @@ -0,0 +1,158 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; + +/** + * Represents a scheduled task that can be executed at a specified time, cancelled, or checked for expiration. + */ +public class TimeWork implements Runnable, Timeout { + /** + * Initial state indicating the task has not been executed or cancelled. + */ + protected static final int INIT = 0; + + /** + * State indicating the task has been cancelled. + */ + protected static final int CANCELLED = 1; + + /** + * State indicating the task has been executed or has expired. + */ + protected static final int EXPIRED = 2; + + /** + * Atomic updater to safely update the state of the task across multiple threads. + */ + protected static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(TimeWork.class, "state"); + + /** + * The name of the task for identification purposes. + */ + private final String name; + + /** + * The scheduled execution time of the task. + */ + protected final long time; + + /** + * The {@code Runnable} task to be executed. + */ + private final Runnable runnable; + + /** + * A consumer that is called after the task has been run. + */ + private final Consumer afterRun; + + /** + * A consumer that is called if the task is cancelled. + */ + private final Consumer afterCancel; + + /** + * The {@code TimeSlot} in which this task is scheduled. Used for managing task execution timing. + */ + protected TimeSlot timeSlot; + + /** + * Reference to the next {@code TimeWork} node in the linked list, if any. + */ + protected TimeWork next; + + /** + * Reference to the previous {@code TimeWork} node in the linked list, if any. + */ + protected TimeWork pre; + + /** + * The current state of the task, indicating whether it is initialized, cancelled, or expired. + */ + protected volatile int state = INIT; + + /** + * Constructs a new {@code TimeWork} instance with the specified properties. + * + * @param name The name of the task. + * @param time The scheduled execution time of the task. + * @param runnable The {@code Runnable} task to be executed. + * @param afterRun A consumer that is called after the task has been run. + * @param afterCancel A consumer that is called if the task is cancelled. + */ + public TimeWork(final String name, final long time, final Runnable runnable, + final Consumer afterRun, + final Consumer afterCancel) { + this.time = time; + this.name = name; + this.runnable = runnable; + this.afterRun = afterRun; + this.afterCancel = afterCancel; + this.timeSlot = null; + this.next = null; + this.pre = null; + } + + @Override + public String toString() { + return name == null || name.isEmpty() ? super.toString() : name; + } + + @Override + public void run() { + if (STATE_UPDATER.compareAndSet(this, INIT, EXPIRED)) { + runnable.run(); + if (afterRun != null) { + afterRun.accept(this); + } + } + } + + @Override + public boolean isExpired() { + return state == EXPIRED; + } + + @Override + public boolean isCancelled() { + return state == CANCELLED; + } + + @Override + public boolean cancel() { + if (STATE_UPDATER.compareAndSet(this, INIT, CANCELLED)) { + if (afterCancel != null) { + afterCancel.accept(this); + } + return true; + } + return false; + } + + /** + * Removes this task from its associated {@code TimeSlot}, if any. + */ + void remove() { + if (timeSlot != null) { + timeSlot.remove(this); + } + } +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timeout.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timeout.java new file mode 100644 index 000000000..0b21c579a --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timeout.java @@ -0,0 +1,45 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +/** + * Represents an object that has a timeout behavior, allowing for checks on its expiration and cancellation status. + */ +public interface Timeout { + + /** + * Determines if the object has exceeded its timeout period. + * + * @return {@code true} if the object has expired; {@code false} otherwise. + */ + boolean isExpired(); + + /** + * Checks if the operation associated with this object has been voluntarily cancelled. + * + * @return {@code true} if the operation has been cancelled; {@code false} otherwise. + */ + boolean isCancelled(); + + /** + * Attempts to cancel the operation associated with this object. + * + * @return {@code true} if the operation was successfully cancelled; {@code false} if it could not be cancelled, + * for example, because it has already been executed, expired, or previously cancelled. + */ + boolean cancel(); +} + diff --git a/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timer.java b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timer.java new file mode 100644 index 000000000..1d4bd73cc --- /dev/null +++ b/joylive-core/joylive-core-api/src/main/java/com/jd/live/agent/core/util/time/Timer.java @@ -0,0 +1,310 @@ +/* + * Copyright © ${year} ${owner} (${email}) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jd.live.agent.core.util.time; + +import com.jd.live.agent.core.util.thread.NamedThreadFactory; + +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * A Timer is a scheduler that manages timed tasks using a time wheel mechanism. It can schedule tasks to be + * executed after a specified delay or at a scheduled time. The Timer is responsible for managing the lifecycle + * of tasks, including their execution, cancellation, and any actions to be taken before or after running tasks. + * It implements {@link AutoCloseable} to provide a mechanism to release resources when the timer is no longer needed. + */ +public class Timer implements AutoCloseable { + + /** + * The prefix used for naming threads created by this timer. + */ + private final String prefix; + + /** + * The number of worker threads that execute expired tasks. + */ + private final int workerThreads; + + /** + * The maximum number of tasks that can be queued for execution at any given time. + */ + private final long maxTasks; + + /** + * A consumer that is called after a task has been executed. + */ + private final Consumer afterRun; + + /** + * A consumer that is called after a task has been cancelled. + */ + private final Consumer afterCancel; + + /** + * A consumer that is called before a task is executed. + */ + private final Consumer beforeRun; + + /** + * The delay queue that holds time slots until they are ready to be processed. + */ + private final DelayQueue queue; + + /** + * The underlying time wheel that manages scheduling of tasks. + */ + private final TimeWheel timeWheel; + + /** + * The pool of worker threads that execute expired tasks. + */ + private ExecutorService workerPool; + + /** + * The pool of threads that poll the delay queue for expired tasks. + */ + private ExecutorService bossPool; + + /** + * A queue of tasks that have been cancelled. + */ + private final Queue cancels = new ConcurrentLinkedQueue<>(); + + /** + * A queue of tasks that are pending to be scheduled onto the time wheel. + */ + private final Queue flying = new ConcurrentLinkedQueue<>(); + + /** + * A count of the tasks that are currently pending execution. + */ + private final AtomicLong tasks = new AtomicLong(0); + + /** + * A flag indicating whether the timer has been started. + */ + private final AtomicBoolean started = new AtomicBoolean(false); + + /** + * Constructs a new Timer with the specified tick time, number of ticks, and number of worker threads. + * + * @param tickTime The time in milliseconds that each tick represents. + * @param ticks The number of ticks in the time wheel. + * @param workerThreads The number of worker threads to execute tasks. + */ + public Timer(final long tickTime, final int ticks, final int workerThreads) { + this(null, tickTime, ticks, workerThreads, 0); + } + + /** + * Constructs a new Timer with the specified name, tick time, number of ticks, and number of worker threads. + * + * @param name The name prefix for threads created by this timer. + * @param tickTime The time in milliseconds that each tick represents. + * @param ticks The number of ticks in the time wheel. + * @param workerThreads The number of worker threads to execute tasks. + */ + public Timer(final String name, final long tickTime, final int ticks, final int workerThreads) { + this(name, tickTime, ticks, workerThreads, 0); + } + + /** + * Constructs a new Timer with the specified name, tick time, number of ticks, number of worker threads, + * and maximum number of pending tasks. + * + * @param name The name prefix for threads created by this timer. + * @param tickTime The time in milliseconds that each tick represents. + * @param ticks The number of ticks in the time wheel. + * @param workerThreads The number of worker threads to execute tasks. + * @param maxTasks The maximum number of tasks that can be pending before being rejected. + */ + public Timer(final String name, final long tickTime, final int ticks, final int workerThreads, final long maxTasks) { + if (tickTime <= 0) { + throw new IllegalArgumentException("tickTime must be greater than 0"); + } else if (ticks <= 0) { + throw new IllegalArgumentException("ticks must be greater than 0"); + } else if (workerThreads <= 0) { + throw new IllegalArgumentException("workerThreads must be greater than 0"); + } + this.prefix = name == null || name.isEmpty() ? "timer" : name; + this.workerThreads = workerThreads; + this.maxTasks = maxTasks; + this.afterRun = o -> tasks.decrementAndGet(); + this.afterCancel = this::cancel; + this.beforeRun = this::supply; + this.queue = new DelayQueue<>(); + this.timeWheel = new TimeWheel(tickTime, ticks, System.currentTimeMillis(), queue); + } + + /** + * Starts the timer, initializing and starting the worker and boss thread pools. + */ + public void start() { + if (started.compareAndSet(false, true)) { + this.workerPool = Executors.newFixedThreadPool(workerThreads, new NamedThreadFactory(prefix + "-worker", true)); + this.bossPool = Executors.newFixedThreadPool(1, new NamedThreadFactory(prefix + "-boss", true)); + this.bossPool.submit(() -> { + while (started.get()) { + try { + //拉取一跳时间 + TimeSlot timeSlot = queue.poll(timeWheel.tickTime, TimeUnit.MILLISECONDS); + if (started.get()) { + //处理放弃的任务 + cancel(); + //添加新增的任务,如果当前任务已经过期则立刻执行,否则放入后续的槽中 + supply(); + if (timeSlot != null) { + //推进一跳 + timeWheel.advance(timeSlot.expiration); + //执行任务 + timeSlot.flush(beforeRun); + } else { + //推进一跳 + timeWheel.advance(timeWheel.now + timeWheel.tickTime); + } + } + } catch (InterruptedException e) { + break; + } + } + }); + } + } + + @Override + public void close() { + if (started.compareAndSet(true, false)) { + workerPool.shutdownNow(); + bossPool.shutdownNow(); + } + } + + /** + * Cancels pending tasks. + */ + protected void cancel() { + TimeWork timeWork; + // Remove and cancel pending tasks + while ((timeWork = cancels.poll()) != null) { + timeWork.remove(); + } + } + + /** + * Supplies new tasks to the time wheel. + * This method dequeues tasks from the 'flying' queue, which holds tasks pending to be scheduled, + * and supplies them to the time wheel for execution at their designated times. It attempts to process + * up to 100,000 tasks in one go, ensuring that a large number of tasks can be efficiently scheduled + * without causing significant delays. Tasks that have been cancelled are skipped to ensure that only + * valid tasks are scheduled. + */ + protected void supply() { + TimeWork timeWork; + // Attempt to add tasks to the time wheel, with a maximum of 100,000 iterations + // to prevent the method from running too long and potentially causing delays in scheduling. + for (int i = 0; i < 100000; i++) { + // Poll a task from the 'flying' queue, which contains tasks that are pending to be scheduled. + timeWork = flying.poll(); + if (timeWork == null) { + break; + } else if (!timeWork.isCancelled()) { + supply(timeWork); + } + } + } + + /** + * Supplies a single task to the time wheel. + * + * @param timeWork The task to be added. + */ + protected void supply(final TimeWork timeWork) { + if (!timeWheel.add(timeWork)) { + workerPool.submit(timeWork); + } + } + + /** + * Adds a task to be executed at least one tick in the future. + * + * @param name The name of the task. + * @param time The absolute execution time for the task. + * @param runnable The task to be executed. + * @return A Timeout object representing the scheduled task. + */ + public Timeout add(final String name, final long time, final Runnable runnable) { + return runnable == null ? null : add(new TimeWork(name, timeWheel.getLeastOneTick(time), runnable, afterRun, afterCancel)); + } + + /** + * Adds a task to be executed after a specified delay. + * + * @param name The name of the task. + * @param delay The delay in milliseconds before the task should be executed. + * @param runnable The task to be executed. + * @return A Timeout object representing the scheduled task. + */ + public Timeout delay(final String name, final long delay, final Runnable runnable) { + if (runnable == null) { + return null; + } + long time = timeWheel.getLeastOneTick(delay + System.currentTimeMillis()); + return add(new TimeWork(name, time, runnable, afterRun, afterCancel)); + } + + /** + * Adds a task that needs to be executed at least one tick in the future. + * + * @param task The timed task to be added. + * @return A Timeout object representing the scheduled task. + */ + public Timeout add(final TimeTask task) { + if (task == null) { + return null; + } + long time = timeWheel.getLeastOneTick(task instanceof DelayTask ? System.currentTimeMillis() + task.getTime() : task.getTime()); + return add(new TimeWork(task.getName(), time, task, afterRun, afterCancel)); + } + + /** + * Adds a task directly to the timer. + * + * @param timeWork The task to be added. + * @return A Timeout object representing the scheduled task. + */ + protected Timeout add(final TimeWork timeWork) { + if (maxTasks > 0 && tasks.incrementAndGet() > maxTasks) { + tasks.decrementAndGet(); + throw new RejectedExecutionException("the maximum of pending tasks is " + maxTasks); + } + flying.add(timeWork); + return timeWork; + } + + /** + * Cancels a task. + * + * @param timeWork The task to be cancelled. + */ + protected void cancel(final TimeWork timeWork) { + tasks.decrementAndGet(); + cancels.add(timeWork); + } + +}