Skip to main content

并发编程

Rust 致力于实现并发编程的安全和高效.

并发编程实际是两个概念组成:

  • 并发: 程序的不同部分独立运行(concurrent)
  • 并行: 程序的不同部分同时运行(parallel)

由于目前多核 CPU 的广泛应用, 并行处理越来越重要.(下面说的并发实际是并发和并行的统称)

Rust 通过独特的 Ownership 和 Borrow check 以及类型系统来保证并发编程的安全和高效. 且错误都是在编译时就暴露出来了, 因此在 Rust 中有 fearless concurrency 的说法.

不同的语言提供来并行编程的相关工具, 比如 Erlang 的消息传递, Go 的 channel 等. Rust 也提供来一系列的工具用于并行编程:

  • 创建平台线程
  • 消息传递 channel
  • 内存共享
  • Sync 和 Send trait.

使用线程同时运行代码

提升程序性能的一个最直接的方式是: 通过线程将程序中的计算分解并同时进行. 但这个方式会增加程序的复杂性, 同时也会引入一些问题:

  • 竞争条件: 不同线程同时访问同一资源时顺序不一致造成的结果不一致.
  • 死锁: 两个或多个线程都在等待对方(形成环形依赖关系), 从而导致这些线程均不能继续运行.
  • 出现一些无法重现或很难重现的 bug, 因为这些 bug 是在特定多线程环境下才会出现的.

不同的编程语言的线程实现方式也各不相同, 不过绝大部分操作系统都提供有系统 API 用于创建线程. Rust 标准库使用的是 1:1 的线程实现模型, 即 Rust 中创建一个线程就等同于使用系统 API 创建一个线程. 当然也有三方的 Rust crate 提供其它的一些线程模型, 比如 1:N 的.

创建和使用线程

可以使用 thread::spawn 函数来创建线程, 并提供一个表示需要执行任务的闭包:

let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

可以通过 join 来等待所有线程完成:

for i in 1..10 {
let handle = thread::spawn(|| {
println!("num is {}", i);
});
handle.join().unwrap();
}

使用 move 将上下文值所有权传入线程闭包中:

use std::thread;

fn main() {
let v = vec![1, 2, 3];

// 如果不使用 move, 则 v 的引用可能在 main 函数执行的主线程中被释放掉.
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});

handle.join().unwrap();
}

多个线程间的消息传递

还是 Golang 的那句老话: 不要通过共享内存进行沟通, 而是通过沟通来共享内存.

为实现多线程间消息传递, Rust 标准库提供了 channel 工具. 可以把 Rust 中的 channel 理解为有方向的管道, 管道一端是发送者, 另外一端是接收者.

如果 channel 的发送或接收端有任意一方被 drop 了, 则 channel 就被关闭(close).

下面是简单的用法:

use std::sync::mpsc;
use std::thread;

fn main() {
// 创建 多发单收 channel
let (tx, rx) = mpsc::channel();

thread::spawn(move || {// 将发送端放到新建的线程中
let val = String::from("hi");
tx.send(val).unwrap();
});

// 接收消息
// 1. 接收消息会阻塞当前线程, 这里是主线程.(如果需要非阻塞的, 则使用 try_recv)
// 2. 当发送端关闭时, recv 会返回错误.
let received = rx.recv().unwrap();
println!("Got: {}", received);
}

recv 相比, try_recv 不会阻塞当前线程, 且也返回 Result<T, E>, 在实践时可以使用 while 循环结合 try_recv 来实现同样的"消息等待", 且可以在 while 中放入其它任务, 这样不会阻塞线程运行. 比如通过这种模式实现一个运行循环并接收外部事件.

channel 和所有权转移

在使用 channel 时, 如果将值通过 tx.send 发送, 则值的所有权被转移到 channel 中. send 会取得值的所有权, 并将所有权转移给接收者端, 换句话说就是: 值发送后就无法再在发送端使用了, 因为已经 move 了.

通过 channel 连续发送多个值

使用 mpsc::channel 的接收者端 Receiver 实现了 Iterator 协议, 因此可以使用如下方式进行消息获取:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

由于发送端线程会 1 秒钟睡一次, 接收端通过迭代器获取消息, 而 main 线程也会在 for 循环中 1 秒打印一个消息, 我们可以看到在接收端使用迭代器方式获取消息时, 也是阻塞的.

可以通过 tx 的 clone 方法来得到多个发送端, 并分别给到不同线程:

let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
// tx1.send(xxx)
});

let tx2 = tx.clone();
thread::spawn(move || {
// tx2.send(yyy)
});

for value in rx {
// handle values...
}

如果在上面的两个线程中发送多个消息, 可以看到消息接收时候的顺序是不固定的.

并行编程共享状态

如果多线程同时访问一个数据, 则需要同步机制, 否则会出现竞争条件.

为了同步多线程共享数据, Rust 提供了 Mutex, 即互斥量, 如果一个线程要访问互斥量中的数据, 它需要首先获取互斥量的锁, 这个锁是互斥量数据结构的组成部分, 用于追踪当前是谁在唯一地访问数据, 因此在 Rust 中, 互斥量可以描述为是一个通过锁系统来保护数据的守卫(guarding).

在传统编程语言(比如 C)中, 使用互斥量比较困难, 因为:

  • 在访问数据前需要获取互斥量的锁
  • 使用完数据后, 需要解锁以便其他线程可以获取锁

