Skip to main content

Rust 笔记: Rust 写一个线程池

这个文章是通过如下两篇文章总结来的:

  1. 美团的年度技术报告 中第一篇.
  2. Rust 官方提供的文档: The Rust Programming Language 的最后一章

服务器的演化

总体是: 单线程服务器->多进程服务器 -> 多线程服务器

如何提高吞吐量

提高吞吐量有如下办法:

  1. 使用线程池
  2. 使用 Fork/Join 模型中的其他方式(轻量级线程/多进程/线程池...)
  3. 单线程异步 I/O 模型

由于多进程处理的时候, 进程创建的开销相比线程大很多, 因此准备使用多线程方式处理. 多线程处理时, 也不能无限制地创建线程, 因为始终在创建时都有开销. 一个更好的办法是, 通过一个线程池, 有任务提交后, 自动创建线程, 然后将这些已创建的线程进行复用.

线程池的工作流程如下:

  1. 当程序收到一个任务, 它会将任务分配给线程池中某个线程
  2. 该线程处理这个任务, 线程池中的其他线程则在等待更多任务
  3. 当该线程处理完任务后, 它会重新被放入线程池并标记为可用

线程池简介

翻译自TRPL 的这里.

线程池是一组已经生成的线程, 它们等待并处理被提交到线程池的任务. 任务完成后, 线程再次回到到线程池, 等待执行新任务. 线程池的优势在于并发处理请求, 提高吞吐量.

使用线程池提高吞吐量

在实现过程的开端, 首先就需要明确我们的需求, 编写测试(体现我们的接口设计):

  1. 我们会将线程池中的线程数量控制在一个较小的范围, 避免无限制地创建线程导致资源耗尽.
  2. 线程池维护着一个待进行的任务队列
  3. 我们可以控制并行任务的数量

为此, 定义如下接口:

/// 创建线程池
ThreadPool::new(usize)

/// 通过 execute 方法将任务传递给线程池中的 Worker
ThreadPool.execute(Job)

/// 负责接收并执行 Job
Worker

/// 需要执行的任务的抽象
Job

/// ThreadPool 以及 Worker 的析构
Drop

通过 Worker 持有 JoinHandle 和需要进行的任务 Fn, ThreadPool 中管理一组 Worker. 外部使用时, 实际就是将消息传递给 Worker 管理的线程, 办法是通过 channel 进行传递, ThreadPool 接受任务, 然后将任务传递给可用的 worker. 通过 mpsc 创建 channel, ThreadPool 端持有 sender, Worker 端持有 receiver, receiver 中包含的就是要进行的任务.

由于多个 Worker 共享一个 receiver, 因此需要使用 Arc 配合 Mutex 进行, Arc 进行原子引用计数, Mutex 保证每次只有一个 Worker 可以访问 receiver.

目前成熟的线程池实现也有若干:

  1. https://github.com/rust-threadpool/rust-threadpool
  2. https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h
  3. https://github.com/tokio-rs/tokio
    • 异步程序运行时候的支持环境: 运行时

TRPL 中线程池例子的介绍翻译

下面的内容是 TRPL 线程池例子的完整翻译.

设计线程池的接口

库的设计者经常在设计实际库代码之前先编写客户接口, 这样对库代码本身的设计非常有利.