欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)

柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)

http://yzkb.51969.com/

梳理一下 rust 異步編程中的幾個概念,現(xiàn)在都看得迷迷糊糊的

文章目錄

Futureasync、awaitPinContext例子

Future

pub trait Future {

type Output;

// Required method

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll;

}

pub enum Poll {

Ready(T),

Pending,

}

std::future::Future特征用于表示異步操作的執(zhí)行狀態(tài),它只有一個方法poll,調(diào)用后會返回枚舉Poll,Poll::Ready(T)代表操作已完成,并包含執(zhí)行結(jié)果,Poll::Pending代表操作未完成

async、await

async可以放在函數(shù)和代碼塊前面,調(diào)用后會返回std::future::Future對象,下面是幾個常見的前置場景

/// async的前置類型

use tokio;

#[tokio::test]

async fn test_async() {

// 普通函數(shù)

async fn test_1() {

println!("run test_1");

}

test_1().await;

// 代碼塊

let test_2 = async {

println!("run test_2");

};

test_2.await;

// 閉包函數(shù)

let test_3 = || async { println!("run test_2") };

let test_4 = |x: i32| async move { println!("run test_2: {x}") };

test_3().await;

test_4(10).await;

// 特征方法

trait AddAsyncFn {

async fn async_call(&self);

}

impl AddAsyncFn for String{

async fn async_call(&self){

println!("run async_call: {self}");

}

}

"content".to_owned().async_call().await;

}

await用于Future的掛起,但并不會阻塞所在線程,而是交出執(zhí)行權(quán)給執(zhí)行器

Pin

poll方法的第一個參數(shù)是Pin,這個我也不是很理解,具體可以參考《Programming Rust 2nd》第20章的Pinning小結(jié),如果sub_future引用top_future中的變量,也就是引用了top_future,假設(shè)此時top_future被移動給其他變量,sub_future的引用變成了未初始化,從而失效,所以在調(diào)用poll時需要使用Pin固定future,避免被移動。

Context

poll方法的第二個參數(shù)是Context,里面有一個waker字段,包含wake()方法

pub struct Context<'a> {

waker: &'a Waker,

...

}

impl Waker {

pub fn wake(self) {

...

}

}

當(dāng)任務(wù)就緒后,會通過調(diào)用context.waker.wake()通知執(zhí)行器,執(zhí)行器負(fù)責(zé)異步操作的切換邏輯,喚醒后從上次掛起位置繼續(xù)執(zhí)行

例子

這個示例代碼參考了Under the Hood: Executing Futures and Tasks的2到4小節(jié),大部分代碼都是直接拿過來的,演示了如何自定義Future類型,如何構(gòu)建一個簡單的執(zhí)行器同時調(diào)用多個異步函數(shù)。原代碼使用了一些其他的庫,總是讓我覺得不知道發(fā)生了什么,所以都改成了調(diào)用標(biāo)準(zhǔn)庫的寫法,運行下面的代碼不需要其他依賴

use std::future::Future;

use std::pin::Pin;

use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

use std::sync::{Arc, Mutex};

use std::task::{Context, Poll, Wake, Waker};

use std::thread;

use std::time::Duration;

