Why the Dubbo thread pool gets exhausted
Overview
Most of us have run into a fully occupied Dubbo thread pool at some point. The error looks like the one below, and this article works through why it happens.
cause: org.apache.dubbo.remoting.RemotingException: Server side(10.0.0.100,20881) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-10.0.0.100:20881, Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://10.0.0.100:20881!
1 Dubbo thread model
On the server side Dubbo works with two kinds of threads: IO threads and business threads.
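**IO threads**
The IO threads only deal with byte streams going in and out: reading messages and serializing or deserializing them. They do not run business logic. NettyServer starts the Netty server and sets up the boss and worker threads: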
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    // One boss thread accepts connections; the worker group holds the IO threads.
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation",
                                SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                    }
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
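Look at the thread count of each group:
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        "NettyServerWorker");
int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
The boss group has a single thread. The worker (IO) threads are the interesting part: the count is read from the URL, and when it is not set it defaults to the number of available processors plus one, capped at 32 (Math.min makes 32 the upper bound, not the lower one). The setting is called iothreads and can be configured as shown here. Note, however, that thread pool exhaustion is not caused by having too few IO threads.
provider:
  iothreads: 100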
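As a quick check of how that default behaves, here is a minimal sketch that evaluates the same expression for a few machine sizes. The class and method names are made up for illustration, and the processor count is passed in as a parameter instead of being read from the runtime.

```java
final class IoThreadsDefault {
    // Same expression as DEFAULT_IO_THREADS, with the processor count
    // passed in so that other machine sizes can be tried.
    // The class and method names are illustrative, not part of Dubbo.
    static int defaultIoThreads(int availableProcessors) {
        return Math.min(availableProcessors + 1, 32);
    }

    public static void main(String[] args) {
        for (int cores : new int[] {4, 16, 31, 64}) {
            System.out.println(cores + " cores -> " + defaultIoThreads(cores) + " IO threads");
        }
    }
}
```

A 4-core machine gets 5 IO threads, while anything with 31 or more cores stays at 32.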
// NettyServerHandler: every decoded message is passed to the handler chain on the IO thread.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.received(channel, msg);
}
// HeartbeatHandler: heartbeats are answered here, still on the IO thread.
@Override
public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    // Anything that is not a heartbeat goes further down the handler chain.
    handler.received(channel, message);
}
Different message types are handled differently. A heartbeat is answered straight away on the IO thread, while a business request is handed over to the business thread pool:
// AllChannelHandler: business messages are submitted to the business thread pool.
@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        // A rejected request is reported back to the caller instead of being dropped silently.
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            sendFeedback(channel, (Request) message, t);
            return;
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
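The split between heartbeats and business requests can be tried out with plain JDK classes. The following is only a simplified sketch of the idea, not Dubbo's handler API: `DispatchSketch`, `received` and the returned labels are hypothetical, and a one-thread pool with a `SynchronousQueue` stands in for the business pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {
    // Simplified stand-in for the handler chain; this is not Dubbo's API.
    static String received(boolean heartbeat, Runnable businessTask, ExecutorService businessPool) {
        if (heartbeat) {
            // Heartbeats never touch the business pool.
            return "answered on IO thread";
        }
        try {
            businessPool.execute(businessTask);
            return "dispatched to business pool";
        } catch (RejectedExecutionException e) {
            // This is the point where Dubbo sends feedback to the caller.
            return "rejected, pool exhausted";
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        // A slow business call that keeps the only worker busy until released.
        Runnable slowCall = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(received(false, slowCall, pool)); // dispatched to business pool
        System.out.println(received(false, slowCall, pool)); // rejected, pool exhausted
        System.out.println(received(true, slowCall, pool));  // answered on IO thread
        release.countDown();
        pool.shutdown();
    }
}
```

The third call shows why heartbeats keep working even while business requests are being rejected: they never reach the exhausted pool.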
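**Business thread pool**
Initialization: each thread pool strategy creates a pool with different characteristics. Dubbo provides the following pool types:
fixed
A fixed number of threads.
cached
Threads that stay idle for one minute are reclaimed; new threads are created when new requests arrive.
limited
The number of threads grows with the load but never exceeds the maximum. Idle threads are not reclaimed.
eager
When all core threads are busy, a new thread is created to run the task first, instead of putting the task into the queue right away.
In practice fixed is the one normally used.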
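To make the differences concrete, three of these flavors can be approximated with the JDK's `ThreadPoolExecutor` alone. This is a rough sketch under assumptions: the factory names are hypothetical, every pool uses a `SynchronousQueue` (queue length 0), and the core size and keep-alive values are chosen to match the descriptions above instead of being copied from Dubbo's source. The eager type is left out because it needs a custom queue.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolFlavors {
    // fixed: core == max, so the pool never grows or shrinks.
    static ThreadPoolExecutor fixed(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    // cached: threads are created on demand and reclaimed after 60 s idle.
    static ThreadPoolExecutor cached(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // limited: grows up to maxThreads; the huge keep-alive means idle
    // threads are effectively never reclaimed.
    static ThreadPoolExecutor limited(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = cached(200);
        System.out.println("core=" + pool.getCorePoolSize() + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
    }
}
```

All three hit the rejection policy in the same way once the maximum is reached; they differ only in how threads are created and released before that point.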
public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // queues == 0: no buffering; queues < 0: unbounded queue; queues > 0: bounded queue.
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
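Two parameters matter here: the number of threads and the queue length. The default thread count is 200 and the default queue length is 0, which selects a SynchronousQueue. A SynchronousQueue has no capacity for storing tasks. It only pairs a task with a thread that is already waiting for one (internally it uses CAS directly and does not build on AQS). When a task arrives and no thread is free, the task is rejected immediately, which is why we end up in the rejection policy. To change the defaults, configure:
dubbo:
  protocol:
    threads: 800
    queues: 10000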
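The effect of the queue length can be reproduced with a small JDK-only experiment. This is a sketch, not Dubbo code: `QueueEffect` and `countRejected` are hypothetical names, the queue is chosen the same way as for queue lengths of zero or more, and every task blocks until the end so that all workers stay busy.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class QueueEffect {
    // Submits `tasks` blocking tasks to a fixed pool and returns how many were rejected.
    static int countRejected(int threads, int queues, int tasks) {
        BlockingQueue<Runnable> queue = queues == 0
                ? new SynchronousQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(queues);
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a slow business call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("queues=0:  rejected " + countRejected(2, 0, 5));  // 3
        System.out.println("queues=10: rejected " + countRejected(2, 10, 5)); // 0
    }
}
```

A queue absorbs short bursts, but it only postpones rejection when the business logic itself is slow, and queued requests may time out on the consumer side while they wait.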
**Business thread pool rejection**
The rejection itself happens in AbortPolicyWithReport:
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format("Thread pool is EXHAUSTED!" +
            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
            + "%d)," +
            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
    logger.warn(msg);
    // Dump the thread stacks so the busy threads can be inspected afterwards.
    dumpJStack();
    throw new RejectedExecutionException(msg);
}
This is the error shown at the beginning of the article. When it occurs, Dubbo also dumps the thread stacks automatically.
The message includes two thread pool statistics, getTaskCount and getCompletedTaskCount.
getTaskCount: returns the approximate total number of tasks that have ever been scheduled for execution, that is, completed tasks plus tasks currently running plus tasks waiting in the queue. getCompletedTaskCount: returns the approximate number of tasks that have finished executing. Tasks that are running or waiting are not counted.
Now look at the error again. Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801). Task minus completed is 50397601 - 50396801 = 800, which matches the 800 active threads: every thread is busy, nothing is waiting in a queue, and the next request is rejected.
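The same subtraction can be done directly on a logged message. The sketch below is a small helper under assumptions: the class name `InFlight` is hypothetical, and the regular expression only relies on the "Task: N (completed: M)" part of the message format shown in this article.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class InFlight {
    private static final Pattern COUNTERS =
            Pattern.compile("Task: (\\d+) \\(completed: (\\d+)\\)");

    // Tasks that were submitted but have not completed yet (running or queued).
    static long inFlight(String errorMessage) {
        Matcher m = COUNTERS.matcher(errorMessage);
        if (!m.find()) {
            throw new IllegalArgumentException("no task counters found");
        }
        return Long.parseLong(m.group(1)) - Long.parseLong(m.group(2));
    }

    public static void main(String[] args) {
        String msg = "Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), "
                + "Task: 50397601 (completed: 50396801)";
        System.out.println(inFlight(msg)); // 800
    }
}
```

If the result is larger than the active count, the surplus is sitting in the queue, which points to a configuration with a queue length greater than 0.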
2 Estimating a suitable thread count and finding slow business logic
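Dubbo runs business logic on a thread pool, so how do we estimate how many threads may be needed? Start with a question: a company has 7,200 employees, everyone clocks in between 8:00 and 8:30 in the morning, and each clock-in takes the system 5 seconds. What are the RT, the QPS and the concurrency?
RT is the response time, which the question already gives us: RT = 5 (seconds)
QPS is the number of queries per second. Assuming clock-ins are spread evenly: QPS = 7200 / (30 * 60) = 4
Concurrency is the number of requests the system is processing at the same time: concurrency = QPS x RT = 4 x 5 = 20
The example leads to the following formula: concurrency = QPS x RT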
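The arithmetic is easy to script when you want to try other numbers. This is a minimal sketch with hypothetical names; the 60-second response time in the last line is an invented what-if value, not a measurement.

```java
final class ConcurrencyEstimate {
    // Average requests per second over a time window.
    static double qps(long requests, long windowSeconds) {
        return (double) requests / windowSeconds;
    }

    // concurrency = QPS x RT
    static double concurrency(double qps, double rtSeconds) {
        return qps * rtSeconds;
    }

    public static void main(String[] args) {
        double qps = qps(7200, 30 * 60);
        System.out.println("QPS = " + qps);                                     // 4.0
        System.out.println("concurrency = " + concurrency(qps, 5));            // 20.0
        // Hypothetical what-if: same traffic, but RT degrades from 5 s to 60 s.
        System.out.println("concurrency at RT 60s = " + concurrency(qps, 60)); // 240.0
    }
}
```

With unchanged traffic, a twelvefold increase in response time needs twelve times as many threads, which is how a single slow dependency can fill a pool.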
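If the system assigns one processing thread to each request, the concurrency is approximately equal to the number of threads. The formula shows that concurrency depends on both QPS and RT, and a rise in either one raises it.
That is only the ideal case, because concurrency is bounded by what the system can handle and cannot keep growing. The Dubbo thread pool caps the number of threads, and once the cap is exceeded the rejection policy runs and reports that the pool is exhausted. This is the root of the exhausted thread pool problem, and its two triggers are a rise in RT and a rise in QPS.
Keep in mind that the clock-in case is only an example. A real service is far more complex, and finding reasonable values usually takes repeated tuning. Thread pool exhaustion is often caused by one slow piece of business logic, so look for the stacks that are executing slowly, for example by monitoring with Arthas.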
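When no external tool is at hand, a rough picture of where the business threads are stuck can be taken from inside the JVM. The sketch below only uses `Thread.getAllStackTraces()` from the JDK. The class and method names are hypothetical, and the thread-name prefix "DubboServerHandler" is taken from the error message at the top of this article.

```java
import java.util.Map;
import java.util.TreeMap;

final class BusyFrames {
    // Returns the first frame that is not JDK code, i.e. the closest
    // non-JDK call. Falls back to the top frame, or to a marker when
    // the stack is empty.
    static String firstAppFrame(StackTraceElement[] stack) {
        for (StackTraceElement frame : stack) {
            String cls = frame.getClassName();
            if (!cls.startsWith("java.") && !cls.startsWith("javax.")
                    && !cls.startsWith("sun.") && !cls.startsWith("jdk.")) {
                return frame.toString();
            }
        }
        return stack.length == 0 ? "<no frames>" : stack[0].toString();
    }

    // Counts live threads whose name starts with the prefix, grouped by that frame.
    static Map<String, Integer> topFrames(String threadNamePrefix) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (e.getKey().getName().startsWith(threadNamePrefix)) {
                counts.merge(firstAppFrame(e.getValue()), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints nothing when run outside a Dubbo provider, since no thread matches.
        topFrames("DubboServerHandler")
                .forEach((frame, n) -> System.out.println(n + "  " + frame));
    }
}
```

If most of the business threads report the same frame, for instance a database or HTTP client call, that call is the first candidate for the slow business logic.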