柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)
柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)
梳理一下 rust 異步編程中的幾個概念,現(xiàn)在都看得迷迷糊糊的
文章目錄
Futureasync、awaitPinContext例子
Future
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll
}
pub enum Poll
Ready(T),
Pending,
}
std::future::Future特征用于表示異步操作的執(zhí)行狀態(tài),它只有一個方法poll,調(diào)用后會返回枚舉Poll,Poll::Ready(T)代表操作已完成,并包含執(zhí)行結(jié)果,Poll::Pending代表操作未完成
async、await
async可以放在函數(shù)和代碼塊前面,調(diào)用后會返回std::future::Future對象,下面是幾個常見的前置場景
/// async的前置類型
use tokio;
#[tokio::test]
async fn test_async() {
// 普通函數(shù)
async fn test_1() {
println!("run test_1");
}
test_1().await;
// 代碼塊
let test_2 = async {
println!("run test_2");
};
test_2.await;
// 閉包函數(shù)
let test_3 = || async { println!("run test_2") };
let test_4 = |x: i32| async move { println!("run test_2: {x}") };
test_3().await;
test_4(10).await;
// 特征方法
trait AddAsyncFn {
async fn async_call(&self);
}
impl AddAsyncFn for String{
async fn async_call(&self){
println!("run async_call: {self}");
}
}
"content".to_owned().async_call().await;
}
await用于Future的掛起,但并不會阻塞所在線程,而是交出執(zhí)行權(quán)給執(zhí)行器
Pin
poll方法的第一個參數(shù)是Pin,這個我也不是很理解,具體可以參考《Programming Rust 2nd》第20章的Pinning小結(jié),如果sub_future引用top_future中的變量,也就是引用了top_future,假設(shè)此時top_future被移動給其他變量,sub_future的引用變成了未初始化,從而失效,所以在調(diào)用poll時需要使用Pin固定future,避免被移動。
Context
poll方法的第二個參數(shù)是Context,里面有一個waker字段,包含wake()方法
pub struct Context<'a> {
waker: &'a Waker,
...
}
impl Waker {
pub fn wake(self) {
...
}
}
當(dāng)任務(wù)就緒后,會通過調(diào)用context.waker.wake()通知執(zhí)行器,執(zhí)行器負(fù)責(zé)異步操作的切換邏輯,喚醒后從上次掛起位置繼續(xù)執(zhí)行
例子
這個示例代碼參考了Under the Hood: Executing Futures and Tasks的2到4小節(jié),大部分代碼都是直接拿過來的,演示了如何自定義Future類型,如何構(gòu)建一個簡單的執(zhí)行器同時調(diào)用多個異步函數(shù)。原代碼使用了一些其他的庫,總是讓我覺得不知道發(fā)生了什么,所以都改成了調(diào)用標(biāo)準(zhǔn)庫的寫法,運行下面的代碼不需要其他依賴
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;
fn main(){
// 任務(wù):封裝傳入函數(shù)的結(jié)構(gòu)
struct Task {
future: Mutex
task_sender: SyncSender
}
// 任務(wù)喚醒器:用于喚醒對應(yīng)的一個任務(wù)
struct TaskWaker(Arc
// 實現(xiàn) Wake 特征
impl Wake for TaskWaker {
fn wake(self: Arc
Self::wake_by_ref(&self)
}
fn wake_by_ref(self: &Arc
// 復(fù)制包含的任務(wù)
let cloned = self.0.clone();
// 通過 channel 發(fā)送到讀端
self.0.task_sender.send(cloned).expect("too many tasks queued");
}
}
// 分發(fā)器
struct Spawner {
task_sender: SyncSender
}
impl Spawner {
fn spawn(&self, future: impl Future
// 將傳入的函數(shù),移動到堆上,并且轉(zhuǎn)為 Pin 類型,防止 poll 之后被移動出現(xiàn)問題
let future = Box::pin(future);
// 初始化任務(wù)
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
// 發(fā)送到讀端
self.task_sender.send(task).expect("too many tasks queued");
}
}
// 執(zhí)行器
struct Executor {
ready_queue: Receiver
}
impl Executor {
fn run(&self) {
// 不斷從讀端讀取數(shù)據(jù),沒有新數(shù)據(jù)時阻塞
while let Ok(task) = self.ready_queue.recv() {
// 嘗試加鎖讀取任務(wù)
let mut future_slot = task.future.lock().unwrap();
// 從鎖中取出任務(wù)
if let Some(mut future) = future_slot.take() {
// 復(fù)制任務(wù),并且生成一個 Waker
let waker = Waker::from(Arc::new(TaskWaker(task.clone())));
// 根據(jù) Waker 生成 Context
let context = &mut Context::from_waker(&waker);
// 調(diào)用 Future 的 poll 方法,檢查是否已完成,因為限制了函數(shù)不會有返回結(jié)果,所以這里不用處理
if future.as_mut().poll(context).is_pending() {
// 因為任務(wù)未完成,重新放回鎖里
*future_slot = Some(future);
}
}
}
}
}
// 共享狀態(tài)
struct SharedState {
completed: bool, // 是否完成的標(biāo)志
waker: Option
}
// 加鎖加Arc,因為要跨線程使用
struct TimerFuture {
shared_state: Arc
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
// 創(chuàng)建一個共享狀態(tài)
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 獲取一個副本
let thread_shared_state = shared_state.clone();
// 把共享狀態(tài)傳入一個單獨線程
thread::spawn(move || {
// 線程級別睡眠
thread::sleep(duration);
// 嘗試加鎖獲取共享狀態(tài)
let mut shared_state = thread_shared_state.lock().unwrap();
// 修改狀態(tài)為完成
shared_state.completed = true;
// 如果執(zhí)行完成時喚醒器已存在就調(diào)用
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
// 返回對象
TimerFuture { shared_state }
}
}
// 實現(xiàn) Future 特征
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll
// 嘗試加鎖獲取共享狀態(tài)
let mut shared_state = self.shared_state.lock().unwrap();
// 檢查執(zhí)行狀態(tài)是否已經(jīng)完成
if shared_state.completed {
Poll::Ready(())
} else {
// 未完成的話,傳入 waker,并返回 Pending
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
async fn test_1() {
TimerFuture::new(Duration::new(1, 0)).await;
println!("run test_1");
}
async fn test_2() {
TimerFuture::new(Duration::new(2, 0)).await;
println!("run test_2");
}
// 創(chuàng)建讀端和寫端
let (task_sender, ready_queue) = sync_channel(10);
// 創(chuàng)建分發(fā)器
let spawner = Spawner { task_sender };
// 創(chuàng)建執(zhí)行器
let executor = Executor { ready_queue };
// 分發(fā) test_1
spawner.spawn(test_1());
// 分發(fā) test_1
spawner.spawn(test_2());
// 釋放分發(fā)器,告訴讀端后續(xù)無數(shù)據(jù)可讀
drop(spawner);
// 開始運行
executor.run();
}
柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)
好文推薦
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。