不过通过 Rust 的类型系统和所有权系统的配合, 使用 Mutex 非常简单.

Mutex<T> 的 API

在某个线程中(比如主线程)使用 mutex:

use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
let mut num = m.lock().unwrap();
*num = 6;
}

println!("m = {:?}", m);
}

创建 Mutex 后, 我们通过 lock 来获取锁, 如果已经获取锁的另外一个线程已经 panic 了, 则 lock 会失败. 这里使用 unwrap 来让这个线程也 panic, 因为永远无法获取到锁.

当获取到锁后, 我们就可以得到一个 Mutex 中包含数据的可变引用.

Mutex<T> 是一个智能指针, 当我们调用 lock 后会返回一个包含在 LockResult 中的 MutexGuard 类型的智能指针. MutexGuard 实现了 Deref 协议, 因此我们可以像使用普通引用那样使用它引用的值, 同时它也实现了 Drop 协议, 在它的 drop 实现中会自动进行解锁, 因此我们无需手动释放锁.

在多线程间共享 Mutex<T>

由于多线程间共享, 因此会需要多 Ownership 才可能办到, 类似 Rc. 但 Rc 非线程安全, 就要使用另外一种线程安全的引用计数智能指针 Arc, 它实现了 Send.

Arc 即原子引用计数智能指针, 原子性意味着它的多线程安全是通过系统底层保证的, 在它的计数操作时是不能中断的, 这里不再展开讲. Rust 标准库中原子类型 std::sync::atomic.

原子性也意味着性能的影响, 因此由开发者自己决定只有在需要原子性的时候才使用特定类型达到目的, Rust 没有对任何基础数据类型默认实现原子性支持.

通过 Arc 在多线程共享互斥量:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

通过 Arc<Mutex<T>> 使得互斥量具有了线程安全的多所有权, 在多个线程间共享数据 T. 被互斥量包含的数据 T 在各个线程的访问是安全的, 因为线程访问时进行了加锁解锁操作, 不会出现数据竞争条件问题.

结合之前讲过的内容, 和 Rc + RefCell 类似, Arc + Mutex 实现了一个多线程环境下线程安全的多所有权内部可变模式支持. (如果没有内部可变模式支持, 多所有权只能达到只读数据的目的, 这显然是非常弱的)

和 Rc 使用时的循环引用问题类似, 在使用 Mutex 时要注意死锁问题: 比如当操作涉及两个资源在两个线程中的情况, 一个线程获取了一个资源并等待获取另外一个资源时, 下面是一个非常简单的例子:

use std::{sync::{Arc, Mutex}, thread::{spawn, self}, time::Duration};

fn main() {
let res1 = Arc::new(Mutex::new(1));
let res2 = Arc::new(Mutex::new(2));

let r1c = Arc::clone(&res1);
let r2c = Arc::clone(&res2);
let ha1 = spawn(move || {
println!("thread 1 开始");
let n = r1c.lock().unwrap();
println!("获取到 1 lock {}", n);
thread::sleep(Duration::from_secs(1));
println!("thread1 睡醒, 准备获取 2 的 lock");
let n2 = r2c.lock().unwrap();
println!("获取到 2 的 lock {}", n2);
});

let r1c1 = Arc::clone(&res1);
let r2c2 = Arc::clone(&res2);
let ha2 = spawn(move || {
println!("thread 2 开始");
let n2 = r2c2.lock().unwrap();
println!("获取到 2 的 lock {}", n2);
thread::sleep(Duration::from_secs(1));
println!("thread2 睡醒, 准备获取 1 的 lock");
let n = r1c1.lock().unwrap();
println!("获取到 1 lock {}", n);
});

ha1.join().unwrap();
ha2.join().unwrap();
}

上述代码构造的是两个线程中对两个 Mutex 的获取:

  1. 线程 1 获取资源 1 后, 还未释放锁就进入了睡眠, 醒来后去获取资源 2.
  2. 同理, 线程 2 获取了资源 2 后, 还未释放锁就进入了睡眠, 醒来后准备获取资源 1.
  3. 由于两个线程都是在睡醒后想获取现在对方手上已有的锁, 就造成两边都在等对方解锁, 从而造成死锁.

要处理的话, 调整资源获取顺序是一个解决办法, 当然还有其他的办法.

自定义类型的线程安全 SyncSend

前面已经讲过的内容, 都是在 Rust 标准库中提供的并发编程工具.

而 Rust 语言内嵌的并发支持是两个 Trait: SyncSend, 这两个 trait 在 std::marker 模块中.

Send Trait

表示可以安全地在线程间传递所有权的类型.

基本上所有的 Rust 类型都实现了 Send, 但有一些例外, 比如 Rc<T> 就不是 Send, 因为如果在多个线程间共享 Rc 的引用, 就可能造成多个线程同时更改引用计数, 出现数据竞争条件, 引用计数的个数就可能错误.

一个数据结构中如果所有成员都是 Send 的, 则它自己也是 Send 的. 并且除了 raw pointer 外几乎所有的基本数据类型都是 Send 的, 因为它们都是单所有权的, 不存在多线程同时修改的可能.

Sync Trait

表示能够安全地在多个线程中传递引用的类型.

如果 &TSend 的, 则 TSync 的, 即不可变引用可以安全地传递到另外一个线程.

RcRefCell 等都不是 Sync 的.

手动实现符合 SendSync 的类型需要使用 unsafe 的功能. 这里暂时不讲了.