C16 安全并发
https://doc.rust-lang.org/book/ch16-00-concurrency.html
- 如何创建线程实现并行
- 消息传递
- 共享状态
Sync
和Send
trait
使用线程实现并行
现代操作系统引入了线程和进程的概念, 使用线程实现并行计算可有效提升性能, 但也增加了程序复杂度, 且可能出现一些问题:
- 竞争条件: 多个线程同时访问同一数据或资源
- 死锁: 不同线程都在等待对方释放自己想要的资源, 从而无限等待.
- 其他只有在多线程环境下才会产生的低概率 bug, 不易调试解决.
在一些语言下, 由于实现的是 green thread, 意味着线程可能不是和操作系统线程一一对应的(语言管理线程), 而 Rust 标准库线程和操作系统线程一一对应(底层是线程系统调用).
Rust 线程 API: 创建 thread::spawn
使用标准库 thread::spawn
函数创建线程, 传入一个闭包(包含需要在线程上执行的代码).
thread::spawn(|| {
for i in 1..10 {
println!("hi number: {}", i);
}
});
需要注意: 当 main 线程结束后, 意味着进程结束, 所有其他线程进而也结束了(无论它们是否完成了指定任务). 同一进程中线程的执行顺序没有保证, 取决于操作系统如何调度线程执行的.
Rust 线程 API: 等待线程结束 JoinHandle
当使用 thread::spawn
创建线程后, 可以得到一个 JoinHandle
, 使用它的 join
方法可在当前线程等待新建线程运行结束.
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number: {}", i);
}
});
handle.join().unwrap();
Rust 线程 API: 使用 move
闭包
当传入需要运行的代码到线程上, 可以使用 move
闭包, 即 FnOnce
闭包, 从而将上下文中值的所有权传递到线 程内(从一个线程传递到另外一个).
需要注意的是: 虽然 Rust 闭包会自动推断线程内如何去捕获上下文值, 当需要引用时只会捕获引用, 但此情况在多线程环境下不适用. 原因是当值在一个线程被 drop 后, 在另外线程上的引用无法确保是有效的. 因此即便传入线程的闭包中使用引用, 仍然需要将值 move 到线程内:
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
// 即便此处只需要 v 的引用, 但由闭包捕获上下文值传入另外线程, 因此仍然需要将值 move 到该线程上.
println!("the vec: {}", v);
});
handle.join().unwrap();
上述行为, 结合 ownership 规则, 确保了值在多线程环境下的合法可用. 比如上述代码将值 move 到另一线程后, 值无法再在此线程使用. 使用 move
后意味着确保传入线程闭包不再是隐式推断捕获引用, 而是直接将值 move 到线程.
下面再来看看 Rust 中线程间的通信, 包括消息传递和共享内存.
线程间的通信: 使用消息在线程间传递数据
通过消息通信来传递数据是安全并发中一种流行方式: "不要通过共享内存来进行通信, 而是通过通信来共享内存" -- golang 名言.
为实现并发环境下的消息通信, Rust 标准库提供了消息通道(channel
)的实现. channel 是一个通用编程概念, 用于在发送端
和接收端
间传递消息. 当任意一端被 drop 后, 该通道即被关闭
(closed).
下面是一个示例, 一个线程作为发送端, 另外一个线程作为接收端:
fn send_message() {
// mpsc 即多生产者单消费者模型
let (tx, rx) = mpsc::channel::new();
// 将发送端 move 到新建的线程上: 模拟在另一线程上产生的数据被发送的情况
thread::spawn(move || {
let val = String::from("Hello");
// 若接收端已被 drop(channel 被关闭), 则 send 调用会产生错误 Reuslt<T,E>
tx.send(val).unwrap();
});
// recv 方法会阻塞当前线程等待消息, 其结果仍然为 Result<T,E>, 实际项目中经常通过 while let Ok(r) = rx.recv() 来持续获取消息直到 channel 关闭.
// 此外还有 try_recv 方法, 该方法不会阻塞当前线程, 调用后若产生 Err 则表示当前没有消息, 我们仍然可以使用它结合 while 来做处理, 直到 channel 关闭. 此方法非常适合当前线程还需要做其他事情的情况.
let received_msg = rx.recv().unwrap();
println!("msg: {received_msg}");
}
Channel 和所有权转移
ownership 规则对于并发安全非常重要, 它可以确保在多线程环境下不会出现传统的空指针/野指针/多重释放等情况.
还是上一节的代码为例:
// ...
thread::spawn(move || {
let val = String::from("Hello");
// 若接收端已被 drop(channel 被关闭), 则 send 调用会产生错误 Reuslt<T,E>
tx.send(val).unwrap();
println("{val}"); // ! 此代码无法通过编译, 因 val 所有权已被 move 出去了(到了接收端)
});
// ...
多个消息发送和接收
下面是一个连续发送多个消息的例子, 可以看到消息发送后当前线程阻塞, 消息一个一个被接收:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
运行后可以看到, 主线程每隔 1 秒接收到来自创建的后台线程发送的消息并打印出来. 且这里有个隐藏技巧, 在 for 循环中将 rx 作为一个 iterator 使用(因 for 中可以自动调用其 into_iter()
).
发送者分散在多个线程上
mpsc 本身就是多发送者模型, 因此要实现多个线程发送, 只需要将 tx 端进行克隆:
// --snip--
// 当前线程创建 channel 获取到发送/接收端
let (tx, rx) = mpsc::channel();
// 将发送端克隆并move到创建的不同线程上: 这里有 tx 和 tx1, 分别被move到了所创建的两个线程上.
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 仍然在当前线程接收消息
for received in rx {
println!("Got: {received}");
}
// --snip--
多个线程发送时, 接收端接收到来自不同线程的消息顺序是不固定的, 这也是并发环境下的一个难点所在, 即无法将某些情况稳定重现.
线程间的通信: 共享内存
消息传递是一种并发环境下数据传递的理想方式, 但并非唯一选择. 另外一个方法是通过共享内存, 共享内存时, 可以理解为一种特殊的 "多所有权" 数据访问: 即多个线程可以同时访问同一个内存地址. 但这种逻辑上的多所有权会带来额外的复杂性, 因为有多个 "所有者" 需要进行管理.
使用互斥锁(Mutex)实现共享内存
Mutex 在使用时, 一个线程首先需要将其加锁, 加锁可以理解为从 lock 系统中获取到使用权. 但互斥锁带来的复杂性如下:
- 必须在访问数据前尝试加锁, 若多个线程加锁多个资源互相等待, 则可能造成死锁
- 当使用完毕后, 必须释放锁, 这样其他线程才有机会访问该数据
在 Rust 的所有权规则和类型系统加持下, 上述加锁和释放锁的操作都能得到自动处理(死锁仍然可能出现): Rust 会在调用 lock
方法时获取到 MutexGuard
, 作用域走出后, 该 guard 会被自动释放, 即完成了释放锁操作.
下面来看 Mutex<T>
API 的用法:
fn demo() {
let m = Mutex:new(5);
{
// 获取到 MutexGuard
let mut num = m.lock().unwrap();
*num = 6; // 对 guard 进行解引用, 实际是获取到内部数据的解引用: 因 `MutexGuard` 实现了 `Deref`
}
// 此处仍然可以访问到 guard
println!("m = {m:?}");
} // 作用域结束, `MutexGuard` 被 drop, 故自动将锁释放了.
上述代码还无法在多线程环境下使用, 要将 Mutex 在多线程共享, 需要结合 Arc
, 即线程安全的引用计数(Atomic Reference Count):
fn demo() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
// 多个线程对计数器加 1
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
})
handles.push(handle);
}
// 等待所有线程结束
for handle in handles {
handle.join().unwrap();
}
// 打印结果
println!("result: {}", *counter.lock().unwrap());
}
可以看到输出稳定为 10, 上述代码也是使用 Arc
和 Mutex
的典型形式. 当然实际情况下, 如果只是对基本类型进行操作, 使用标准库中提供的 std::sync::atomic
模块更为合适, 其中就有 AtomicI8
, AtomicU16
等类型, 这些类型在多线程环境下可以直接使用(内部实际通过 CPU 提供的 Memory Ordering 相关的指令完成多线程同步).
RefCell<T> + Rc<T>
VS Mutex<T> + Arc<T>
从上面的代码我们可以看到, 定义的 counter 是不可变的, 但我们通过 Mutex
获取到了内部值的可变引用, 实际 Mutex
+ Arc
也是一种内部可变模式的例子.
上述两种组合的典型问题也需要时刻记住, Rc 使用时需要注意循环引用问题可能造成内存泄露, 而 Mutex 多线程环境使用时需要注意不要产生死锁.
并发环境下两个重要 trait: Send
和 Sync
待续...
https://doc.rust-lang.org/book/ch16-04-extensible-concurrency-sync-and-send.html