Skip to main content

Rust 并行处理磁盘扫描: 工具 Dust 和并行库 Rayon

dust 扫描磁盘时仅用时 2 秒就搞定, 目前初版用了 9 秒, 需要看源码并明确为什么能够这么快的速度.

Dust

概述

Dust 是一个查看磁盘空间占用的工具. Dust 会展示比终端高度低一点点的输出.

APP 的启动过程? 即 Main 函数解析

从 src/main.rs 文件的 main 函数开始:

首先调用 build_cliget_matches 以获取命令行参数:

/// 获取用户输入命令参数
let options = build_cli().get_matches();

/// 使用 `clap` 构建支持的整个命令结构, 并返回名为 Command 的命令行接口, 内部实际是 `Command::new` 的调用.
///
/// 内部实际进行参数/子命令/解析行为/帮助输出的相关配置, 可理解为一个命令行接口对象 Builder.
///
/// 当所有配置完成后, 在 Command 上调用 get_matches 方法以启动运行时的命令解析过程.
fn build_cli() -> Command<'static>;

/// Command 对象方法: 从 env::args_os 中取得命令行参数, 并通过已配置的信息解析为 ArgMatches
pub fn get_matches(self) -> ArgMatches

获取用户 Home 目录的配置项(支持配置文件进行默认配置):

let config = get_config();

/// 从固定目录获取配置, 组织为一个 Config 对象, Config 对象包含的是和遍历/输出行为有关的配置.
///
/// 和命令行参数类似.
pub fn get_config() -> Config;

然后从标准输入读取输入行, 因为支持从标准输入传入目录列表(其中目录非包含关系, 包含的话就是顶层那个了):

let stdin_lines = get_lines_from_stdin();

fn get_lines_from_stdin() -> Option<Vec<String>>;

如果命令行参数没有才会从标准输入去读待扫描目录:

let target_dirs = match options.values_of("inputs") {
Some(values) => values.collect(),
None => stdin_lines.as_ref().map_or(vec!["."], |lines| {
lines.iter().map(String::as_str).collect()
}),
};

而后读取一些配置信息以便对应处理:

// 是否输出文件类型汇总大小: 这样可以使用: dust --file_types . 来汇总不同文件类型的总大小
let summarize_file_types = options.is_present("types");

// 获取文件路径正则过滤器参数: 例如仅包含 png 则 -e "\.png$"
let filter_regexs = get_regex_value(options.values_of("filter"));
// 获取被排除的文件路径正则过滤器参数: 例如排除 png 则 -v "\.png$"
let invert_filter_regexs = get_regex_value(options.values_of("invert_filter"));
// 获取命令行宽度
let terminal_width = options
.value_of_t("width")
.unwrap_or_else(|_| get_width_of_terminal());
// 获取输出时的文件树图形深度, 1 表示画一层, 3 表示三层, 以此类推.
let depth = options.value_of_t("depth").unwrap_or(usize::MAX);

// If depth is set, then we set the default number_of_lines to be max
// instead of screen height
let default_height = if depth != usize::MAX {
usize::MAX
} else {
get_height_of_terminal()
};

let number_of_lines = options
.value_of("number_of_lines")
.and_then(|v| {
v.parse()
.map_err(|_| eprintln!("Ignoring bad value for number_of_lines"))
.ok()
})
.unwrap_or(default_height);

let no_colors = init_color(config.get_no_colors(&options));

// Map<Values, fn from<PathBuf, &str>(&str) -> PathBuf> 类型的忽略目录
// 在其中忽略的目录不会被扫描
let ignore_directories = options
.values_of("ignore_directory")
.unwrap_or_default()
.map(PathBuf::from);

// 是否展示目录文件个数而非目录大小
let by_filecount = options.is_present("by_filecount");
// 是否仅限一个扫描入口相同文件系统
let limit_filesystem = options.is_present("limit_filesystem");

然后将传入的目标路径进行标准化(或简化):

// 将传入的 target_dirs 对应的 Vec<&str> 类型数组转换为标准路径集合(类型为 HashSet<PathBuf>)
// 进行了目标目录的是否包含判断, 以及规范化路径.
let simplified_dirs = simplify_dir_names(target_dirs);

// ...

创建一个在扫描过程中使用的跨线程配置对象:

 let walk_data = WalkData {
ignore_directories: ignored_full_path,
filter_regex: &filter_regexs,
invert_filter_regex: &invert_filter_regexs,
allowed_filesystems,
use_apparent_size: config.get_apparent_size(&options),
by_filecount,
ignore_hidden: config.get_ignore_hidden(&options),
};

