柚子快報(bào)激活碼778899分享:Dubbo 線程池策略
柚子快報(bào)激活碼778899分享:Dubbo 線程池策略
Dubbo 線程池
Dubbo 線程模型文章提到,處理請(qǐng)求分為IO 線程和業(yè)務(wù)線程,
Dubbo 線程池策略主要針對(duì)業(yè)務(wù)線程池,Dubbo 內(nèi)部給我們實(shí)現(xiàn)了 四種fixed、cached、limited、eager線程池,默認(rèn)是fixed。
Dubbo 線程池,主要還是基于JDK線程池幾個(gè)參數(shù)做文章。
核心線程池?cái)?shù)最大線程池?cái)?shù)keep alive時(shí)間等待隊(duì)列
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
fixed 線程池
線程池?cái)?shù)固定,最大線程池?cái)?shù)等于核心線程池?cái)?shù),因此線程數(shù)達(dá)到核心線程池?cái)?shù)時(shí)不會(huì)再額外開(kāi)啟線程,
keepAlive 時(shí)間設(shè)置為0,因?yàn)椴粫?huì)用到(線程不會(huì)超過(guò)核心線程池?cái)?shù)) queues == 0 時(shí) 使用的 SynchronousQueue(默認(rèn))
它是個(gè)零容量BlockQueue :SynchronousQueue內(nèi)部并不存儲(chǔ)任何元素,即它的容量為0。 也就意味著,當(dāng)一個(gè)線程執(zhí)行插入操作(put)時(shí),它必須等待另一個(gè)線程執(zhí)行對(duì)應(yīng)的刪除操作(take),反之亦然。這種機(jī)制確保了數(shù)據(jù)的直接傳遞,減少了數(shù)據(jù)在隊(duì)列中的等待時(shí)間。
其他情況使用 LinkedBlockingQueue
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
EagerThreadPool
默認(rèn)DEFAULT_QUEUES = 0; 默認(rèn)的TaskQueue 大小為0 。
當(dāng)線程池線程數(shù)達(dá)核心線程數(shù)時(shí),此時(shí)嘗試開(kāi)啟新線程執(zhí)行任務(wù)。(因?yàn)門askQueue大小為0,等同于沒(méi)有等待隊(duì)列,直接開(kāi)啟新線程執(zhí)行)
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
LimitedThreadPool
keepAlive time 是 Long.MAX_VALUE , 相當(dāng)于線程一直不會(huì)回收,當(dāng)?shù)却?duì)列滿了,就會(huì)開(kāi)啟新線程,并且新開(kāi)啟的線程不會(huì)被回收, 線程池?cái)?shù)量不斷動(dòng)態(tài)增加,直到達(dá)到最大線程池?cái)?shù)。
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
CachedThreadPool
DEFAULT_CORE_THREADS = 0;
DEFAULT_QUEUES = 0;
DEFAULT_ALIVE = 60 * 1000
默認(rèn)核心線程池?cái)?shù)為0, 阻塞隊(duì)列大小也為0 ,keepAlive 時(shí)間默認(rèn) 1分鐘。
新請(qǐng)求到來(lái),由于核心線程為0,且隊(duì)列大小為0 ,因此會(huì)直接開(kāi)啟新線程執(zhí)行, 當(dāng)達(dá)到keepAlive后會(huì)自動(dòng)被回收。
public class public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue
(queues < 0 ? new LinkedBlockingQueue
: new LinkedBlockingQueue
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
總結(jié)
Dubbo 線程池,主要還是基于JDK線程池幾個(gè)參數(shù)做文章,并沒(méi)有發(fā)明什么新東西,因此理解Dubbo 線程池前提掌握jdk 線程池的工作原理,當(dāng)然也可以自定義擴(kuò)展實(shí)現(xiàn)。 指定線程池方式如下
柚子快報(bào)激活碼778899分享:Dubbo 線程池策略
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。