RxSwift - 过滤操作符
· 10 min read
RxSwift 中的若干个过滤功能的操作符介绍.
包括如下操作符(有些操作符可能是操作符族):
-
ignoreElements
: 应用这个操作符会生成一个 Completable, 故只有错误和完成, 所有其他元素均被忽略. -
elementAt
: 在一个可观察序列中获取指定下标的元素. -
skip
: 在序列中跳过若干个元素后开始取元素. -
skipWhile
: 一直跳过元素, 直到遇到满足条件的元素才不再执 行跳过. -
skipUntil
: 在另外一个序列没有开始发射时就一直忽略元素, 直到另外一个序列发射后停止忽略. -
take
: 在序列中只取指定个数的元素. -
takeWhile
: 一直取元素, 直到遇到不满足指定条件的元素就终止取元素, 需要注意的是, 如果序列的第一个元素就不满足条件, 则所有的元素都不会取. -
takeUntil
: 在另外一个序列没有发射时一直取元素, 直到另外一个序列发射就停止取. -
distinctUntilChanged
: 在连续的元素中, 如果有若干个相邻元素相等, 则只取相邻元素中的第一个, 直到遇到不同的再取.
完整示例代码如下所示:
/*
含义: 应用这个操作符会生成一个 Completable, 故只有错误和完成, 所有其他元素均被忽略
*/
exampleOf(msg: "过滤操作符: ignoreElements", action: {
let strikes = PublishSubject<String>()
strikes.ignoreElements().subscribe(onCompleted: {
print("完成")
}, onError: { error in
print(error)
}).disposed(by: $0)
strikes.onNext("X")
strikes.onNext("X")
strikes.onNext("X")
strikes.onNext("X")
strikes.onCompleted()
})
/*
含义: 在一个可观察序列中获取指定下标的元素
*/
exampleOf(msg: "过滤操作符: elementAt", action: {
let strikes = PublishSubject<String>()
strikes.elementAt(2).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("释放")
}).disposed(by: $0)
strikes.onNext("X") // 0
strikes.onNext("Y") // 1
strikes.onNext("Z") // 2
strikes.onNext("1") // 3
strikes.onCompleted()
})
/*
含义: 在序列中跳过若干个元素后开始取元素直到完成或错误.
*/
exampleOf(msg: "过滤操作符: skip", action: {
let flow = PublishSubject<String>()
flow.skip(3).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: $0)
flow.onNext("X")
flow.onNext("Y")
flow.onNext("Z")
flow.onNext("1")
flow.onNext("2")
flow.onCompleted()
})
/*
一直跳过元素, 直到遇到满足条件的元素就不再执行跳过操作
*/
exampleOf(msg: "过滤操作符: skipWhile", action: { bag in
let flow = PublishSubject<String>()
flow.skipWhile({ $0.count > 2 }).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
flow.onNext("1234")
flow.onNext("256")
flow.onNext("377")
flow.onNext("4")
flow.onNext("5")
flow.onNext("6")
flow.onNext("7")
flow.onCompleted()
})
/*
在另外一个序列没有开始发射时就一直忽略元素, 直到另外一个序列发射后停止忽略.
*/
exampleOf(msg: "过滤操作符: skipUntil", action: { bag in
let flow = PublishSubject<String>()
let trigger = PublishSubject<Void>()
flow.skipUntil(trigger).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
flow.onNext("X")
flow.onNext("X")
flow.onNext("X")
trigger.onNext(())
flow.onNext("Y")
flow.onNext("Z")
flow.onCompleted()
})
/*
取指定个数的元素
*/
exampleOf(msg: "过滤操作符: take", action: { bag in
Observable.from([1, 2, 3, 4, 5, 6]).take(2).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
})
/*
一直取元素, 直到遇到不满足指定条件的元素就终止取元素.
需要注意的是, 如果序列的第一个元素就不满足条件, 则之后所有的元素都不会取.
*/
exampleOf(msg: "过滤操作符: takeWhile", action: { bag in
Observable.of(1, 2, 3, 4, 5, 6, 7)
.takeWhile({ $0 > 3 })
.subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
Observable.of(1, 2, 3, 4, 5, 6, 7)
.enumerated()
.takeWhile({ index, elem in index < 3 && elem < 2 })
.subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
})
/*
在另外一个序列没有发射时一直取元素, 直到另外一个序列发射就停止取
*/
exampleOf(msg: "过滤操作符 takeUntil", action: { bag in
let flow = PublishSubject<String>()
let trigger = PublishSubject<Void>()
flow.takeUntil(trigger).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
flow.onNext("X")
flow.onNext("X")
flow.onNext("X")
flow.onNext("X")
flow.onNext("X")
trigger.onNext(())
flow.onNext("Y")
flow.onNext("Z")
flow.onNext("1")
flow.onCompleted()
})
/*
在连续的元素中, 如果有若干个相邻元素相等, 则只取相邻元素中的第一个, 直到遇到不同的再取.
*/
exampleOf(msg: "过滤 操作符: distinctUntilChanged", action: { bag in
let flow = PublishSubject<String>()
flow.distinctUntilChanged().subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
flow.onNext("X")
flow.onNext("X")
flow.onNext("Y")
flow.onNext("X")
flow.onNext("Z")
flow.onNext("Z")
flow.onNext("X")
flow.onCompleted()
})
额外介绍一些过滤操作符以及一些实际开发的做法, 包括如下:
-
share 操作符
-
filter 操作符
-
将老的异步代码转换为返回可观察序列
-
takeLast 操作符
-
take(_: scheduler:)
即只在固定时间段内取元素, 否则自动取消订阅. -
throttle 操作符: 在固定时间段内只取第一个元素.
代码如下所示:
/*
1. share 操作符
如果不作处理的话, 每次针对某个 Observable 的订阅均会引起 Observable 调用 create. 且可能每次 create 时序列中的
元素都和之前的不一样了.
故可以使用 share 操作符来避免这种多余的 create 发生.
share 操作符的原理是: 只有当订阅者的数量从 0 到 1 时才会调用 create. 如果之后的订阅者进来, share 则会提供之前已经创建
好的订阅供它们使用.
当所有的订阅都被释放后, share 也会将共享序列释放掉.
这个操作符暂时没有例子.
*/
/*
2. filter 操作符: 过滤效果和 swift 本身提供的类似.
另外可以利用 filter 操作符来做一些额外的工作, 比如在判断的同时进行一些计算, 并将结果保存到对象的属性中.
*/
private func filterElem() {
exampleOf(msg: "过滤操作符: filter", action: { bag in
let obv = Observable.of(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
obv.filter({ $0 % 2 == 0 }).subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: bag)
})
}
/*
3. 将老的异步代码转换为返回 Observable 或 Traits 或 Subject 的办法很简单:
*/
private func createAnObservable() {
let observable = Observable<Data>.create({ observer in
let disposeables = Disposables.create()
let session = URLSession.shared
session.dataTask(
with: URL(string: "https://www.baidu.com")!,
completionHandler: { data, resp, error in
if let error = error {
observer.onError(error)
}
observer.onNext(data!)
observer.onCompleted()
}).resume()
return disposeables
})
observable.subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print("完成")
}, onDisposed: {
print("被释放")
}).disposed(by: disposeBag)
}
/*
4. takeLast 操作符: 不用再费劲心机想跳过多少多少个了, 直接取最后一个
*/
private func takeLastElem() {
exampleOf(msg: "takeLast 操作符", action: { bag in
let obv = Observable.of(1,2,3,4,5,6)
obv.takeLast(1)
.subscribe(onNext: {
print($0)
}, onError: {
print($0)
}, onCompleted: {
print(" takeLast 的那个订阅完成")
}, onDisposed: {
print("释放 takeLast 订阅")
}).disposed(by: bag)
})
}
/*
5. take(_: scheduler:) 在等待一段时机后自动结束某个订阅: 在网络操作下非常常见.
通过下面这个例子可以发现, 停止订阅时正好间隔了 5 秒(时间在 μs 即微秒级别上有误差.)
*/
private func takeSomeTime() {
let publish = PublishSubject<Void>()
print("当前时间戳: \(Date().timeIntervalSinceReferenceDate)")
publish.take(5.0, scheduler: MainScheduler.instance).subscribe(onNext: {
print("发布了!")
print("当前时间戳: \(Date().timeIntervalSinceReferenceDate)")
}, onError: {
print($0)
}, onCompleted: {
print("完成")
print("当前时间戳: \(Date().timeIntervalSinceReferenceDate)")
}, onDisposed: {
print("被释放")
print("当前时间戳: \(Date().timeIntervalSinceReferenceDate)")
}).disposed(by: disposeBag)
publish.onNext(())
}
/*
6. throttle 操作符: 过滤掉指定时间段内的相邻元素, 即固定时间段内只取一个元素.
这个操作符用来处理连续点击的情况非常有用.
暂无示例.
*/