启动核心的扫描过程:

let _rayon = init_rayon(); // 使用自定义配置初始化全局的 rayon 线程池
// 传入目标目录和遍历时的配置, 获取对应的若干文件树
let (top_level_nodes, has_errors) = walk_it(simplified_dirs, walk_data);

遍历过程

遍历过程从 walk_it 调用开始.

/// 传入一串目标目录, 以及遍历配置后, 对所有目标目录进行遍历, 并返回对应的 root node 列表, 以及是否出现无权限错误.
pub fn walk_it(dirs: HashSet<PathBuf>, walk_data: WalkData) -> (Vec<Node>, bool);

在内部实现中:

// 创建一个线程安全的原子类型布尔值
let permissions_flag = AtomicBool::new(false);

// 将传入的目标目录转换为迭代器, 并
let top_level_nodes: Vec<_> = dirs.into_iter()
.filter_map(|d| {
let inode_set = HashSet::new();
// Option 也可以使用 ? 操作符来判断, 如果是 None 则直接返回 None.
let nodes = walk(d, &permissions_flag, &walk_data, 0)?;
// 是否使用显示大小
let use_apparent_size = walk_data.use_apparent_size
// 清理 inode 号重复的文件, 避免出现重复计算大小的情况, 返回 Some 或 None
clean_inodes(nodes, &mut inode_set, use_apparent_size)
)
})
.collect();

(top_level_nodes, permissions_flag.into_inner())

上述主要有两个方法需要分析, walkclean_inodes.

先来看 walk 的实现.

针对单一目标目录的遍历: walk 方法

walk 方法的主要作用是遍历以目标目录为根的文件树, 建立树结构并返回根结点.

/// 递归层序遍历 dir 目录并返回根结点, 若 dir 不是目录, 则返回 None.
///
/// dir: 待遍历的根结点目录
///
/// permissions_flag: 是否出现权限错误的记录位
///
/// walk_data: 遍历时可以读取的配置信息
///
/// depth: 当前遍历层级(初始时传入 0 表示第 0 级的遍历过程)
fn walk(dir: PathBuf,
permissions_flag: &AtomicBool,
walk_data: &WalkData,
depth: usize) -> Option<Node>

walk 方法实现中, 通过递归的方式层序遍历文件树, 收集的每个 entry 并行遍历(因为互相没有影响).

// 通过 ref 关键字获取到某个对象的引用而非消费它
.filter_map(|entry| { if let Ok(ref entry) = entry {
// ...
}

遍历完成后再深度遍历一次树, 递归清理相同 inodes 并计算大小

一些可以学习的:

  • bool 可以调用 then 方法, true 返回 Some(T), 包含值为 then 的闭包计算结果, false 返回 None.
  • 如果类型体系中有多处用到引用的地方, 实现时的生命期标记方法.
  • Path 上已有获取 metadata 的方法!

要点分析

  • APP 的启动过程
  • 如何开始扫描过程的
  • 扫描过程中的细节处理
  • 建立文件树
  • 文件树中结点关系
  • 获取了文件的哪些信息, 如何做到 4 倍速的?
  • 修改以适应 macOS, 特别是对于文件大小的计算方面, 比如 docker image 的大小计算方式, 物理大小比较恰当.
  • 直接传入目录, 不带任何参数时的结果处理
  • -s 参数的处理方法, 即输出显式大小而非磁盘空间占用的做法
  • -d 参数的处理方法, 即展示多少层目录
  • -r 的处理, 即反向输出
  • -x 的处理, 即仅展示相同文件系统下的目录
  • -i 的处理, 不展示隐藏文件
  • -f 的处理, 即文件个数计数
  • -z 的处理, 即仅包含大于某个数值的文件

rayon

它是一个并行库, 用于将串行操作转换为并行操作, 并且可以确保不出现数据竞争.

rayon 依赖 crossbeam, 更具体说是依赖其中的 deque 和 utils 两个 crate, 先看 crossbeam.

crossbeam

crossbeam 中 utils 是基础库.

crossbeam-utils(基于 0.8.20 版本)

其中包含的实用工具:

  • Atomics:
    • AtomicCell: 类似标准库的 Cell, 提供线程安全的可变内存位置, 即它可以在多线程间安全地共享.