99re热这里只有精品视频,7777色鬼xxxx欧美色妇,国产成人精品一区二三区在线观看,内射爽无广熟女亚洲,精品人妻av一区二区三区

Rust 使用消息傳遞在線程間傳送數(shù)據(jù)

2023-03-22 15:15 更新
ch16-02-message-passing.md
commit 24e275d624fe85af7b5b6316e78f8bfbbcac23e7

一個(gè)日益流行的確保安全并發(fā)的方式是 消息傳遞message passing),這里線程或 actor 通過發(fā)送包含數(shù)據(jù)的消息來相互溝通。這個(gè)思想來源于 Go 編程語言文檔中 的口號:“不要通過共享內(nèi)存來通訊;而是通過通訊來共享內(nèi)存?!保ā癉o not communicate by sharing memory; instead, share memory by communicating.”)

Rust 中一個(gè)實(shí)現(xiàn)消息傳遞并發(fā)的主要工具是 信道channel),Rust 標(biāo)準(zhǔn)庫提供了其實(shí)現(xiàn)的編程概念。你可以將其想象為一個(gè)水流的渠道,比如河流或小溪。如果你將諸如橡皮鴨或小船之類的東西放入其中,它們會順流而下到達(dá)下游。

編程中的信息渠道(信道)有兩部分組成,一個(gè)發(fā)送者(transmitter)和一個(gè)接收者(receiver)。發(fā)送者位于上游位置,在這里可以將橡皮鴨放入河中,接收者則位于下游,橡皮鴨最終會漂流至此。代碼中的一部分調(diào)用發(fā)送者的方法以及希望發(fā)送的數(shù)據(jù),另一部分則檢查接收端收到的消息。當(dāng)發(fā)送者或接收者任一被丟棄時(shí)可以認(rèn)為信道被 關(guān)閉closed)了。

這里,我們將開發(fā)一個(gè)程序,它會在一個(gè)線程生成值向信道發(fā)送,而在另一個(gè)線程會接收值并打印出來。這里會通過信道在線程間發(fā)送簡單值來演示這個(gè)功能。一旦你熟悉了這項(xiàng)技術(shù),就能使用信道來實(shí)現(xiàn)聊天系統(tǒng),或利用很多線程進(jìn)行分布式計(jì)算并將部分計(jì)算結(jié)果發(fā)送給一個(gè)線程進(jìn)行聚合。

首先,在示例 16-6 中,創(chuàng)建了一個(gè)信道但沒有做任何事。注意這還不能編譯,因?yàn)?Rust 不知道我們想要在信道中發(fā)送什么類型:

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6: 創(chuàng)建一個(gè)信道,并將其兩端賦值給 tx 和 rx

這里使用 mpsc::channel 函數(shù)創(chuàng)建一個(gè)新的信道;mpsc 是 多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者multiple producer, single consumer)的縮寫。簡而言之,Rust 標(biāo)準(zhǔn)庫實(shí)現(xiàn)信道的方式意味著一個(gè)信道可以有多個(gè)產(chǎn)生值的 發(fā)送sending)端,但只能有一個(gè)消費(fèi)這些值的 接收receiving)端。想象一下多條小河小溪最終匯聚成大河:所有通過這些小河發(fā)出的東西最后都會來到下游的大河。目前我們以單個(gè)生產(chǎn)者開始,但是當(dāng)示例可以工作后會增加多個(gè)生產(chǎn)者。

mpsc::channel 函數(shù)返回一個(gè)元組:第一個(gè)元素是發(fā)送端,而第二個(gè)元素是接收端。由于歷史原因,tx 和 rx 通常作為 發(fā)送者transmitter)和 接收者receiver)的縮寫,所以這就是我們將用來綁定這兩端變量的名字。這里使用了一個(gè) let 語句和模式來解構(gòu)了此元組;第十八章會討論 let 語句中的模式和解構(gòu)。如此使用 let 語句是一個(gè)方便提取 mpsc::channel 返回的元組中一部分的手段。

讓我們將發(fā)送端移動(dòng)到一個(gè)新建線程中并發(fā)送一個(gè)字符串,這樣新建線程就可以和主線程通訊了,如示例 16-7 所示。這類似于在河的上游扔下一只橡皮鴨或從一個(gè)線程向另一個(gè)線程發(fā)送聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7: 將 tx 移動(dòng)到一個(gè)新建的線程中并發(fā)送 “hi”

這里再次使用 thread::spawn 來創(chuàng)建一個(gè)新線程并使用 move 將 tx 移動(dòng)到閉包中這樣新建線程就擁有 tx 了。新建線程需要擁有信道的發(fā)送端以便能向信道發(fā)送消息。

信道的發(fā)送端有一個(gè) send 方法用來獲取需要放入信道的值。send 方法返回一個(gè) Result<T, E> 類型,所以如果接收端已經(jīng)被丟棄了,將沒有發(fā)送值的目標(biāo),所以發(fā)送操作會返回錯(cuò)誤。在這個(gè)例子中,出錯(cuò)的時(shí)候調(diào)用 unwrap 產(chǎn)生 panic。不過對于一個(gè)真實(shí)程序,需要合理地處理它:回到第九章復(fù)習(xí)正確處理錯(cuò)誤的策略。

在示例 16-8 中,我們在主線程中從信道的接收端獲取值。這類似于在河的下游撈起橡皮鴨或接收聊天信息:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-8: 在主線程中接收并打印內(nèi)容 “hi”

信道的接收端有兩個(gè)有用的方法:recv 和 try_recv。這里,我們使用了 recv,它是 receive 的縮寫。這個(gè)方法會阻塞主線程執(zhí)行直到從信道中接收一個(gè)值。一旦發(fā)送了一個(gè)值,recv 會在一個(gè) Result<T, E> 中返回它。當(dāng)信道發(fā)送端關(guān)閉,recv 會返回一個(gè)錯(cuò)誤表明不會再有新的值到來了。

