标签:# TimingWheel

错用HashedWheelTimer导致的OOM问题

事件中心在私有化环境下,只要server一启动,过几秒就OOM,查看日志是 Failed to create a thread: retVal -1073741830, errno 11。 异常堆栈: Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11 at java.lang.Thread.startImpl(Native Method) at java.lang.Thread.start(Thread.java:993) at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366) at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447) at 业务调用代码省略 在标品环境下没有问题,在其他KA客户上也没有问题。 通过对日志的分析,最终发现是事件中心的延迟消息代码存在缺陷:使用了Netty的HashedWheelTimer,但是用法存在问题,理论上应该是只new一个HashedWheelTimer来处理所有延迟任务,但是错用成每次都new一个新的HashedWheelTimer,而HashedWheelTimer内部每次都会new一个新的线程来做调度,一个线程占用1MB,最终内存资源被耗尽。 io.netty.util.HashedWheelTimer#HashedWheelTimer源码: public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // Prevent overflow. if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 每次都new一个线程来处理 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? 
leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } 因为标品和其他KA使用的是阿里云RocketMQ,而此客户使用的是自建的开源版RocketMQ,开源版RocketMQ(5.0之前)是没有自定义时长的延迟消息的,所以我们自己实现了一套时间轮来实现任意时长的延迟消息:小于60s的延迟消息会被丢入我们的时间轮来处理延迟投递。当时此客户的环境中有大量60s内的延迟消息,导致一启动就会崩溃。 不过RocketMQ 5.0也已经支持任意时长的延迟消息了。
Read More ~

Java 实现简单单机时间轮方案

时间轮的使用场景自不必多说,最近研究RocketMQ 5.0时,想简单写一个活跃下思路,遂写了下面的方案(没有参照任何代码,没有优化),主要做下记录。

package top.imyzt.learning.algorithm.timer;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Simple single-machine timing wheel demo.
 *
 * <p>Reads comma-separated delays (in seconds) from standard input and schedules one
 * task per delay on a 60-slot wheel and on a 12-slot wheel. Typing {@code exit}
 * (or closing the input) shuts both wheels down and terminates the JVM.
 *
 * @author imyzt
 * @date 2023-11-13 22:03
 */
public class SingleTimingWheel {

    public static void main(String[] args) throws InterruptedException {
        TimingWheel timingWheel = new TimingWheel(60);
        TimingWheel timingWheel2 = new TimingWheel(12);

        // One Scanner for the whole session: creating a new Scanner per iteration can
        // discard input that the previous instance had already buffered.
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.print("请输入延时周期: ");
            // Treat end of input like "exit" instead of failing with NoSuchElementException.
            if (!scanner.hasNext()) {
                break;
            }
            String next = scanner.next();
            if ("exit".equals(next)) {
                break;
            }

            for (String split : next.split(",")) {
                int delayTime;
                try {
                    delayTime = Integer.parseInt(split.trim());
                } catch (NumberFormatException e) {
                    // A malformed token must not kill the interactive loop.
                    System.out.println("无效的延时周期: " + split);
                    continue;
                }
                if (delayTime < 0) {
                    System.out.println("无效的延时周期: " + split);
                    continue;
                }

                LocalDateTime now = LocalDateTime.now();
                System.out.println("新生产一个任务, 延迟" + delayTime + "秒后执行"
                        + ", 当前时间: " + now
                        + ", 预计执行时间: " + now.plusSeconds(delayTime));

                // Each wheel gets its own Task: the wheel stores its remaining-round
                // counter in Task#cycle, so sharing one instance between two wheels
                // lets the second addTask overwrite the first wheel's counter.
                timingWheel.addTask(new Task(() -> Thread.currentThread().getName(), delayTime));
                timingWheel2.addTask(new Task(() -> Thread.currentThread().getName(), delayTime));
            }
        }

        timingWheel.shutdown();
        timingWheel2.shutdown();
        TimeUnit.SECONDS.sleep(1);
        System.exit(0);
    }
}

/**
 * A one-second-resolution timing wheel with {@code timer} slots.
 *
 * <p>A scheduler thread advances the wheel once per second and moves due tasks to a
 * ready queue; a second thread takes tasks from that queue and prints them.
 * {@link #addTask(Task)} may be called from any thread: the cursor and the slots are
 * only touched while holding {@code lock}.
 *
 * <p>A {@link Task} must be added to at most one wheel, because the wheel keeps its
 * remaining-round counter in {@link Task#setCycle(Integer)}.
 */
class TimingWheel {

    /** Runs the loop that consumes due tasks. */
    private final ExecutorService workerPool;

    /** Drives the wheel, one tick per second. */
    private final ScheduledExecutorService tickScheduler;

    /** Number of slots, i.e. seconds per full rotation. */
    private final int timer;

    /** Tasks registered on each slot; guarded by {@code lock}. */
    private final List<List<Task>> slots;

    /** Guards {@code cursor} and {@code slots}. */
    private final Object lock = new Object();

    /** Index of the slot the next tick will process; guarded by {@code lock}. */
    private int cursor;

    /** Tasks whose delay has elapsed, waiting to be dispatched. */
    private final BlockingQueue<Task> readyQueue;

