柚子快報(bào)激活碼778899分享:緩存 Java多線程2
柚子快報(bào)激活碼778899分享:緩存 Java多線程2
1. 線程通信
1.1. 線程通信引入
應(yīng)用場(chǎng)景:生產(chǎn)者和消費(fèi)者問題
假設(shè)倉(cāng)庫(kù)中只能存放一件產(chǎn)品,生產(chǎn)者將生產(chǎn)出來(lái)的產(chǎn)品放入倉(cāng)庫(kù),消費(fèi)者將倉(cāng)庫(kù)中產(chǎn)品取走消費(fèi)如果倉(cāng)庫(kù)中沒有產(chǎn)品,則生產(chǎn)者將產(chǎn)品放入倉(cāng)庫(kù),否則停止生產(chǎn)并等待,直到倉(cāng)庫(kù)中的產(chǎn)品被消費(fèi)者取走為止如果倉(cāng)庫(kù)中放有產(chǎn)品,則消費(fèi)者可以將產(chǎn)品取走消費(fèi),否則停止消費(fèi)并等待,直到倉(cāng)庫(kù)中再次放入產(chǎn)品為止
分析
這是一個(gè)線程同步問題,生產(chǎn)者和消費(fèi)者共享同一個(gè)資源,并且生產(chǎn)者和消費(fèi)者之間相互依賴,互為條件 對(duì)于生產(chǎn)者,沒有生產(chǎn)產(chǎn)品之前,要通知消費(fèi)者等待。生產(chǎn)產(chǎn)品之后,還需要通知消費(fèi)者消費(fèi) 對(duì)于消費(fèi)者,在消費(fèi)之后,要通知生產(chǎn)者已經(jīng)消費(fèi)結(jié)束,生產(chǎn)者需要繼續(xù)生產(chǎn)新產(chǎn)品以供消費(fèi)者 在生產(chǎn)者消費(fèi)者問題中,僅有synchronized是不夠的
synchronized可阻止并發(fā)更新同一個(gè)共享資源,實(shí)現(xiàn)了同步synchronized不能用來(lái)實(shí)現(xiàn)不同線程之間的消息傳遞(通信) Java提供了3個(gè)方法解決線程之間的通信問題
均是java.lang.Object類的方法都只能在同步方法或同步代碼塊中使用,否則會(huì)拋出異常
public class Product {
private String name;
private String color;
public Product() {}
@Override
public String toString() {
return "Product{" +
"name='" + name + '\'' +
", color='" + color + '\'' +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public Product(String name, String color) {
this.name = name;
this.color = color;
}
}
/*
* 生產(chǎn)者線程
* */
public class ProduceRunnable implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
if(i % 2 == 0) {
product.setName("饅頭");
product.setColor("白色");
}else {
product.setName("玉米餅");
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
i++;
}
}
}
/*
* 消費(fèi)者線程
* */
public class ConsumeRunnable implements Runnable {
private Product product;
public ConsumeRunnable() {}
public ConsumeRunnable(Product product) {
this.product = product;
}
@Override
public void run() {
while (true) {
System.out.println("消費(fèi)者消費(fèi)商品:" + product.getName() + " " + product.getColor());
}
}
}
public class Test {
public static void main(String[] args) {
Product product = new Product();
ProduceRunnable runnable = new ProduceRunnable();
runnable.setProduct(product);
Thread thread = new Thread(runnable);
ConsumeRunnable runnable1 = new ConsumeRunnable(product);
Thread thread1 = new Thread(runnable1);
thread.start();
thread1.start();
}
}
注意:必須保證生產(chǎn)者消費(fèi)者操作的是一個(gè)商品對(duì)象,否則就會(huì)出現(xiàn)消費(fèi)者消費(fèi)null的情況
1.2. 使用同步代碼塊實(shí)現(xiàn)線程同步
public class ProduceRunnable1 implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
synchronized (product) {
if(i % 2 == 0) {
product.setName("饅頭");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("白色");
}else {
product.setName("玉米餅");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
}
i++;
}
}
}
public class ConsumeRunnable1 implements Runnable {
private Product product;
@Override
public void run() {
while (true) {
synchronized (product) {
System.out.println("消費(fèi)者消費(fèi)商品:" + product.getName() + " " + product.getColor());
}
}
}
}
注意:不僅生產(chǎn)者要加鎖,消費(fèi)者也要加鎖,并且必須是一把鎖(不僅是一個(gè)引用變量,而且必須指向同一個(gè)對(duì)象)
public class ProduceRunnable2 implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
synchronized (product) {
// 如果已有商品,就等待
if(product.flag) {
try {
// 必須調(diào)用同步監(jiān)視器的通信方法
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
if(i % 2 == 0) {
product.setName("饅頭");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("白色");
}else {
product.setName("玉米餅");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
// 修改商品的狀態(tài)
product.flag = true;
// 通知消費(fèi)者來(lái)消費(fèi)
product.notify();;
}
i++;
}
}
}
public class ConsumeRunnable2 implements Runnable {
private Product product;
public ConsumeRunnable2(Product product) {
this.product = product;
}
@Override
public void run() {
while (true) {
synchronized (product) {
// 如果沒有商品,就等待
if(!product.flag) {
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 消費(fèi)商品
System.out.println("消費(fèi)者消費(fèi)商品:" + product.getName() + " " + product.getColor());
// 修改商品狀態(tài)
product.flag = false;
// 通知生產(chǎn)者進(jìn)行生產(chǎn)
product.notifyAll();
}
}
}
}
}
線程同步的細(xì)節(jié)
細(xì)節(jié)1: 進(jìn)行線程通信的多個(gè)線程,要使用同一個(gè)同步監(jiān)視器(product),還必須要調(diào)用該同步監(jiān)視器的wait()、notify()、notifyAll()細(xì)節(jié)2: 線程通信的三個(gè)方法
wait():在其他線程調(diào)用此對(duì)象的notify()方法或notifyAll()方法前,導(dǎo)致當(dāng)前線程等待。當(dāng)前線程必須擁有此對(duì)象監(jiān)視器wait(time):在其他線程調(diào)用此對(duì)象的notify()方法或notifyAll()方法或超過指定的時(shí)間量前,導(dǎo)致當(dāng)前線程等待。當(dāng)前線程必須擁有此對(duì)象監(jiān)視器notify():?jiǎn)拘言诖藢?duì)象監(jiān)視器上等待的單個(gè)線程。如果所有線程都在此對(duì)象上等待,則會(huì)選擇喚醒其中一個(gè)線程。選擇是任意性的,在對(duì)實(shí)現(xiàn)作出決定時(shí)發(fā)生的notifyAll():?jiǎn)拘言诖藢?duì)象監(jiān)視器上等待的所有線程。被喚醒的線程將以常規(guī)方式與在該對(duì)象上主動(dòng)同步的其他所有線程進(jìn)行競(jìng)爭(zhēng) 細(xì)節(jié)3:完整的線程生命周期 阻塞狀態(tài)有三種
普通的阻塞:sleep、join、Scanner、input.next()同步阻塞(鎖池隊(duì)列):沒有獲取同步監(jiān)視器的線程的隊(duì)列等待阻塞(阻塞隊(duì)列):被調(diào)用了wait()后釋放鎖,然后進(jìn)行該隊(duì)列 細(xì)節(jié)4: sleep()和wait()的區(qū)別
sleep()線程會(huì)讓出CPU進(jìn)入阻塞狀態(tài),但不會(huì)釋放對(duì)象鎖;wait()線程會(huì)讓出CPU進(jìn)入阻塞狀態(tài),也會(huì)放棄對(duì)象鎖,進(jìn)入等待此對(duì)象的等待鎖定池進(jìn)入的阻塞狀態(tài)也是不同的隊(duì)列wait只能在同步控制方法或者同步控制塊內(nèi)使用,而sleep可以在任何地方使用
2. 線程通信
2.1. 使用同步方法實(shí)現(xiàn)線程通信
public class Product1 {
private String name;
private String color;
// 默認(rèn)沒有商品
boolean flag = false;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public synchronized void produce(String name, String color) {
// 如果已有商品,就等待
if(flag) {
try {
// 讓出CPU,會(huì)同時(shí)釋放鎖
this.wait(); // 必須調(diào)用同步監(jiān)視器的通信方法
}catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
this.name = name;
try {
// 讓出了CPU,不釋放鎖
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.color = color;
System.out.println("生產(chǎn)者生產(chǎn)商品:" + getName() + " " + getColor());
// 修改商品的狀態(tài)
flag = true;
// 通知消費(fèi)者來(lái)消費(fèi)
this.notify();
}
public synchronized void consume() {
// 如果沒有商品,就等待
if(!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消費(fèi)商品
System.out.println("消費(fèi)者消費(fèi)商品:" + name + " " + color);
// 修改商品的狀態(tài)
flag = false;
// 通知生產(chǎn)者進(jìn)行生產(chǎn)
this.notifyAll();
}
}
public class ConsumeRunnable3 implements Runnable{
private Product1 product;
public void setProduct(Product1 product) {
this.product = product;
}
@Override
public void run() {
while (true) {
product.consume();
}
}
}
public class ProduceRunnable3 implements Runnable {
private Product1 product;
public void setProduct(Product1 product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while(true) {
if(i % 2 == 0) {
product.produce("饅頭", "白色");
}else {
product.produce("玉米餅", "黃色");
}
i++;
}
}
}
同步方法的同步監(jiān)視器都是this,所以需要將produce()和consume()放入一個(gè)類Product中,保證是同一把鎖必須調(diào)用this的wait()、notify()、notifyAll()方法,this可以省略,因?yàn)橥奖O(jiān)視器是this
2.2. 使用Lock鎖實(shí)現(xiàn)線程通信
之前實(shí)現(xiàn)線程通信時(shí),是生產(chǎn)者和消費(fèi)者在一個(gè)等待隊(duì)列中,會(huì)存在本來(lái)打算喚醒消費(fèi)者,卻喚醒了一個(gè)生產(chǎn)者的問題,要讓生產(chǎn)者和消費(fèi)者線程在不同的隊(duì)列中等待,就要使用Lock鎖
public class Product2 {
private String name;
private String color;
boolean flag = false;
Lock lock = new ReentrantLock();
Condition produceCondition = lock.newCondition();
Condition consumeCondition = lock.newCondition();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public void produce(String name, String color) {
lock.lock();
try {
// 如果已有商品,就等待
if(flag) {
try {
produceCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
this.name = name;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.color = color;
System.out.println("生產(chǎn)者生產(chǎn)商品:" + getName() + " " + getColor());
// 修改商品的狀態(tài)
flag = true;
// 通知消費(fèi)者來(lái)消費(fèi)
consumeCondition.signal();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
// 如果沒有商品,就等待
if(!flag) {
try {
consumeCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消費(fèi)者消費(fèi)商品:" + name + " " + color);
// 修改商品狀態(tài)
flag = false;
// 通知生產(chǎn)者生產(chǎn)
produceCondition.signalAll();
} finally {
lock.unlock();
}
}
}
一個(gè)Object的監(jiān)視器模型上,一個(gè)對(duì)象擁有一個(gè)同步隊(duì)列和等待隊(duì)列,而Lock(同步器)擁有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列
2.3. Condition
Condition是在Java1.5中出現(xiàn)的,用來(lái)替代傳統(tǒng)的Object的wait()、notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Object的wait()、notify(),使用Condition的await()、signal()實(shí)現(xiàn)線程間協(xié)作更加安全和高效。 Condition能構(gòu)更加精細(xì)的控制多線程的休眠與喚醒。對(duì)于同一個(gè)鎖,可以創(chuàng)建多個(gè)Condition,在不同情況下使用不同的Condition。 Object的wait()、notify()、notifyAll()方法是和同步鎖關(guān)鍵字synchronized捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的 調(diào)用Condition的await()、signal()、signalAll()方法,都必須在lock保護(hù)之內(nèi),就是說必須在lock.lock()和lock.unlock()之間才可以使用
Condition中的await()對(duì)應(yīng)Object的wait() Condition中的signal()對(duì)應(yīng)Object的notify() Condition中的signalAll()對(duì)應(yīng)Object的notifyAll() void await() throws InterruptedException:造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài) 與此Condition相關(guān)的鎖以原子方式釋放,并且出于線程調(diào)度的目的,將禁用當(dāng)前線程,且在發(fā)生以下四種情況之一之前,當(dāng)前線程一直處于休眠狀態(tài):
其他某個(gè)線程調(diào)用此Condition的signal()方法,并且碰巧將當(dāng)前線程選為被喚醒著的線程其他某個(gè)線程調(diào)用此Condition的signalAll()方法其他某個(gè)線程中斷當(dāng)前線程,且支持中斷線程的掛起發(fā)生虛假喚醒 在所有情況下,在此方法可以返回當(dāng)前線程之前,都必須重新獲取與此條件有關(guān)的鎖。在線程返回時(shí),可以保證它保持此鎖。 void signal():?jiǎn)拘岩粋€(gè)等待線程。 如果所有的線程都在等待此條件,則選擇其中的一個(gè)喚醒。在從await返回之前,該線程必須重新獲得鎖 void signalAll():?jiǎn)拘阉械却€程。 如果所有的線程都在等待此條件,則喚醒所有線程。在從await返回之前,每個(gè)線程都必須重新獲取鎖
3. 線程池ThreadPoolExecutor
3.1. 線程池ThreadPoolExecutor引入
什么是線程池
創(chuàng)建和銷毀對(duì)象是非常耗費(fèi)時(shí)間的創(chuàng)建對(duì)象:需要分配內(nèi)存等資源銷毀對(duì)象:雖然不需要程序員操作,但是垃圾回收器會(huì)一直在后臺(tái)跟蹤并銷毀對(duì)于經(jīng)常創(chuàng)建和銷毀、使用量特別大的資源,比如并發(fā)情況下的線程,對(duì)性能影響很大思路:創(chuàng)建好多個(gè)線程,放入線程池中,使用時(shí)直接獲取引用,不使用時(shí)放回池中??梢员苊忸l繁創(chuàng)建銷毀、實(shí)現(xiàn)重復(fù)利用生活案例:共享單車技術(shù)案例:線程池、數(shù)據(jù)庫(kù)連接池JDK1.5起,提供了內(nèi)置線程池 線程池的好處
提高響應(yīng)速度(減少了創(chuàng)建新線程的時(shí)間)降低資源消耗(重復(fù)利用線程池中線程,不需要每次都創(chuàng)建)提高線程的管理性:避免線程無(wú)限制創(chuàng)建、從而消耗系統(tǒng)資源,降低系統(tǒng)穩(wěn)定性,甚至內(nèi)存溢出或CPU耗盡 線程池的應(yīng)用場(chǎng)合
需要大量線程,并且完成任務(wù)的時(shí)間短對(duì)性能要求苛刻接受突發(fā)性的大量請(qǐng)求
3.2. 使用線程池執(zhí)行大量的Runnable命令
public class TestThreadPool1 {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)線程池
// 池中只有1個(gè)線程,保證線程一直存在
// ExecutorService pool = Executors.newSingleThreadExecutor();
// 池中有固定數(shù)量的線程
// ExecutorService pool = Executors.newFixedThreadPool(10);
// 池中線程的數(shù)量可以動(dòng)態(tài)變化
ExecutorService pool = Executors.newCachedThreadPool();
// 使用線程池執(zhí)行大量的Runnable命令
for (int i = 0; i < 20; i++) {
final int n = i;
// 指定Runnable命令
Runnable command = new Runnable() {
@Override
public void run() {
System.out.println("開始執(zhí)行" + n);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行結(jié)束" + n);
}
};
// 不需要new Thread(command),使用線程池命令
pool.shutdown();
}
}
}
3.3. 使用線程池執(zhí)行大量的Callable任務(wù)
public class TestThreadPool2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(10);
// 使用線程池
List
for (int i = 0; i < 20000; i++) {
// 指定一個(gè)Callable任務(wù)
Callable
// 將任務(wù)交給線程池
Future
// 將Future加入到List
list.add(future);
}
for (Future f : list) {
System.out.println(f.get());
}
// 關(guān)閉線程池
pool.shutdown();
}
}
class MyCallable implements Callable
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return new Random().nextInt(10);
}
}
3.4. 線程池API總結(jié)
Executor:線程池頂級(jí)接口,只有一個(gè)方法ExecutorService:真正的線程池接口
void execute(Runnable command):執(zhí)行任務(wù)/命令,沒有返回值,一般用來(lái)執(zhí)行Runnable Future submit(Callable task):執(zhí)行任務(wù),有返回值,一般用來(lái)執(zhí)行Callablevoid shutdown():關(guān)閉線程池 AbstractExecutorService:基本實(shí)現(xiàn)了ExecutorService的所有方法ThreadPoolExecutor:默認(rèn)的線程池實(shí)現(xiàn)類 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
corePoolSize:核心池大小
默認(rèn)創(chuàng)建線程后,線程數(shù)為0,有任務(wù)進(jìn)來(lái)后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)但是當(dāng)線程池中線程數(shù)量達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到隊(duì)列中等待 maximumPoolSize:最大線程數(shù)
corePoolSize和maximumPoolSize之間的線程數(shù)會(huì)自動(dòng)釋放,小于等于corePoolSize的不會(huì)釋放。大于時(shí)就會(huì)將任務(wù)由一個(gè)丟棄處理機(jī)制來(lái)處理 keepAliveTime:線程沒有任務(wù)時(shí),最多保持多長(zhǎng)時(shí)間后會(huì)終止
默認(rèn)只限于corePoolSize和maximumPoolSize間的線程TimeUnit: keepAliveTime的時(shí)間單位 BlockingQueue:存儲(chǔ)等待執(zhí)行的任務(wù)的阻塞隊(duì)列,有多種選擇,可以是順序隊(duì)列、鏈?zhǔn)疥?duì)列等ThreadFactory:線程工廠,默認(rèn)是DefaultThreadFactory,Executors的靜態(tài)內(nèi)部類RejectExecutionHandler:
拒絕處理任務(wù)時(shí)的策略。如果線程池的線程已經(jīng)飽和,并且任務(wù)隊(duì)列也已滿,對(duì)新的任務(wù)應(yīng)該采取的措施比如拋出異常、直接舍棄、丟棄隊(duì)列中最舊任務(wù)等,默認(rèn)是直接拋出異常
CallerRunsPolicy:如果發(fā)現(xiàn)線程池還在運(yùn)行,就直接運(yùn)行這個(gè)線程DiscardOldestPolicy:在線程池的等待隊(duì)列中,將頭取出一個(gè)拋棄,然后將當(dāng)前線程放進(jìn)去DiscardPolicy:什么也不做AbortPolicy:java默認(rèn),拋出一個(gè)異常 ScheduledThreadPoolExecutor:實(shí)現(xiàn)周期性任務(wù)調(diào)度的線程池Executors:工具類、線程池的工廠類,用于創(chuàng)建并返回不同類型的線程池
Executors.newCachedThreadPool():創(chuàng)建一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程池的線程池Executors.newFixedThreadPool(n):創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池Executors.newSingleThreadExecutor():創(chuàng)建一個(gè)只有一個(gè)線程的線程池Executors.newScheduledThreadPool(n):創(chuàng)建一個(gè)線程池,它可安排在給定延遲后運(yùn)行命令或定期地執(zhí)行
4. ForkJoin框架
4.1. ForkJoin框架
4.1.1. 應(yīng)用場(chǎng)景
目前按任務(wù)并發(fā)處理并不能完全充分的利用處理器資源,因?yàn)橐话愕膽?yīng)用程序沒有那么多的并發(fā)處理任務(wù)?;谶@種現(xiàn)狀,考慮把一個(gè)任務(wù)拆分成多個(gè)單元,每個(gè)單元分別得到執(zhí)行,最后合并每個(gè)單元的結(jié)果。 Fork/Join框架是Java7提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干小任務(wù),最后匯總每個(gè)小任務(wù)結(jié)果得到大任務(wù)結(jié)果的框架。
4.1.2. 工作竊取法
一個(gè)大任務(wù)拆分成多個(gè)小任務(wù),為了減少線程間的競(jìng)爭(zhēng),把這些子任務(wù)分別放到不同的隊(duì)列中,并且每個(gè)隊(duì)列都有單獨(dú)的線程來(lái)執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)。 如果A線程處理完成自己隊(duì)列的任務(wù),B線程的隊(duì)列里還有很多任務(wù)要處理,那么A就會(huì)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行,而B線程永遠(yuǎn)是從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行。
優(yōu)點(diǎn):利用了線程進(jìn)行并行計(jì)算,減少了線程間的競(jìng)爭(zhēng)缺點(diǎn)
如果雙端隊(duì)列中只有一個(gè)任務(wù)時(shí),線程間會(huì)存在相互競(jìng)爭(zhēng)竊取算法消耗了更多的系統(tǒng)資源,如會(huì)創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列
4.1.3. 主要類
ForkJoinTask:使用該框架,需要?jiǎng)?chuàng)建一個(gè)ForkJoin任務(wù),它提供在任務(wù)中執(zhí)行fork和join操作的機(jī)制。一般情況下,并不需要直接繼承ForkJoinTask類,只需要繼承他的子類
RecursiveActive:用于返回沒有結(jié)果的任務(wù)RecursiveTask:用于有返回結(jié)果的任務(wù) ForkJoinPool:任務(wù)ForkJoinTask需要通過ForkJoinPool來(lái)執(zhí)行ForkJoinWorkerThread:ForkJoinPool線程池中的一個(gè)執(zhí)行任務(wù)的線程
4.2. ForkJoin框架示例
使用ForkJoin計(jì)算1+2+3+…+n = ?
public class SumTask extends RecursiveTask
private int start;
private int end;
// 當(dāng)任務(wù)處理的數(shù)字范圍在step以內(nèi)時(shí),將不再進(jìn)一步拆分,而是直接進(jìn)行累加計(jì)算。
private final int step = 2000000;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if(end - start <= step) {
// 小于5個(gè)數(shù),直接求和
for (int i = start; i <= end; i++) {
sum += i;
}
}else {
// 大于5個(gè)數(shù),分解任務(wù)
int mid = (end + start) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);
// 執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
// 子任務(wù)執(zhí)行完,得到結(jié)果
long leftSum = leftTask.join();
long rightSum = rightTask.join();
sum = leftSum + rightSum;
}
return sum;
}
public static void main(String[] args) {
// 如果是多核CPU,其實(shí)是一個(gè)一直使用,其他閑置,使用多線程解決,但是涉及到任務(wù)的拆分與合并等細(xì)節(jié);現(xiàn)在使用ForkJoin框架,可以較輕松解決
long start = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < 1000000000; i++) {
sum += i;
}
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("for:" + (end - start));
// 使用ForkJoin框架解決
// 創(chuàng)建一個(gè)線程池
ForkJoinPool pool = new ForkJoinPool();
// 定義一個(gè)任務(wù)
SumTask sumTask = new SumTask(1, 1000000000);
// 將任務(wù)交給線程池
start = System.currentTimeMillis();
ForkJoinTask
// 得到結(jié)果輸出
try {
Long result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
end = System.currentTimeMillis();
System.out.println("pool:" + (end - start));
}
}
柚子快報(bào)激活碼778899分享:緩存 Java多線程2
文章來(lái)源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。