try_recv 不會阻塞,相反它立刻返回一個(gè) Result<T, E>Ok 值包含可用的信息,而 Err 值代表此時(shí)沒有任何消息。如果線程在等待消息過程中還有其他工作時(shí)使用 try_recv 很有用:可以編寫一個(gè)循環(huán)來頻繁調(diào)用 try_recv,在有可用消息時(shí)進(jìn)行處理,其余時(shí)候則處理一會其他工作直到再次檢查。

出于簡單的考慮,這個(gè)例子使用了 recv;主線程中除了等待消息之外沒有任何其他工作,所以阻塞主線程是合適的。

如果運(yùn)行示例 16-8 中的代碼,我們將會看到主線程打印出這個(gè)值:

Got: hi

完美!

信道與所有權(quán)轉(zhuǎn)移

所有權(quán)規(guī)則在消息傳遞中扮演了重要角色,其有助于我們編寫安全的并發(fā)代碼。防止并發(fā)編程中的錯(cuò)誤是在 Rust 程序中考慮所有權(quán)的一大優(yōu)勢?,F(xiàn)在讓我們做一個(gè)試驗(yàn)來看看信道與所有權(quán)如何一同協(xié)作以避免產(chǎn)生問題:我們將嘗試在新建線程中的信道中發(fā)送完 val 值 之后 再使用它。嘗試編譯示例 16-9 中的代碼并看看為何這是不允許的:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

示例 16-9: 在我們已經(jīng)發(fā)送到信道中后,嘗試使用 val 引用

這里嘗試在通過 tx.send 發(fā)送 val 到信道中之后將其打印出來。允許這么做是一個(gè)壞主意:一旦將值發(fā)送到另一個(gè)線程后,那個(gè)線程可能會在我們再次使用它之前就將其修改或者丟棄。其他線程對值可能的修改會由于不一致或不存在的數(shù)據(jù)而導(dǎo)致錯(cuò)誤或意外的結(jié)果。然而,嘗試編譯示例 16-9 的代碼時(shí),Rust 會給出一個(gè)錯(cuò)誤:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

我們的并發(fā)錯(cuò)誤會造成一個(gè)編譯時(shí)錯(cuò)誤。send 函數(shù)獲取其參數(shù)的所有權(quán)并移動(dòng)這個(gè)值歸接收者所有。這可以防止在發(fā)送后再次意外地使用這個(gè)值;所有權(quán)系統(tǒng)檢查一切是否合乎規(guī)則。

發(fā)送多個(gè)值并觀察接收者的等待

示例 16-8 中的代碼可以編譯和運(yùn)行,不過它并沒有明確的告訴我們兩個(gè)獨(dú)立的線程通過信道相互通訊。示例 16-10 則有一些改進(jìn)會證明示例 16-8 中的代碼是并發(fā)執(zhí)行的:新建線程現(xiàn)在會發(fā)送多個(gè)消息并在每個(gè)消息之間暫停一秒鐘。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

示例 16-10: 發(fā)送多個(gè)消息,并在每次發(fā)送后暫停一段時(shí)間

這一次,在新建線程中有一個(gè)字符串 vector 希望發(fā)送到主線程。我們遍歷他們,單獨(dú)的發(fā)送每一個(gè)字符串并通過一個(gè) Duration 值調(diào)用 thread::sleep 函數(shù)來暫停一秒。

在主線程中,不再顯式調(diào)用 recv 函數(shù):而是將 rx 當(dāng)作一個(gè)迭代器。對于每一個(gè)接收到的值,我們將其打印出來。當(dāng)信道被關(guān)閉時(shí),迭代器也將結(jié)束。

當(dāng)運(yùn)行示例 16-10 中的代碼時(shí),將看到如下輸出,每一行都會暫停一秒:

Got: hi
Got: from
Got: the
Got: thread

因?yàn)橹骶€程中的 for 循環(huán)里并沒有任何暫?;虻却拇a,所以可以說主線程是在等待從新建線程中接收值。

通過克隆發(fā)送者來創(chuàng)建多個(gè)生產(chǎn)者

之前我們提到了mpsc是 multiple producer, single consumer 的縮寫??梢赃\(yùn)用 mpsc 來擴(kuò)展示例 16-10 中的代碼來創(chuàng)建向同一接收者發(fā)送值的多個(gè)線程。這可以通過克隆信道的發(fā)送端來做到,如示例 16-11 所示:

文件名: src/main.rs

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--

示例 16-11: 從多個(gè)生產(chǎn)者發(fā)送多個(gè)消息

這一次,在創(chuàng)建新線程之前,我們對信道的發(fā)送端調(diào)用了 clone 方法。這會給我們一個(gè)可以傳遞給第一個(gè)新建線程的發(fā)送端句柄。我們會將原始的信道發(fā)送端傳遞給第二個(gè)新建線程。這樣就會有兩個(gè)線程,每個(gè)線程將向信道的接收端發(fā)送不同的消息。

如果運(yùn)行這些代碼,你 可能 會看到這樣的輸出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

雖然你可能會看到這些值以不同的順序出現(xiàn);這依賴于你的系統(tǒng)。這也就是并發(fā)既有趣又困難的原因。如果通過 thread::sleep 做實(shí)驗(yàn),在不同的線程中提供不同的值,就會發(fā)現(xiàn)他們的運(yùn)行更加不確定,且每次都會產(chǎn)生不同的輸出。

現(xiàn)在我們見識過了信道如何工作,再看看另一種不同的并發(fā)方式吧。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號