标签:# RabbitMQ

错用HashedWheelTimer导致的OOM问题

事件中心在私有化环境下,只要server一启动过几秒就oom,查看日志是 Failed to create a thread: retVal -1073741830, errno 11。 异常堆栈: Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11 at java.lang.Thread.startImpl(Native Method) at java.lang.Thread.start(Thread.java:993) at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366) at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447) at 业务调用代码省略 在标品环境下没有问题,在其他KA客户上也没有问题 通过对日志的分析,最终发现是事件中心的延迟消息代码存在缺陷,使用了Netty的HashedWheelTimer,但是语法存在问题,理论上应该是new一个HashedWheelTimer来处理所有时间延迟,但是错用程每次new一个新的HashedWheelTimer,HashedWheelTimer内部每次都会new一个新的线程来处理做调度,一个线程占用1MB,最终内存资源被耗尽。 io.netty.util.HashedWheelTimer#HashedWheelTimer源码: public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // Prevent overflow. if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 每次都new一个线程来处理 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } 因为标品和其他KA使用的是阿里云RocketMQ,此客户使用的是自建的开源版RocketMQ,开源RocketMQ是没有自定义时长的延迟消息的,所以我们自己实现了一套时间轮来实现任意时长的延迟消息,当小于60s的延迟消息会丢入我们的时间轮来处理延迟投递,当时此客户的环境中有大量的60s内的延迟消息,导致一启动就会崩溃。 不过在RocketMQ5.0也支持任意时长了。
Read More ~

ThreadPoolExecutor “非常用” 方法

平时在使用线程池时,更多关注到的是coreSize、maxSize、blockQueue、RejectedExecutionHandler这些参数,但在线程池监控领域,还需要关注到其他的一些方法。在此处做统一记录和备忘: public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy()); // 启动所有核心线程(预热) threadPoolExecutor.prestartAllCoreThreads(); // 启动一个核心线程 threadPoolExecutor.prestartCoreThread(); // 默认情况下构造器中的keepAliveTime指定的是非核心线程的空闲时间, 通过如下方法, 可以允许核心线程超时 threadPoolExecutor.allowCoreThreadTimeOut(true); // ⭐️ 动态线程池必备方法 // 启动后, 设置核心线程数量 threadPoolExecutor.setCorePoolSize(3); // 启动后, 设置最大线程数量 threadPoolExecutor.setMaximumPoolSize(10); // 已执行完的任务总数 threadPoolExecutor.getTaskCount(); // 获取工作队列剩余数量 threadPoolExecutor.getQueue().remainingCapacity(); } 后记 通过上面的代码可知,在运行过程中我们也是可以操作coreSize和maxSize的。那么如何才能实现对Queue的大小进行控制呢?目前开源届常用的是采取RabbitMQ中的VariableLinkedBlockingQueue来实现。
Read More ~