第一章 并发和异步编程概述
原书 p3 - p20.
从头学习异步编程, 需要有一个完整的上下文以开始.
在没有明白原理前, 先看看在 Rust 中如何使用 async/await.
https://rust-lang.github.io/async-book/01_getting_started/02_why_async.html
Rust 异步
异步编程是 并发编程模型 的一种, 让大量并发任务运行在少量 OS 线程上(或单线程上).
并发编程模型
- OS 线程: 无需修改代码, 通过线程表达并发, 缺点是线程间同步问题需要考虑, 存在性能开销. 在 I/O 密集场景下不适用.
- 事件驱动: 结合 callback 使用可实现高性能. 但会让代码看起来 "非线性", 数据流及错误传递会非常繁琐.
- 协程: 轻量级线程, 实现细节被抽象到底层支持库和运行时中.
- actor 模型: 将并发计算分配到任务单元(actor), actor 间进行消息通信. 但过程控制和重试逻辑比较难实现.
Rust 的异步模型和其他语言的区别
- 惰性 Future: Rust 中 Future 仅在 poll 后才执行, 被 drop 的 Future 不会继续执行.
- 零开销 Async: 使用 async 无需堆分配以及动态派发.
- 没有内置运行时: Rust 的异步运行时需要引入.
- 可自由选择单线程和多线程异步运行时
Rust 中 Async VS threads
根据实际情况选用, 若仅简单且计算工作大的任务, 可使用 thread, 而 I/O 密集任务选择 async. 在 thread 和 async 间切换则需要重构代码.
同一个程序中, 原则上在 async 环境执行 I/O 密集任务, 在 thread 执行 CPU 密集任务.
许多运行时都有提供两类 API 便于使用. 如 tokio 中有 task::spawn_blocking
用于 CPU 重任务在另外一个线程去执行, 这样仍然能够保证同一套 async 语法. 而 I/O 密集任务可以直接使用 task::spawn
.
而真正 CPU 计算密集型任务更多考虑的是 CPU 核心数和线程数匹配, 此时可以使用 rayon 这类并行任务处理库去做, 并且 rayon 的 api 也可以融入到 async 中, 这样可以保证代码的一致性.
Rust 中 async/await 简单使用
Rust async/await 的语法已在编译器支持, 使用时需要至少引入两个库:
- futures: 官方提供的, 包含许多异步编程支持的核心 trait 定义(
Future
,Stream
等), 以及实用工具如join!
,select!
等. - 一个运行时: 有多种选择, 比如 tokio, async_std, smol 等等.
比如下面的代码:
futures = { version = "0.3" }
use futures::{executor::block_on, join};
fn main() {
block_on(async_main());
}
async fn async_main() {
join!(learning(), dance());
}
async fn learning() {
learn_song().await;
sing_song().await;
}
async fn learn_song() {/**/}
async fn sing_song() {/**/}
async fn dance() {/**/}
此外, 由于 async
已做了转换, 在 rust 中 async fn 的返回值不需要再写 Future 包裹, 如下所示:
async fn get_data() -> Result<MyData, MyError> {/*...*/}
如下代码是使用 futures
中提供的 Executor 运行 Future 的示例:
use futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
executor::{self, ThreadPool},
StreamExt,
};
mod crsbm_utils_t;
mod life_time_illu;
mod tcp_server;
fn main() {
// 创建一个用于执行 future 的线程池.
let pool = ThreadPool::new().expect("Failed to build pool.");
let (tx, rx) = mpsc::unbounded::<i32>();
let v = executor::block_on(get_fut_values(&pool, tx, rx));
println!("Values: {v:?}");
}
async fn get_fut_values(
pool: &ThreadPool,
tx: UnboundedSender<i32>,
rx: UnboundedReceiver<i32>,
) -> Vec<i32> {
pool.spawn_ok(fut_tx_result(tx));
let fv = rx.map(|v| v * 2).collect();
fv.await
}
async fn fut_tx_result(tx: UnboundedSender<i32>) {
(0..100).for_each(|v| {
tx.unbounded_send(v).expect("Failed to send");
});
}
看完了简单的使用示例, 下面跟着书慢慢看内部细节.
概述: 并发和异步编程
下面内容是通用概念, 不局限于 Rust.
多任务处理的发展史
按时间线, 多任务处理的发展史如下:
- 非抢占式多任务: 即 OS 负责启动任务并接受输入, 程序运行时, 由程序自己将控制权交还给系统. 这种方式的缺点是当程序出现任何小问题时, 都可能造成系统瘫痪. 比如 Windows 3.1 中, 提供了系统调用用于交还控制权.
- 抢占式多任务处理: 由 OS 负责调度 CPU 资源给程序使用, OS 可暂停某进程并运行另外一个进程, 然后再切换回来. 上下文切换非常快速, 且不会影响到用户使用. 此方式是目前 OS 设计的主流形式.
- 超线程: CPU 有 6 个物理核心, 可以通过超线程技术"模拟"为 12 个逻辑核心. 通过多线程技术, 在单核心 CPU 上, 性能提升能够有 30%, 但此种性能提升和任务类型直接相关.
- 多核心处理器: 处理器通过缓存, 分支预测, 预执行, 乱序执行以及处理管线等技术, 运行地比之前快很多. 且可以在同一个 CPU 上放入多个核心, 每个核心都有超线程能力.
代码都是同步的吗?
在程序员的角度, 我们写的代码都是顺序执行的(和我们写的顺序一致).
在 OS 的角度, 程序可以被中断后挂起, 执行其他任务后, 再继续执行.
在 CPU 的角度, 它不关心指令是哪个程序的, 只要是发生中断, 就执行中断调用. CPU 就是通过此形式实现并发的.
由于 CPU 可能对指令顺序进行优化(乱序执行), 因此为保证 happens before 关系, 在需要同步的场景下, 就需要在代码特定位置生成特定指令, 以保证代码中 happens before 关系.
CPU 和 OS 间是紧密合作关系, 内存管理, 中断处理, 特权指令等.