fn main(){

// 任務(wù):封裝傳入函數(shù)的結(jié)構(gòu)

struct Task {

future: Mutex + Send + 'static>>>>, // 限制返回結(jié)果只能為空

task_sender: SyncSender>, // 實際是一個channel的寫端

}

// 任務(wù)喚醒器:用于喚醒對應(yīng)的一個任務(wù)

struct TaskWaker(Arc);

// 實現(xiàn) Wake 特征

impl Wake for TaskWaker {

fn wake(self: Arc) {

Self::wake_by_ref(&self)

}

fn wake_by_ref(self: &Arc) {

// 復(fù)制包含的任務(wù)

let cloned = self.0.clone();

// 通過 channel 發(fā)送到讀端

self.0.task_sender.send(cloned).expect("too many tasks queued");

}

}

// 分發(fā)器

struct Spawner {

task_sender: SyncSender>,

}

impl Spawner {

fn spawn(&self, future: impl Future + 'static + Send) {

// 將傳入的函數(shù),移動到堆上,并且轉(zhuǎn)為 Pin 類型,防止 poll 之后被移動出現(xiàn)問題

let future = Box::pin(future);

// 初始化任務(wù)

let task = Arc::new(Task {

future: Mutex::new(Some(future)),

task_sender: self.task_sender.clone(),

});

// 發(fā)送到讀端

self.task_sender.send(task).expect("too many tasks queued");

}

}

// 執(zhí)行器

struct Executor {

ready_queue: Receiver>, // 實際是一個channel的讀端

}

impl Executor {

fn run(&self) {

// 不斷從讀端讀取數(shù)據(jù),沒有新數(shù)據(jù)時阻塞

while let Ok(task) = self.ready_queue.recv() {

// 嘗試加鎖讀取任務(wù)

let mut future_slot = task.future.lock().unwrap();

// 從鎖中取出任務(wù)

if let Some(mut future) = future_slot.take() {

// 復(fù)制任務(wù),并且生成一個 Waker

let waker = Waker::from(Arc::new(TaskWaker(task.clone())));

// 根據(jù) Waker 生成 Context

let context = &mut Context::from_waker(&waker);

// 調(diào)用 Future 的 poll 方法,檢查是否已完成,因為限制了函數(shù)不會有返回結(jié)果,所以這里不用處理

if future.as_mut().poll(context).is_pending() {

// 因為任務(wù)未完成,重新放回鎖里

*future_slot = Some(future);

}

}

}

}

}

// 共享狀態(tài)

struct SharedState {

completed: bool, // 是否完成的標(biāo)志

waker: Option, // 喚醒器

}

// 加鎖加Arc,因為要跨線程使用

struct TimerFuture {

shared_state: Arc>,

}

impl TimerFuture {

pub fn new(duration: Duration) -> Self {

// 創(chuàng)建一個共享狀態(tài)

let shared_state = Arc::new(Mutex::new(SharedState {

completed: false,

waker: None,

}));

// 獲取一個副本

let thread_shared_state = shared_state.clone();

// 把共享狀態(tài)傳入一個單獨線程

thread::spawn(move || {

// 線程級別睡眠

thread::sleep(duration);

// 嘗試加鎖獲取共享狀態(tài)

let mut shared_state = thread_shared_state.lock().unwrap();

// 修改狀態(tài)為完成

shared_state.completed = true;

// 如果執(zhí)行完成時喚醒器已存在就調(diào)用

if let Some(waker) = shared_state.waker.take() {

waker.wake()

}

});

// 返回對象

TimerFuture { shared_state }

}

}

// 實現(xiàn) Future 特征

impl Future for TimerFuture {

type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {

// 嘗試加鎖獲取共享狀態(tài)

let mut shared_state = self.shared_state.lock().unwrap();

// 檢查執(zhí)行狀態(tài)是否已經(jīng)完成

if shared_state.completed {

Poll::Ready(())

} else {

// 未完成的話,傳入 waker,并返回 Pending

shared_state.waker = Some(cx.waker().clone());

Poll::Pending

}

}

}

async fn test_1() {

TimerFuture::new(Duration::new(1, 0)).await;

println!("run test_1");

}

async fn test_2() {

TimerFuture::new(Duration::new(2, 0)).await;

println!("run test_2");

}

// 創(chuàng)建讀端和寫端

let (task_sender, ready_queue) = sync_channel(10);

// 創(chuàng)建分發(fā)器

let spawner = Spawner { task_sender };

// 創(chuàng)建執(zhí)行器

let executor = Executor { ready_queue };

// 分發(fā) test_1

spawner.spawn(test_1());

// 分發(fā) test_1

spawner.spawn(test_2());

// 釋放分發(fā)器,告訴讀端后續(xù)無數(shù)據(jù)可讀

drop(spawner);

// 開始運行

executor.run();

}

柚子快報邀請碼778899分享:rust 異步編程概念小結(jié)

http://yzkb.51969.com/

好文推薦

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19091619.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