    /**
     * Creates the wheel and immediately starts its scheduler and dispatcher threads.
     *
     * @param timer number of one-second slots; must be positive
     * @throws IllegalArgumentException if {@code timer} is not positive
     */
    public TimingWheel(int timer) {
        if (timer <= 0) {
            throw new IllegalArgumentException("timer must be positive: " + timer);
        }
        this.timer = timer;
        this.slots = IntStream.range(0, timer)
                .<List<Task>>mapToObj(i -> new ArrayList<>())
                .collect(Collectors.toList());
        this.cursor = 0;
        this.readyQueue = new LinkedBlockingQueue<>();
        this.workerPool = Executors.newSingleThreadExecutor();
        this.tickScheduler = Executors.newSingleThreadScheduledExecutor();
        this.init();
    }

    /**
     * Registers a task to become due {@code task.getDelayTime()} ticks from now.
     *
     * @param task the task to schedule; must not be shared with another wheel
     * @throws NullPointerException     if {@code task} is null
     * @throws IllegalArgumentException if the task's delay is negative
     */
    public void addTask(Task task) {
        Objects.requireNonNull(task, "task");
        int delayTime = task.getDelayTime();
        if (delayTime < 0) {
            throw new IllegalArgumentException("delayTime must not be negative: " + delayTime);
        }

        int cycle = delayTime / timer;
        int current;
        int index;
        synchronized (lock) {
            current = cursor;
            // Reduce the delay modulo the wheel size first so cursor + delay cannot overflow.
            index = (current + delayTime % timer) % timer;
            task.setCycle(cycle);
            slots.get(index).add(task);
        }
        System.out.printf("任务id: %s, 当前刻度: %s, cycle: %s, 计划执行刻度: %s \n",
                task.getTaskId(), current, cycle, index);
    }

    /**
     * Stops ticking and interrupts the dispatcher. Tasks that are still waiting on the
     * wheel or in the ready queue are not dispatched.
     */
    public void shutdown() {
        tickScheduler.shutdown();
        // The dispatcher blocks in take(); only an interrupt gets it out.
        workerPool.shutdownNow();
        System.out.println("[" + timer + "]shutdown...");
    }

    /** Starts the once-per-second tick and the dispatcher loop. */
    private void init() {
        tickScheduler.scheduleAtFixedRate(this::tick, 0, 1, TimeUnit.SECONDS);
        workerPool.execute(this::dispatchLoop);
    }

    /** Processes the slot under the cursor and advances the cursor by one, wrapping around. */
    private void tick() {
        try {
            synchronized (lock) {
                List<Task> slot = slots.get(cursor);
                cursor = (cursor + 1) % timer;

                Iterator<Task> iterator = slot.iterator();
                while (iterator.hasNext()) {
                    Task task = iterator.next();
                    int remaining = task.getCycle();
                    if (remaining != 0) {
                        // Not due in this rotation: consume one round and leave it in the slot.
                        task.setCycle(remaining - 1);
                        System.out.println(task.getTaskId() + "还未到时间, 当前周期" + remaining);
                        continue;
                    }
                    readyQueue.add(task);
                    iterator.remove();
                }
            }
        } catch (RuntimeException e) {
            // scheduleAtFixedRate cancels all later runs once the task throws, which
            // would stop the wheel silently; report the failure and keep ticking.
            System.out.println("[" + timer + "]tick failed: " + e);
        }
    }

    /** Blocks on the ready queue and prints each due task until the thread is interrupted. */
    private void dispatchLoop() {
        try {
            while (true) {
                Task task = readyQueue.take();
                try {
                    // Task#toString invokes the task's runner, so this is where it executes.
                    System.out.println(LocalDateTime.now() + ", [" + timer + "]时间轮调度任务====>" + task);
                } catch (RuntimeException e) {
                    // One failing task must not end the dispatcher.
                    System.out.println("[" + timer + "]task " + task.getTaskId() + " failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted by shutdown(): restore the flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * A delayed unit of work together with the bookkeeping the wheel needs.
 */
@Getter
class Task {

    /** Source of process-wide unique task ids. */
    private static final AtomicInteger ID_SEQUENCE = new AtomicInteger();

    private final Integer taskId;

    /**
     * The work to execute; its result is embedded in {@link #toString()}.
     */
    private final Supplier<String> runner;

    /**
     * Full wheel rotations still to wait before the task is due.
     */
    @Setter
    private Integer cycle;

    /** Requested delay in seconds. */
    private final Integer delayTime;

    /**
     * Creation time.
     */
    private final LocalDateTime createdAt;

    /**
     * Time at which the task is expected to run ({@code createdAt + delayTime} seconds).
     */
    private final LocalDateTime runnerTime;

    /**
     * @param runner    the work to execute; must not be null
     * @param delayTime delay in seconds; must not be null
     * @throws NullPointerException if either argument is null
     */
    public Task(Supplier<String> runner, Integer delayTime) {
        // A sequence guarantees unique ids; the previous random-times-10000 value
        // overflowed int and could collide.
        this.taskId = ID_SEQUENCE.incrementAndGet();
        this.runner = Objects.requireNonNull(runner, "runner");
        this.delayTime = Objects.requireNonNull(delayTime, "delayTime");
        this.createdAt = LocalDateTime.now();
        this.runnerTime = this.createdAt.plusSeconds(delayTime);
    }

    /** Note: invokes {@code runner} to obtain the value shown for the "runner" field. */
    @Override
    public String toString() {
        return "Task{" +
                "taskId=" + taskId +
                ", runner=" + runner.get() +
                ", cycle=" + cycle +
                ", delayTime=" + delayTime +
                ", createdAt=" + createdAt +
                ", runnerTime=" + runnerTime +
                '}';
    }
}
Read More ~