柚子快報邀請碼778899分享:RxJava響應式編程
柚子快報邀請碼778899分享:RxJava響應式編程
文章目錄
響應式編程什么是響應式編程TheReactiveManifesto(響應式宣言)百度百科維基百科SpringFramework5ReactiveX
小結
RxJava什么是RxJava基本概念原理數(shù)據(jù)流上游、下游背壓觀察者模式RxJ觀察者模式圖解原理
使用場景如何使用依賴pom.xml使用步驟
操作符介紹創(chuàng)建類操作符轉換類操作符過濾類操作符組合類操作符功能操作符
線程調(diào)度相關同步、異步訂閱調(diào)度器背壓策略BackpressureStrategy.ERRORBackpressureStrategy.MISSINGBackpressureStrategy.BUFFERBackpressureStrategy.DROPBackpressureStrategy.LATEST
并行處理
小結
參考鏈接
響應式編程
什么是響應式編程
互聯(lián)網(wǎng)上對響應式編程的解釋其實是多種多樣的。下面從幾個權威的解釋來認識什么是響應式編程。
TheReactiveManifesto(響應式宣言)
Reactive Systems are: Responsive, Resilient, Elastic and Message Driven.
The Reactive Manifesto對響應式的歸納為:響應式系統(tǒng)應當是響應的、適應性強的、彈性的和消息驅(qū)動的。
百度百科
響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。
百度給出了例子來說明何為響應式編程:Excel的單元格可以包含類似"=B1+C1"的公式,或進行求和計算,而包含公式的單元格的值會依據(jù)其他單元格的值的變化而變化,這就是典型的響應式編程。
維基百科
Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.
維基百科作為全世界的權威知識庫,其給出的定義為:響應式編程是一種聲明式的編程范式,其核心要素是數(shù)據(jù)流與其傳播變化,前者是關于數(shù)據(jù)結構的描述,包括靜態(tài)的數(shù)組和動態(tài)的事件發(fā)射器。
SpringFramework5
The term “reactive” refers to programming models that are built around reacting to change?—?network component reacting to I/O events, UI controller reacting to mouse events, etc. In that sense non-blocking is reactive because instead of being blocked we are now in the mode of reacting to notifications as operations complete or data becomes available.
術語“reactive”指的是圍繞響應更改構建的編程模型——網(wǎng)絡組件響應I/O事件,UI控制器響應鼠標事件,等等。從這個意義上說,非阻塞是反應性的,因為當操作完成或數(shù)據(jù)可用時,我們現(xiàn)在處于響應通知的模式,而不是被阻塞。
ReactiveX
Reactive Extensions for Async Programming.
ReactiveX是實現(xiàn)異步編程的響應式擴展。ReactiveX的官網(wǎng)并沒有對Reactive作出明確的解釋,而是通過一系列的技術實現(xiàn)去詮釋什么是響應式編程。RxPY、RxJs、RxCpp、RxAndroid、RxGroovy,以及本文的核心編程庫RxJava,ReactiveX實現(xiàn)了主流編程語言的響應式編程API,其核心便是異步編程。
小結
響應式編程其實是個比較抽象的概念,以上解釋初看時并不會有任何直觀感受,需要真正走近使用才能有明顯的體會。
RxJava
什么是RxJava
歷史:ReactiveX(后文簡稱Rx)是Reactive Extensions的縮寫,由微軟的架構師Erik Meijer領導團隊開發(fā),于2012年11月開源,Rx庫支持 .NET 、JavaScript 、Java 、Android、C++ 等幾乎全部的流行編程語言,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區(qū)網(wǎng)站 reactivex.io,源碼托管于github.com/ReactiveX。其中RxJava(后文簡稱RxJ)是Java語言的實現(xiàn)。 概念:RxJava 是對Java語言的響應式編程實現(xiàn):一個使用可觀察序列組成異步和基于事件的程序的庫。它擴展了觀察者模式以支持數(shù)據(jù)/事件序列,并添加了允許您以聲明方式將序列組合在一起的運算符,同時抽象出對低級線程、同步、線程安全和并發(fā)數(shù)據(jù)結構等事物的關注。 總結:基于事件流、實現(xiàn)異步操作的庫。
基本概念原理
數(shù)據(jù)流
數(shù)據(jù)流是響應式編程的核心。數(shù)據(jù)流是一個按時間排序的即將發(fā)生的事件序列。拿點外賣來講,總會有這樣一些流程,選擇店鋪,進入店鋪,兌換優(yōu)惠券,選擇主食,加個飲料,下單,支付。這一系列事件組成的序列就是數(shù)據(jù)流。數(shù)據(jù)流可以被觀測(下單完成庫存減少),被過濾(錢不夠了無法下單),被操作,或者為新的消費者與另外一條流合并為一條新的流。
上游、下游
source.operator1().operator2().operator3().subscribe(consumer);
RxJ中的數(shù)據(jù)流由一個源、零個或多個中間步驟組成,后跟一個數(shù)據(jù)消費者;我們可以把operator2()左看向源的方向稱為上游,向右看向消費者被稱為下游。如果把RxJ的數(shù)據(jù)流向比作一條河流,那么就很容易理解上下游對應的意義了。
背壓
當數(shù)據(jù)流通過異步步驟時,每個步驟可能以不同的速度執(zhí)行不同的事情,這時如果上游的數(shù)據(jù)產(chǎn)生過快,下游處理數(shù)據(jù)過慢,未及時處理的數(shù)據(jù)造成積壓,這些數(shù)據(jù)被放到緩沖池中,如果長時間不處理這些緩沖數(shù)據(jù),最后一定會造成無法預料的結果。比如,在發(fā)洪水期間,下游沒辦法一下子消耗那么多水,大壩此時的作用就是攔截洪水,并根據(jù)下游的消耗情況酌情排放。
觀察者模式
觀察者模式是常用的軟件設計模式之一。觀察者模式定義對象間的一種一對多的依賴關系,當一個對象(被觀察者)的狀態(tài)發(fā)生改變時,所有依賴于它的對象(觀察者)都得到通知并被自動更新。以點外賣為例,下單(被觀察者)這個動作完成,外賣平臺可能會觸發(fā)以下動作,實時修改訂單狀態(tài)(觀察者1)、通知用戶下單完成(觀察者2)、給客戶送積分(觀察者3)、通知商家接單(觀察者4)…,這些動作之間并沒有強耦合,非常適合觀察者模式,即如果下單后需要觸發(fā)更多的操作,只需要增加觀察者即可,完美實現(xiàn)軟件設計的開閉原則。
RxJ觀察者模式
RxJ其實就是對觀察者模式的擴展。 被觀察者(Observable):通過訂閱行為subscribe()把事件按順序發(fā)送到觀察者(Observer)。 觀察者(Observer):按順序接收到事件&做出響應反饋。
圖解原理
RxJ的入門有一個經(jīng)典案例,即顧客點餐,下面以此例對RxJ產(chǎn)生一個初步的認識。 顧客點餐:顧客點餐,廚師按訂單依次準備(薯條、漢堡、雞塊、肉卷、奶茶、魚排),服務員依次按做好的菜品上菜(薯條、漢堡、雞塊、肉卷、奶茶、魚排),最后廚師告知服務菜齊了,服務員通知顧客菜已上齊。 其中RxJ中觀察者、被觀察者、訂閱、事件與上圖對應的關系可以表示為
點餐RxJava說明廚師被觀察者(Observable)產(chǎn)生數(shù)據(jù)流,產(chǎn)生事件——即做雞塊、做薯條…,菜做完告知服務員服務員訂閱(subscribe)連接被觀察者和觀察者——即將菜上給顧客,最后告知顧客做完了顧客觀察者(Observer)消費數(shù)據(jù)流,監(jiān)聽事件——即上一個菜吃一個,上完接著吃每份菜、做完菜的通知事件(Event)數(shù)據(jù)流——即雞塊、薯條…這些菜品,以及菜已做齊這件事
以上過程分別可以用RxJ3可以表示為
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.operators.observable.ObservableObserveOn;
import org.junit.Test;
/**
* 顧客點餐案例解釋RxJava
*
* @author yaoji
* @date 2022/7/20 0:52
*/
public class CustomerOrderObservableTest {
@Test
public void orderTest() {
/*
* Observable被觀察者,數(shù)據(jù)流/事件的生產(chǎn)者——RxJava中稱為上游-upstream
* 本例中被觀察者為廚師,負責準備薯條、漢堡、雞塊、肉卷、奶茶、魚排,并告知服務員菜都準備齊了
*/
Observable
@Override
public void subscribe(@NonNull ObservableEmitter
// emitter是發(fā)射器,可以理解為服務員
// RxJava通過onNext事件傳遞數(shù)據(jù)流
System.out.println("薯條準備完畢,服務員上菜...");
emitter.onNext("薯條");
System.out.println("漢堡準備完畢,服務員上菜...");
emitter.onNext("漢堡");
System.out.println("雞塊準備完畢,服務員上菜...");
emitter.onNext("雞塊");
System.out.println("肉卷準備完畢,服務員上菜...");
emitter.onNext("肉卷");
System.out.println("奶茶準備完畢,服務員上菜...");
emitter.onNext("奶茶");
System.out.println("魚排準備完畢,服務員上菜...");
emitter.onNext("魚排");
System.out.println("全部菜品準備完成");
// onComplete事件表示所有數(shù)據(jù)傳遞完畢 即全部菜準備齊全
emitter.onComplete();
}
});
/*
* Observer觀察者,數(shù)據(jù)流/事件的接收者——RxJava中稱為下游-downstream
* 本例中觀察者為客戶,負責接收服務員送到的菜,以及菜是否上齊等信息
*/
Observer
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("顧客點單:薯條、漢堡、雞塊、肉卷、奶茶、魚排");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("顧客接受上菜:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("上菜出錯了" + e);
}
@Override
public void onComplete() {
System.out.println("菜上齊了!??!");
}
};
// 通過subscribe建立聯(lián)系,subscribe()=訂閱,即服務員,連接廚師和顧客
cookerObservable.subscribe(customerObserver);
/*輸出結果:
顧客點單:薯條、漢堡、雞塊、肉卷、奶茶、魚排
薯條準備完畢,服務員上菜...
顧客接受上菜:薯條
漢堡準備完畢,服務員上菜...
顧客接受上菜:漢堡
雞塊準備完畢,服務員上菜...
顧客接受上菜:雞塊
肉卷準備完畢,服務員上菜...
顧客接受上菜:肉卷
奶茶準備完畢,服務員上菜...
顧客接受上菜:奶茶
魚排準備完畢,服務員上菜...
顧客接受上菜:魚排
全部菜品準備完成
菜上齊了!??!
*/
}
}
使用場景
所有異步操作均可使用RxJ。其實RxJ完全可以被自定義線程替代,但RxJ的鏈式調(diào)用和操作符能大量簡化代碼的復雜度,使得異步調(diào)用、線程切換變得十分簡單。
P.S RxJ在Android領域能發(fā)揮出更強大的功效(可試試RxAndroid和RxKotlin),因為大部分的UI開發(fā)都有異步數(shù)據(jù)流的處理。RxJ在Java服務端并沒有很熱門的反響,其根本原因是服務端絕大部分處理以同步為主,加之RxJ的入門成本是比較高的。目前使用RxJ完成的項目中,比較出名的類庫是Hystrix(處理分布式系統(tǒng)的延遲和容錯的庫)。
如何使用
早在2019年,ReactiveX就發(fā)布了RxJ3,RxJ主要包括RxJ1.x,RxJ2.x,RxJ3.x,需要注意的是,RxJ2.x是對RxJ1.x的增強,而RxJ3.x除了增強外,其包結構是做了不兼容調(diào)整的,升級時需要注意。本文的使用案例均基于RxJ3.x。 Github RxJava官網(wǎng)使用說明
依賴pom.xml
maven管理
使用步驟
創(chuàng)建被觀察者 Observable:產(chǎn)生數(shù)據(jù)流創(chuàng)建觀察者 Observer:消費數(shù)據(jù)訂閱 Subscribe:連接被觀察者和觀察者
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import org.junit.Test;
/**
* RxJava使用步驟
* @author yaoji
* @date 2022/7/24 21:06
*/
public class UseRxJavaTest {
/**
* 基本步驟
* 1. 創(chuàng)建被觀察者Observable:產(chǎn)生數(shù)據(jù)流
* 2. 創(chuàng)建觀察者Observer:消費數(shù)據(jù)
* 3. 訂閱Subscribe:連接被觀察者和觀察者
*/
@Test
public void baseUseTest() {
// 1.創(chuàng)建observable
@NonNull final Observable
@Override
public void subscribe(@NonNull ObservableEmitter
System.out.println("發(fā)送數(shù)據(jù):薯條");
emitter.onNext("薯條");
emitter.onComplete();
}
});
// 2.創(chuàng)建observer
final Observer
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("訂閱");
}
@Override
public void onNext(String o) {
System.out.println("接收數(shù)據(jù):" + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("數(shù)據(jù)接收完畢");
}
};
// 3.訂閱subscribe
observable.subscribe(observer);
}
/**
* Observable鏈式調(diào)用
* 可以通過鏈式調(diào)用一步完成上述編碼
*/
@Test
public void chainCallTest() {
Observable.create((ObservableOnSubscribe
System.out.println("發(fā)送數(shù)據(jù):薯條");
emitter.onNext("薯條");
emitter.onComplete();
})
// 訂閱時響應
.doOnSubscribe(d -> System.out.println("訂閱"))
// next事件響應
.doOnNext(o -> System.out.println("接收數(shù)據(jù):" + o))
// complete事件響應
.doOnComplete(() -> System.out.println("數(shù)據(jù)接收完畢"))
// 訂閱
.subscribe();
}
/**
* 使用支持背壓的{@link Flowable}替代{@link Observable}
* 注:{@link Flowable}的使用幾乎和{@link Observable}相差無幾
* 只是{@link Flowable}支持了背壓{@link BackpressureStrategy}
*/
@Test
public void flowableTest() {
Flowable.create((FlowableOnSubscribe
// 創(chuàng)建被觀察者
System.out.println("發(fā)送數(shù)據(jù):薯條");
emitter.onNext("薯條");
emitter.onComplete();
// 背壓的模式
}, BackpressureStrategy.BUFFER
)
// 訂閱時響應
.doOnSubscribe(d -> System.out.println("訂閱"))
// next事件響應
.doOnNext(o -> System.out.println("接收數(shù)據(jù):" + o))
// complete事件響應
.doOnComplete(() -> System.out.println("數(shù)據(jù)接收完畢"))
// 訂閱
.subscribe();
}
}
以上通過三種方式對RxJ的基本步驟作了編碼,后文所有例子將使用支持背壓、鏈式調(diào)用的Flowable進行編碼。
操作符介紹
RxJ的大部分運算符對 Observable 進行操作并返回一個 Observable,這使得RxJ可以進行鏈式調(diào)用,每次運算符都會修改前一個運算符的運算產(chǎn)生的 Observable。 下面將分類介紹RxJ的核心操作符——操作符比較多、且重載很多。
創(chuàng)建類操作符
產(chǎn)生新 Observable 的操作符
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* RxJava創(chuàng)建的操作符
* 作用:產(chǎn)生新 Observable 的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Creating-Observables.md}
*
* @author yaoji
* @date 2022/7/26 0:12
*/
@FixMethodOrder(MethodSorters.JVM)
public class CreateOperatorTest {
/**
* create操作符 最基本的創(chuàng)建 {@link io.reactivex.rxjava3.core.Observable} 的方式
*/
@Test
public void createTest() {
System.out.println("\ncreate()操作符測試結果:");
Flowable.create(emitter -> {
emitter.onNext("薯條");
emitter.onNext("雞翅");
}, BackpressureStrategy.BUFFER).subscribe(System.out::println).dispose();
// 輸出 薯條 雞翅
}
/**
* just操作符
*/
@Test
public void justTest() {
System.out.println("\njust()操作符測試結果:");
// 指定發(fā)射數(shù)據(jù)
Flowable.just("薯條", "雞翅").subscribe(System.out::println).dispose();
// 輸出 薯條 雞翅
}
/**
* fromArray()操作符 指定發(fā)射數(shù)據(jù)為數(shù)組
*/
@Test
public void fromArrayTest() {
System.out.println("\nfromArray()操作符測試結果:");
Flowable.fromArray(new String[]{"薯條", "雞翅"}).subscribe(System.out::println).dispose();
// 輸出 薯條 雞翅
}
/**
* fromIterable()操作符 指定發(fā)射數(shù)據(jù)為迭代器
*/
@Test
public void fromIterableTest() {
System.out.println("\nfromIterable()操作符測試結果:");
List
foods.add("薯條");
foods.add("雞翅");
Flowable.fromIterable(foods).subscribe(System.out::println).dispose();
// 輸出 薯條 雞翅
}
/**
* interval()操作符 按一定時間間隔發(fā)射數(shù)據(jù)(異步)
* 該操作為
* 異步
*/
@Test
public void intervalTest() throws InterruptedException {
System.out.println("\ninterval()操作符測試結果:");
// 每間隔1s發(fā)射一條數(shù)據(jù)(數(shù)據(jù)從0開始,每次遞增1)
Flowable.interval(1, TimeUnit.SECONDS).subscribe(data ->
System.out.println("timestamp:" + System.currentTimeMillis() + " data: " + data));
// 異步操作 需要阻塞線程
Thread.sleep(3 * 1000);
// 輸出
// timestamp:1658942322488 data: 0
// timestamp:1658942323482 data: 1
// timestamp:1658942324487 data: 2
}
/**
* timer()操作符 延遲一定時間后發(fā)射數(shù)據(jù)(0L)
* 該操作為異步
*/
@Test
public void timerTest() throws InterruptedException {
System.out.println("\ntimer()操作符測試結果:");
final long start = System.currentTimeMillis();
// 延遲1800ms發(fā)射數(shù)據(jù)(數(shù)據(jù)為0L)
Flowable.timer(1800, TimeUnit.MILLISECONDS).subscribe(data -> {
System.out.println("延遲: " + (System.currentTimeMillis() - start) + "ms");
System.out.println("接收數(shù)據(jù):" + data);
});
Thread.sleep(3 * 1000);
// 輸出 延遲: 1981ms 接收數(shù)據(jù):0
}
/**
* range()操作符 指定發(fā)射序列整數(shù)數(shù)據(jù)
*/
@Test
public void rangeTest() {
System.out.println("\nrange()操作符測試結果:");
// 發(fā)射數(shù)據(jù)從20開始,共10個
Flowable.range(20, 3).subscribe(System.out::println).dispose();
// 輸出 20 21 22
}
}
轉換類操作符
轉換 Observable 發(fā)出的事件(數(shù)據(jù))的操作符
import io.reactivex.=rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Function;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.reactivestreams.Publisher;
/**
* RxJava轉換的操作符
* 作用:轉換 Observable 發(fā)出的事件(數(shù)據(jù))的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Transforming-Observables.md}
*
* @author yaoji
* @date 2022/7/26 0:42
*/
@FixMethodOrder(MethodSorters.JVM)
public class TransformOperatorTest {
/**
* map()操作符 對發(fā)射的數(shù)據(jù)進行函數(shù)式轉換 產(chǎn)生新的 Observable
*/
@Test
public void mapTest() {
System.out.println("\nmap()操作符測試結果:");
Flowable.just("薯條", "雞翅")
// 每條數(shù)據(jù)單獨轉換 (各種食物調(diào)味)
.map(eachFood -> {
// 通過map對數(shù)據(jù)進行函數(shù)轉換(雞翅要特辣的)
return "雞翅".equals(eachFood) ? eachFood + "(特辣)" : eachFood + "(常規(guī))";
})
.subscribe(System.out::println).dispose();
// 輸出 薯條(常規(guī)) 雞翅(特辣)
}
/**
* flatMap()操作符 對每條數(shù)據(jù)都轉換為 Observable
* flatMap() 和 map() 的區(qū)別是:
* map() 只對數(shù)據(jù)轉換,最終每條數(shù)據(jù)對應的 Observable 只有數(shù)據(jù)變化
* flatMap() 對 Observable 改變,數(shù)據(jù)可能被去除或增加
*/
@Test
public void flatMapTest() {
System.out.println("\nflatMap()操作符測試結果:");
Flowable.just("薯條", "雞翅")
// 每條數(shù)據(jù)單獨轉換為 Observable (點了雞翅可以贈送飲料——觀察者接收的食物量改變)
.flatMap(
(Function
// 產(chǎn)生新的 Observable (雞翅送冰紅茶)
return "雞翅".equals(s) ? Flowable.just(s, "冰紅茶") : Flowable.just(s);
}
)
// 觀察者接收到的數(shù)據(jù)為"薯條", "雞翅", "冰紅茶"
.subscribe(System.out::println).dispose();
// 輸出 薯條 雞翅 冰紅茶
}
/**
* buffer()操作符 將發(fā)射的數(shù)據(jù)放入集合,每次發(fā)射集合替代發(fā)射單條數(shù)據(jù)
*/
@Test
public void bufferTest() {
System.out.println("\nbuffer()操作符測試結果:");
Flowable.just("奶茶", "鹵蛋", "漢堡", "炸雞", "雞湯", "牛排")
// 兩條數(shù)據(jù)分到一起發(fā)射 (每次上菜 左右手都端 節(jié)省時間)
.buffer(2)
.subscribe(System.out::println).dispose();
// 輸出 [奶茶, 鹵蛋] [漢堡, 炸雞] [雞湯, 牛排]
}
/**
* window()操作符 按 Observable 集合發(fā)射
* window()操作符和buffer()的區(qū)別在于:
* buffer()是將數(shù)據(jù)作為集合發(fā)射
* window()是將 Observable 集合一次發(fā)射
*/
@Test
public void windowTest() {
System.out.println("\nwindow()操作符測試結果:");
Flowable.range(1, 4)
.window(2)
.subscribe(c -> {
// 接收到的是 Observable
System.out.println(c);
// "子訂閱" 輸出
c.subscribe(System.out::println);
}
).dispose();
// 輸出
// io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowSubscribeIntercept@212bf671
// 1
// 2
// io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowSubscribeIntercept@16aa0a0a
// 3
// 4
}
/**
* groupBy()操作符 對 Observable 發(fā)射的數(shù)據(jù)按Key分組,每個 Observable 發(fā)射一組數(shù)據(jù)
*/
@Test
public void groupByTest() {
System.out.println("\ngroupBy()操作符測試結果:");
Flowable.just("雞頭", "雞你太美", "漢堡", "奶茶")
.groupBy((Function
if (food.contains("雞")) {
return "雞";
}
return "其它";
})
.subscribe(ob -> {
Object key = ob.getKey();
System.out.println("分組:" + key);
// "子訂閱"
ob.subscribe(System.out::println);
}).dispose();
// 輸出
// 分組:雞
// 雞頭
// 雞你太美
// 分組:其它
// 漢堡
// 奶茶
}
}
過濾類操作符
對 Observable 發(fā)射的數(shù)據(jù)進行過濾和選擇的操作符
import io.reactivex.rxjava3.core.Flowable;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.TimeUnit;
/**
* RxJava過濾的操作符
* 作用:對 Observable 發(fā)射的數(shù)據(jù)進行過濾和選擇的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Filtering-Observables.md}
* 注:這里的過濾和Java 8的{@link java.util.stream.Stream}有類似效果,都是將發(fā)射的數(shù)據(jù)按一定條件過濾
* 過濾操作符很多,大部分的實際用處可能不大,且使用比較簡單,作個簡單了解即可
*
* @author yaoji
* @date 2022/7/27 1:22
*/
@FixMethodOrder(MethodSorters.JVM)
public class FilterOperatorTest {
/**
* filter()操作符 對 Observable 發(fā)射的數(shù)據(jù)進行過濾,滿足條件的才被 Observer 接收
*/
@Test
public void filterTest() {
System.out.println("\nfilter()操作符測試結果:");
Flowable.range(1, 6)
// 過濾 剩下偶數(shù)
.filter(integer -> integer % 2 == 0)
.subscribe(System.out::println).dispose();
// 輸出 2 4 6
}
/**
* distinct()操作符 對 Observable 發(fā)射的數(shù)據(jù)去重
*/
@Test
public void distinctTest() {
System.out.println("\ndistinct()操作符測試結果:");
Flowable.just(1, 2, 3, 1, 2, 3, 4, 5)
// 去重 1 2 3重復 剩下一組數(shù)據(jù)
.distinct()
.subscribe(System.out::println).dispose();
// 輸出1 2 3 4 5
}
/**
* first()操作符 僅發(fā)射 Observable 的第一條數(shù)據(jù)
*/
@Test
public void firstTest() {
System.out.println("\nfirst()操作符測試結果:");
Flowable.range(1, 10)
// 取第一條數(shù)據(jù) 沒有發(fā)射數(shù)據(jù)則默認發(fā)射-1
.first(-1)
// 不想要默認發(fā)射數(shù)據(jù) 則使用firstElement替代
//.firstElement()
.subscribe(System.out::println).dispose();
// 輸出 1
}
/**
* last()操作符 僅發(fā)射 Observable 的最后一條數(shù)據(jù)
*/
@Test
public void lastTest() {
System.out.println("\nlast()操作符測試結果:");
Flowable.range(1, 10)
// 取最后一條數(shù)據(jù) 沒有發(fā)射數(shù)據(jù)則默認發(fā)射-1
.last(-1)
// 不想要默認發(fā)射數(shù)據(jù) 則使用lastElement替代
//.lastElement()
.subscribe(System.out::println).dispose();
// 輸出 10
}
/**
* ofType()操作符 按數(shù)據(jù)的類型過濾
*/
@Test
public void ofTypeTest() {
System.out.println("\nofType()操作符測試結果:");
Flowable.just(1L, "薯條", Math.PI, 0.618f)
// 按Double過濾 只有π會被發(fā)射出去
.ofType(Double.class)
.subscribe(System.out::println).dispose();
// 輸出 3.141592653589793
}
/**
* sample()操作符 在周期性的時間間隔內(nèi),發(fā)射最后該段間隔的最后一條數(shù)據(jù)
*/
@Test
public void sampleTest() {
System.out.println("\nsample()操作符測試結果:");
Flowable.range(0, 99999999)
// 共發(fā)射 99999999 個數(shù)據(jù),每200ms間隔內(nèi)的數(shù)據(jù)取最后一條發(fā)射
.sample(200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println).dispose();
// 輸出 47181190 94194001 輸出結果和運行時有關
}
/**
* skip()操作符 過濾掉前n項數(shù)據(jù)后發(fā)射
*/
@Test
public void skipTest() {
System.out.println("\nskip()操作符測試結果:");
Flowable.range(1, 10)
// 1-10 過濾掉前5個
.skip(5)
.subscribe(System.out::println).dispose();
// 輸出 6 7 8 9 10
}
/**
* take()操作符 只發(fā)射前n項數(shù)據(jù)
*/
@Test
public void takeTest() {
System.out.println("\ntake()操作符測試結果:");
Flowable.range(1, 10)
// 1-10 只發(fā)射前5個
.take(5)
.subscribe(System.out::println).dispose();
// 輸出 1 2 3 4 5
}
}
組合類操作符
組合多個 Observable 為一個 Observable 的操作符
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.BiFunction;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
/**
* RxJava組合操作符
* 作用:組合多個 Observable 為一個 Observable
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Combining-Observables.md}
*
* @author yaoji
* @date 2022/7/27 22:32
*/
@FixMethodOrder(MethodSorters.JVM)
public class CombineOperatorTest {
/**
* merge()操作符 組合多個 Observable 到一個中
*/
@Test
public void mergeTest() {
Flowable.just(1, 2)
// 合并第一個 Observable
.mergeWith(Flowable.just(3, 4))
// 合并第二個 Observable
.mergeWith(Flowable.just(5, 6))
.subscribe(System.out::println).dispose();
// 輸出 1, 2, 3, 4, 5, 6
}
/**
* startWith()操作符 Observable 發(fā)射數(shù)據(jù)前先發(fā)射指定的數(shù)據(jù)序列
*/
@Test
public void startWithTest() {
Flowable.just(1, 2)
.startWith(Flowable.just(3, 4))
.startWith(Flowable.just(5, 6))
.subscribe(System.out::println).dispose();
// 輸出 5, 6, 3, 4, 1, 2
}
/**
* count()操作符 統(tǒng)計 Observable 發(fā)射的數(shù)據(jù)數(shù)量,并將統(tǒng)計結果發(fā)射給觀察者
*/
@Test
public void countTest() {
// 發(fā)射從1開始的10個數(shù)
Flowable.range(1, 10)
.count()
.subscribe(System.out::println).dispose();
// 輸出 10
}
/**
* zip()操作符 通過函數(shù)將多個 Observable 發(fā)射的數(shù)據(jù)作函數(shù)處理,并將函數(shù)處理的結果發(fā)射給觀察者
*/
@Test
public void zipTest() {
Flowable.just(1.1, 2.2, 3.3, 4.4)
// 第一次zip 對4組發(fā)射數(shù)據(jù)排列作相乘處理,得 2.2, 8.8, 6.6(4.4的數(shù)據(jù)自動丟棄)
.zipWith(Flowable.just(2, 4, 2), (last, current) -> last * current)
// 第二次zip 對三組發(fā)射數(shù)據(jù)排列作相加處理,得 3.2, 10.8(6.6的數(shù)據(jù)自動丟棄)
.zipWith(Flowable.just(1, 2), (BiFunction
.subscribe(System.out::println).dispose();
// 輸出 3.2, 10.8
}
/**
* combineLatest()操作符 多個 Observable 進行合并,最后一個 Observable 發(fā)射的每一條數(shù)據(jù)與 前面所有的 Observable 發(fā)射的最后一條數(shù)據(jù)進行函數(shù)轉換,最后將結果發(fā)射給觀察者
*/
@Test
public void combineLatestTest() {
Flowable.combineLatest(
// 發(fā)射 1-10 該 Observable 只有最后一條數(shù)據(jù)10參與組合
Flowable.range(1, 10),
// 發(fā)射 1-5 該 Observable 的每一條數(shù)據(jù)參與組合
Flowable.range(1, 5),
(first, second) -> "first " + first + ", second " + second
).subscribe(System.out::println).dispose();
// 輸出
// first 10, second 1
// first 10, second 2
// first 10, second 3
// first 10, second 4
// first 10, second 5
Flowable.combineLatest(
// 發(fā)射 1-10 該 Observable 只有最后一條數(shù)據(jù)10參與組合
Flowable.range(1, 10),
// 發(fā)射 1-5 該 Observable 只有最后一條數(shù)據(jù)5參與組合
Flowable.range(1, 5),
// 發(fā)射 1-5 該 Observable 的每一條數(shù)據(jù)參與組合
Flowable.range(1, 5),
(first, second, third) -> "first " + first + ", second " + second + ", third" + third
).subscribe(System.out::println).dispose();
// 輸出
// first 10, second 5, third1
// first 10, second 5, third2
// first 10, second 5, third3
// first 10, second 5, third4
// first 10, second 5, third5
// 上述組合的 Observable 如果都換成 interval()-異步創(chuàng)建,則產(chǎn)生的效果會更能體現(xiàn)
}
}
功能操作符
實用性比較強的操作符
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.TimeUnit;
/**
* RxJava效能的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Observable-Utility-Operators.md}
*
* @author yaoji
* @date 2022/7/29 0:19
*/
@FixMethodOrder(MethodSorters.JVM)
public class UtilityOperatorTest {
/**
* subscribe()操作符 訂閱:連接被觀察者和觀察者
* subscribe()前為被觀察者,后為觀察者
*/
@Test
public void subscribeTest() {
Flowable.just(1, 2).subscribe(System.out::println).dispose();
// 輸出 1, 2
}
/**
* timestamp()操作符 將時間戳附加到 Observable 發(fā)出的每一項上
*/
@Test
public void timestampTest() throws InterruptedException {
Flowable.interval(100, TimeUnit.MILLISECONDS)
// 發(fā)射的數(shù)據(jù)會帶上時間戳信息
.timestamp()
.subscribe(System.out::println);
Thread.sleep(500);
// 輸出
/*
Timed[time=1659025449727, unit=MILLISECONDS, value=0]
Timed[time=1659025449822, unit=MILLISECONDS, value=1]
Timed[time=1659025449916, unit=MILLISECONDS, value=2]
Timed[time=1659025450027, unit=MILLISECONDS, value=3]
Timed[time=1659025450121, unit=MILLISECONDS, value=4]
*/
}
/**
* doOnEach()操作符 Observable 每次發(fā)射一次事件前都會調(diào)用(包括onNext()、onError()、onComplete())
*/
@Test
public void doOnEachTest() {
Flowable.just(1, 2)
.doOnEach(c -> {
if (c.isOnNext()) {
// Observable 發(fā)射 onNext()事件,共兩個數(shù)據(jù) 所以會調(diào)用兩次
System.out.println("Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):" + c.getValue());
}
if (c.isOnComplete()) {
// Observable 發(fā)射 onComplete()事件,數(shù)據(jù)發(fā)射完后 Observable 會默認調(diào)用onComplete(), 所以會執(zhí)行一次
System.out.println("Observable發(fā)射事件onComplete()");
}
})
.subscribe(onNext -> System.out.println("Observer接收數(shù)據(jù):" + onNext))
.dispose();
// 輸出
/*
Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):1
Observer接收數(shù)據(jù):1
Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):2
Observer接收數(shù)據(jù):2
Observable發(fā)射事件onComplete()
*/
}
/**
* doOnNext()操作符 在觀察者 Observer 執(zhí)行onNext()前執(zhí)行
* doAfterNext()操作符 在觀察者 Observer 執(zhí)行onNext()后執(zhí)行
*/
@Test
public void doOnNextTest() {
Flowable.just("onNext-1", "onNext-2")
.doOnNext(c -> System.out.print("onNext()前執(zhí)行\(zhòng)t"))
.doAfterNext(c -> System.out.println("\tonNext()后執(zhí)行"))
// 注:subscribe()的內(nèi)執(zhí)行的就是onNext(),或者說監(jiān)聽到被觀察者發(fā)射onNext()事件
.subscribe(System.out::print)
.dispose();
// 輸出
// onNext()前執(zhí)行 onNext-1 onNext()后執(zhí)行
// onNext()前執(zhí)行 onNext-2 onNext()后執(zhí)行
}
/**
* doOnComplete()操作符 在所有事件發(fā)射完成后執(zhí)行
*/
@Test
public void doOnCompleteTest() {
Flowable.just(1, 2)
// Observable 將所有數(shù)據(jù)發(fā)射完畢后執(zhí)行
.doOnComplete(() -> System.out.println("onComplete()后執(zhí)行"))
.subscribe(System.out::println)
.dispose();
}
/**
* doFinally()操作符 在 Observable 終止或釋放時執(zhí)行
*/
@Test
public void doFinallyTest() {
/*
* 每隔1秒發(fā)射一條數(shù)據(jù)
* 由于interval是異步執(zhí)行,所以 Observable 根本來不及發(fā)射出數(shù)據(jù),主線程就會推出,導致無法輸出信息
* 但是doFinally()可以保證在釋放時執(zhí)行
* */
Flowable.interval(1, TimeUnit.SECONDS)
// Observable 終止或釋放時執(zhí)行
.doFinally(() -> System.out.println("Observable釋放時執(zhí)行"))
// 這里觀察者是無法接收到數(shù)據(jù)的
.subscribe(onNext -> System.out.println("觀察者接收到數(shù)據(jù)"))
// 手動直接釋放,Observable來不及發(fā)射數(shù)據(jù)就會被釋放
.dispose();
// 輸出 Observable釋放時執(zhí)行
}
/**
* doOnError()操作符 在觀察者 Observer 執(zhí)行onError()前執(zhí)行
*/
@Test
public void doOnError() {
// Observable 發(fā)射onError()事件
Flowable.error(new RuntimeException("發(fā)射onError()事件"))
// 在 Observer 接收onError()前執(zhí)行
.doOnError(error -> System.out.println(error.getClass().getName() + error.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 java.lang.RuntimeException發(fā)射onError()事件
}
/**
* doOnTerminate()操作符 Observable 執(zhí)行終止時執(zhí)行(可能發(fā)射完成、可能發(fā)射異常)
*/
@Test
public void doOnTerminatingTest() {
Flowable.just(1, 2)
// 合并onNext()事件并發(fā)射
.mergeWith(Flowable.just(3))
// 合并onError()事件并發(fā)射——模擬發(fā)射異常的場景
.mergeWith(Flowable.error(new RuntimeException("error")))
// 合并onNext()事件并發(fā)射
.mergeWith(Flowable.just(4))
.doOnTerminate(() -> System.out.println("Observable終止時執(zhí)行"))
.subscribe(System.out::println)
.dispose();
// 輸出 從輸出結果看出 在發(fā)射異常后直接執(zhí)行了doOnTerminate()
/*
1
2
3
Observable終止時執(zhí)行
*/
}
/**
* observeOn()操作符 指定觀察者接收事件的調(diào)度器/線程
*/
@Test
public void observeOnTest() {
Flowable.just(1, 2)
// 指定觀察者接收事件的線程
.observeOn(Schedulers.newThread())
.subscribe(onNext -> System.out.println("observe線程:" + Thread.currentThread().getName() + ", 接收數(shù)據(jù):" + onNext));
// 輸出
// observe線程:RxNewThreadScheduler-1, 接收數(shù)據(jù):1
// observe線程:RxNewThreadScheduler-1, 接收數(shù)據(jù):2
}
/**
* subscribeOn()操作符 指定被觀察者發(fā)射事件的調(diào)度器/線程
*/
@Test
public void subscribeOnTest() throws InterruptedException {
Flowable.create((FlowableOnSubscribe
System.out.println("observable線程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
}, BackpressureStrategy.BUFFER)
// 指定觀察者接收事件的線程
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
Thread.sleep(100);
// 輸出 observable線程:RxNewThreadScheduler-1
}
/**
* retry()操作符 出現(xiàn)錯誤時被觀察者重新發(fā)射數(shù)據(jù)
*/
@Test
public void retryTest() {
Flowable.just(1, 2)
// 手動發(fā)射error模擬異常
.mergeWith(Flowable.error(new RuntimeException()))
.mergeWith(Flowable.just(3, 4))
.retry(1)
.subscribe(System.out::println)
.dispose();
// 輸出 1 2 1 2,經(jīng)過合并后,發(fā)射的事件應該是1,2,3,4,但是失敗后重試了一次
}
}
線程調(diào)度相關
同步、異步訂閱
剛接觸RxJ的使用,鑒于RxJ定義就是處理異步的庫,所以很容易讓人產(chǎn)生這樣一種錯覺:RxJ就是異步的。
其實并不是:RxJ的訂閱可以分為同步訂閱和異步訂閱。
同步訂閱是指被觀察者與觀察者在同一個線程運行,這里被觀察者發(fā)射的數(shù)據(jù)只有被觀察者接收處理后,才能繼續(xù)發(fā)射下一條數(shù)據(jù)。異步訂閱是指被觀察者與觀察者在不同線程運行,這里被觀察者發(fā)射的數(shù)據(jù)全部存到緩存區(qū),觀察者則只在緩存區(qū)拿數(shù)據(jù)。
異步訂閱需要使用RxJ的調(diào)度器,以實現(xiàn)異步、線程切換。
調(diào)度器
RxJ的 Observable 和 Observer 分別處理了發(fā)射事件和接收事件的職責,其中通過操作符subscribeOn()、observeOn()可分別定義各自處理程序的線程,但RxJ操作符不直接與Thread或ExecutorService一起使用,而是提供了幾個通過Schedulers類訪問的標準調(diào)度器。
Schedulers.io():創(chuàng)建一個可復用的工作線程,線程不夠新建。主要用于I/O類型的操作,如文件處理、網(wǎng)絡請求等耗時的操作。Schedulers.computation():創(chuàng)建固定數(shù)量(cpu核數(shù))的專用線程運行計算密集型工作,是大多數(shù)異步操作符(如interval(),timer())的默認調(diào)度器。因線程數(shù)有限,盡量不用I/O耗時操作,常用于快速計算類型的操作。Schedulers.single():以順序和FIFO的方式在單個線程上運行。Schedulers.newThread():每次新建一個線程運行。Schedulers.trampoline():將當前線程執(zhí)行加入主線程。主要用于多個 Observable 運行時,阻塞并按Observable 順序調(diào)用。
背壓策略
當上下游的流操作處于不同的線程(上文的調(diào)度器)——即異步訂閱時,如果上游發(fā)射數(shù)據(jù)的速度大于下游處理數(shù)據(jù)的速度,那么就會造成積壓,這些數(shù)據(jù)存放在一個異步緩存池(RxJ默認緩沖區(qū)大小為128)中,如果緩存池中的數(shù)據(jù)不能及時處理,最后就會造成內(nèi)存溢出。
處理上述問題的原理是:控制數(shù)據(jù)發(fā)射、數(shù)據(jù)接收的速度。
RxJ在 BackpressureStrategy 類中提供了多種背壓策略,用于處理上述問題
背壓策略處理方式BackpressureStrategy.ERROR拋出異常 MissingBackpressureException ,通過 onError() 事件發(fā)射異常事件BackpressureStrategy.MISSING給出友好提示,通過 onError 事件發(fā)射,和ERROR策略似乎無明顯區(qū)別BackpressureStrategy.BUFFER將緩存區(qū)大小設置成無限大(容易造成內(nèi)存溢出、系統(tǒng)崩潰)BackpressureStrategy.DROP超過緩沖區(qū)大小的數(shù)據(jù)全部丟棄BackpressureStrategy.LATEST超出緩存區(qū)數(shù)據(jù)只保留最新的
BackpressureStrategy.ERROR
策略:拋出異常 MissingBackpressureException ,通過 onError() 事件發(fā)射異常事件
@Test
public void errorTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 ERROR
}, BackpressureStrategy.ERROR)
// Observer 在io線程運行
.observeOn(Schedulers.io())
// Observable 在主線程運行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 可以看出 此時數(shù)據(jù)發(fā)射量大于緩沖時 Observable 會拋出異常并發(fā)射onError()事件
/*
1
2
...
發(fā)生錯誤:create: could not emit value due to lack of requests
*/
}
BackpressureStrategy.MISSING
策略:給出友好提示,通過 onError 事件發(fā)射,和ERROR策略似乎無明顯區(qū)別
/**
* BackpressureStrategy.MISSING
* 給出友好提示,通過 `onError` 事件發(fā)射,和ERROR策略似乎無明顯區(qū)別
*/
@Test
public void missingTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為錯誤
}, BackpressureStrategy.MISSING)
// Observer 在io線程運行
.observeOn(Schedulers.io())
// Observable 在主線程運行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出
/*
1
2
...
發(fā)生錯誤:Queue is full?!
*/
}
BackpressureStrategy.BUFFER
策略:將緩存區(qū)大小設置成無限大
@Test
public void bufferTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 BUFFER
}, BackpressureStrategy.BUFFER)
// Observer 在io線程運行
.observeOn(Schedulers.io())
// Observable 在主線程運行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 1-150 無異常 注:有興趣可以將發(fā)射的數(shù)據(jù)改成引用對象,一直發(fā)射觀察內(nèi)存情況,看看會不會內(nèi)存溢出
}
BackpressureStrategy.DROP
策略:超過緩存區(qū)大小的數(shù)據(jù)全部丟棄
@Test
public void dropTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 DROP
}, BackpressureStrategy.DROP)
// Observer 在io線程運行
.observeOn(Schedulers.io())
// Observable 在主線程運行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯誤:" + e.getMessage()))
//.onBackpressureDrop()
.subscribe(System.out::println);
// 輸出 1-128 丟棄了129-150
}
BackpressureStrategy.LATEST
策略:超出緩存區(qū)數(shù)據(jù)只保留最新的
@Test
public void latestTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 LATEST
}, BackpressureStrategy.LATEST)
// Observer 在io線程運行
.observeOn(Schedulers.io())
// Observable 在主線程運行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯誤:" + e.getMessage()))
.subscribe(System.out::println);
// 輸出 1-128, 150 注:丟棄超過緩沖池大小的數(shù)據(jù),但保留最新的一個
}
并行處理
@Test
public void schedulerParallelTest() throws InterruptedException {
Flowable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(c -> {
System.out.println(Thread.currentThread().getName() + "發(fā)射: " + c);
// 作平方計算后發(fā)射
return c * c;
})
// 指定被觀察者發(fā)射數(shù)據(jù)的線程 線程的數(shù)量為cpu核數(shù)
.subscribeOn(Schedulers.computation())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + "接收: " + next));
Thread.sleep(500);
/* 輸出如下
注:本測試計算機為8核cpu,被觀察發(fā)射數(shù)據(jù)的線程一直為同一個,并不是并行
RxComputationThreadPool-1發(fā)射: 1
RxComputationThreadPool-1接收: 1
RxComputationThreadPool-1發(fā)射: 2
RxComputationThreadPool-1接收: 4
RxComputationThreadPool-1發(fā)射: 3
RxComputationThreadPool-1接收: 9
RxComputationThreadPool-1發(fā)射: 4
RxComputationThreadPool-1接收: 16
RxComputationThreadPool-1發(fā)射: 5
RxComputationThreadPool-1接收: 25
RxComputationThreadPool-1發(fā)射: 6
RxComputationThreadPool-1接收: 36
RxComputationThreadPool-1發(fā)射: 7
RxComputationThreadPool-1接收: 49
RxComputationThreadPool-1發(fā)射: 8
RxComputationThreadPool-1接收: 64
RxComputationThreadPool-1發(fā)射: 9
RxComputationThreadPool-1接收: 81
RxComputationThreadPool-1發(fā)射: 10
RxComputationThreadPool-1接收: 100
*/
}
初學時很容易將上述編碼理解為:只要指定了調(diào)度器(甚至不用指定調(diào)度器,只要發(fā)射了多條數(shù)據(jù)),RxJ就會將每條數(shù)據(jù)交給不同的線程處理。從輸出可以很明顯的看出,被觀察者和觀察者都在 RxComputationThreadPool-1 這個線程運行,整個發(fā)射、接收數(shù)據(jù)過程確實是異步的,不過只是相對于主線程而言,其發(fā)射的一組數(shù)據(jù)流本質(zhì)還是串行執(zhí)行的。
@Test
public void flatMapParallel() throws InterruptedException {
// 發(fā)射1-10
Flowable.range(1, 10)
// flatMap合并單獨的數(shù)據(jù)流到“數(shù)據(jù)流集合”
.flatMap(v ->
// 每條數(shù)據(jù)都轉換為單獨的數(shù)據(jù)流
Flowable.just(v)
// 指定每條數(shù)據(jù)流的運行調(diào)度器
.subscribeOn(Schedulers.computation())
.map(c -> {
System.out.println(Thread.currentThread().getName() + "發(fā)射: " + c);
return c * c;
})
)
.blockingSubscribe(next -> System.out.println(Thread.currentThread().getName() + "接收: " + next));
Thread.sleep(500);
/*輸出如下:可以看到 發(fā)射數(shù)據(jù)的線程已經(jīng)是多線程并行了(8核cpu)
RxComputationThreadPool-1發(fā)射: 1
RxComputationThreadPool-2發(fā)射: 2
RxComputationThreadPool-3發(fā)射: 3
RxComputationThreadPool-5發(fā)射: 5
RxComputationThreadPool-6發(fā)射: 6
RxComputationThreadPool-4發(fā)射: 4
RxComputationThreadPool-2發(fā)射: 8
RxComputationThreadPool-1發(fā)射: 7
RxComputationThreadPool-4發(fā)射: 10
main接收: 1
main接收: 9
main接收: 25
main接收: 36
main接收: 4
main接收: 64
main接收: 49
main接收: 16
RxComputationThreadPool-3發(fā)射: 9
main接收: 100
main接收: 81
*/
}
RxJ中的并行性意味著運行獨立的流,并將它們的結果合并回單個流中。操作符 flatMap 將從1到10的每個數(shù)字映射到它自己的獨立的 Flowable 并合并流,每次發(fā)射的數(shù)據(jù)轉換為每次發(fā)射 Flowable,實現(xiàn)并行計算。
小結
RxJ通過各種操作符讓異步調(diào)用和線程切換變得比較簡潔,但是付出的代價一旦引入RxJ那么所有看這份代碼的人都要去學習,是否值得引入是值得商榷的。
參考鏈接
RxJava3.x Github官方文檔 Reactivex官方文檔 Reactive Programming 一種技術,各自表述 RxJava2最全面、最詳細的講解(一) RxJava2最全面、最詳細的講解(二) 圖解RxJava2(一) 圖解RxJava2(二) 圖解RxJava2(三) RxJava現(xiàn)學現(xiàn)用下(應用場景) RxJava操作符匯總 RxJava操作符(03-變換操作) RxJava3.x——背壓策略
柚子快報邀請碼778899分享:RxJava響應式編程
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。