# RxSwiftDemo **Repository Path**: dontgo/RxSwiftDemo ## Basic Information - **Project Name**: RxSwiftDemo - **Description**: RxSwift学习 - **Primary Language**: Swift - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-07-23 - **Last Updated**: 2022-06-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RxSwift ## 介绍 `RxSwift`是 `ReactiveX` 家族的重要一员,`ReactiveX` 是 `Reactive Extensions` 的缩写,一般简写为 `Rx`。`ReactiveX` 官方给`Rx`的定义是:**Rx是一个使用可观察数据流进行异步编程的编程接口**。 > `ReactiveX`不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了`观察者模式`,使你能够`自由组合多个异步事件`,而`不需要去关心线程,同步,线程安全,并发数据以及I/O阻塞`。 `RxSwift` 是 `Rx` 为 `Swift` 语言开发的一门**函数响应式编程**语言, 它可以代替iOS系统的 `Target Action`、`代理`、`闭包`、`通知`、 `KVO`,同时还提供`网络`、`数据绑定`、`UI事件处理`、`UI的展示和更新`、`多线程`等等。 `Swift`为**值类型**,在传值与方法回调上有影响,`RxSwift`一定程度上弥补`Swift`的灵活性: - 使得代码复用性较强,减少代码量 - 因为声明都是不可变更,增加代码可读性 - 使得更易于理解业务代码,抽象异步编程,统一代码风格 - 使得代码更易于编写集成单元测试,增加代码稳定性 ## 核心流程 `RxSwift`这个优秀的框架,设计的`api`也是非常精简,让陌生的用户也能非常快速上手: - 1、创建序列 - 2、订阅序列 - 3、发送信号 ``` // 1、创建序列 _ = Observable.create({ observer in // 3、发送信号 observer.onNext("11111111111") return Disposables.create() }) // 2、订阅序列 .subscribe(onNext: { text in print("订阅到:\(text)") }) // 控制台打印:"订阅到:11111111111" ``` ### 1、创建序列 ``` extension ObservableType { // MARK: create /** Creates an observable sequence from a specified subscribe method implementation. - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html) - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method. - returns: The observable sequence with the specified implementation for the `subscribe` method. */ public static func create(_ subscribe: @escaping (AnyObserver) -> Disposable) -> Observable { AnonymousObservable(subscribe) } } ``` 由以上代码可以清晰的看到,`可观察序列`的创建是利用协议拓展功能`create`方法实现的,其中创建了一个`AnonymousObservable(匿名可观察序列)`,**这个类是一个内部类,具备一些通用特性(具备自己功能的类才会命名)**。 ![继承关系](https://gitee.com/dontgo/RxSwiftDemo/raw/master/images/%E7%BB%A7%E6%89%BF%E5%85%B3%E7%B3%BB.png) - `create` 方法的时候创建了一个内部对象 `AnonymousObservable` - `AnonymousObservable` 保存了外界的闭包 - `AnonymousObservable`继承了 `Producer` 具有非常重要的方法 `subscribe` ### 2、订阅序列 ``` extension ObservableType { ... public func subscribe( onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil ) -> Disposable { ... // ... 省略不影响我们探索的代码 let observer = AnonymousObserver { event in ... switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } else { Hooks.defaultErrorHandler(callStack, error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create( self.asObservable().subscribe(observer), disposable ) } } ``` 由上述代码分析: - **Element**:`Swift` 的关联类型,这个`Element`就是我们的**序列类型**,这里就是`String` - 创建了一个 `AnonymousObserver (匿名内部观察者)` 手法和我们的 `AnonymousObservable` 差不多,它这里的初始化是闭包参数,保存了外界的 `onNext`,`onError`,`onCompleted`, `onDisposed` 的处理回调闭包的调用,下图是 `观察者` 的继承链关系: ![观察者的继承链关系](https://gitee.com/dontgo/RxSwiftDemo/raw/master/images/%E8%A7%82%E5%AF%9F%E8%80%85%E7%9A%84%E7%BB%A7%E6%89%BF%E9%93%BE%E5%85%B3%E7%B3%BB.png) - `self.asObservable()`:是`RxSwift`为了保持一致性的写法 - `self.asObservable().subscribe(observer)`:本质就是`self.subscribe(observer)`,通过可观察序列的继承关系,可以非常快速的定位 `Producer` 订阅代码 ``` class Producer: Observable { ... override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { if !CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed. let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } else { return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } } } ... } ``` - `self.run`:最终由`Producer`延伸到具体的事务代码`AnonymousObservable.run` ``` override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } ``` - `sink.run`:业务处理下沉,使分工更明确 ``` func run(_ parent: Parent) -> Disposable { parent.subscribeHandler(AnyObserver(self)) } ``` - `parent`:就是上面传过来的`AnonymousObservable`对象 - `AnonymousObservable._subscribeHandler`:解释了**为什么序列订阅的时候流程会执行我们 序列闭包,然后去执行发送响应?** - `AnyObserver(self)`:在这个构造方法里面,创建了一个结构体 `AnyObserver` 保存了一个信息 `AnonymousObservableSink.on` 函数,不是 `AnonymousObservableSink` ``` /// Construct an instance whose `on(event)` calls `observer.on(event)` /// /// - parameter observer: Observer that receives sequence events. public init(_ observer: Observer) where Observer.Element == Element { self.observer = observer.on } ``` ### 3、发送信号 从上面的分析非常清晰地得知:`observer.onNext("11111111111")`的本质是`AnyObserver.onNext("11111111111")`。 这时候发现`AnyObserver`是没有这个方法,这很正常!一般思路:找父类,找协议。 ``` extension ObserverType { /// Convenience method equivalent to `on(.next(element: Element))` /// /// - parameter element: Next element to send to observer(s) public func onNext(_ element: Element) { self.on(.next(element)) } } ``` - `observer.onNext("11111111111")`再次变成`AnyObserver.on(.next("11111111111"))`,`AnyObserver`调用了 `on` 里面传的是 `.next函数`,`.next函数`带有我们最终的参数 ``` public struct AnyObserver : ObserverType { public init(_ observer: Observer) where Observer.Element == Element { self.observer = observer.on } public func on(_ event: Event) { self.observer(event) } } ``` - `self.observer` 构造初始化就是:`AnonymousObservableSink.on` 函数,`self.observer(event)`变成`AnonymousObservableSink.on(event)`,其中`event = .next("11111111111")`,最终核心逻辑又回到了`sink`,这也正是`RxSwift`设计的高明之处。 ``` final private class AnonymousObservableSink: Sink, ObserverType { ... func on(_ event: Event) { ... switch event { case .next: if load(self.isStopped) == 1 { return } self.forwardOn(event) case .error, .completed: if fetchOr(self.isStopped, 1) == 0 { self.forwardOn(event) self.dispose() } } } ... } ``` - `self.forwardOn(event)`:这也是执行的核心代码,因为 `AnonymousObservableSink` 继承 `Sink`,代码如下 ``` class Sink: Disposable { ... final func forwardOn(_ event: Event) { ... if isFlagSet(self.disposed, 1) { return } self.observer.on(event) } ... } ``` - `self._observer`:就是初始化保存的观察者`AnonymousObserver`,逻辑辗转回到了`订阅序列`时候创建的 `AnonymousObserver` 的参数闭包的调用! ### 总结:RxSwift的结构 - 就是序列概念,满世界都是序列:编码统一 ,随时随地享用 - 通过函数式思想把一些列的需求操作下沉(把开发者不关心的东西封装):优化代码,节省逻辑 - 解决`Swift`这门静态语言的响应能力,利用随时间维度序列变化为轴线,用户订阅关心能随轴线一直保活,达到订阅一次,响应一直持续 ## Observable序列的创建方式 **序列**在`RxSwift`的世界里面是非常重要,平时开发过程用好序列的创建,能够给开发带来事半功倍的效果!以下是常用的序列创建方式: ### 1、empty 首先,来一个空的序列,本序列时间是`Int`类型,这里调用`empty()`没有序列,只能`complete`。 ``` /// 空序列创建 private func empty() { let emptyOb = Observable.empty() _ = emptyOb.subscribe(onNext: { num in print("订阅: \(num)") }, onError: { error in print("error: \(error)") }, onCompleted: { print("完成回调") }, onDisposed: { print("释放回调") }) } /// 打印: 完成回调 释放回调 ``` > 这种方式不常用,通过源码解析查看 ``` extension ObservableType { public static func empty() -> Observable { EmptyProducer() } } final private class EmptyProducer: Producer { override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { observer.on(.completed) return Disposables.create() } } ``` 很明显在订阅的时候,直接`observer.on(.completed)` 发送了完成信号,非常简洁。 ### 2、just - 单个信号序列创建 - 该方法通过传入一个默认值来初始化,构建一个只有一个元素的`Observable`队列,订阅完信息自动`complete` - 下方代码中,标注了`Observable`的类型为`Observable<[String]>`,即指定了这个`Observable`所发出的事件携带的数据类型必须是`String`类型 ``` /// just 创建单个序列 private func just() { let array = ["111", "222"] _ = Observable<[String]>.just(array) .subscribe(onNext: { value in print("订阅: \(value)") }, onError: { error in print("error: \(error)") }, onCompleted: { print("完成回调") }, onDisposed: { print("释放回调") }) } // 打印: 订阅: ["111", "222"] 完成回调 释放回调 ``` > 有点数据遍历的感觉,在平时开发里面还是应用挺多的,通过源码解析查看 ``` final private class Just: Producer { ... override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { observer.on(.next(self.element)) observer.on(.completed) return Disposables.create() } } ``` `observer.on(.next(self._element))`常规订阅之后就会发送`.next`事件,之后会自动发送`.complete`事件。 ### 3、of - 创建一个**新的可观察序列**,该实具有可变数量的元素 - 可以接收可变数量的元素(必须是同类型) ``` /// of: 创建新的可观察序列 /// 多个元素:针对序列处理 private func of() { // String Observable.of("aaa", "bbb") .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) // 字典 Observable<[String: Any]>.of(["name": "T AO", "age": 29]) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) // 数组 Observable<[String]>.of(["aaa", "bbb", "ccc"]) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) } // 打印: event: next(aaa) event: next(bbb) event: completed event: next(["name": "T AO", "age": 29]) event: completed event: next(["aaa", "bbb", "ccc"]) event: completed ``` - 无论`多个元素`、`字典`、`数组`均正常使用 - 底层源码的结构也是中规中矩,初始化保存调度环境和传入的元素 - 订阅流程也是利用`sink`,然后通过`mutableIterator`迭代器处理发送 ``` public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { ObservableSequence(elements: elements, scheduler: scheduler) } final private class ObservableSequence: Producer { ... override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } } final private class ObservableSequenceSink: Sink where Sequence.Element == Observer.Element { ... func run() -> Disposable { return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in var mutableIterator = iterator if let next = mutableIterator.next() { self.forwardOn(.next(next)) recurse(mutableIterator) } else { self.forwardOn(.completed) self.dispose() } } } } ``` ### 4、from - 将`可选序列`转化为`可观察序列` - 从`集合`、`数组`、`set`中获取序列,有可选处理,更安全 ``` /// from: 可选序列 --> 可观察序列 private func from() { // 数组 Observable<[String]>.from(optional: ["a", "b"]) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) /// 字典 Observable<[String: Any]>.from(optional: ["name": "T AO", "age": 29]) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) /// set Observable.from(optional: [1, 2, 3, 1, 2]) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) } // 打印: event: next(["a", "b"]) event: completed event: next(["age": 29, "name": "T AO"]) event: completed event: next([1, 3, 2]) event: completed ``` > 源码解析查看 ``` final private class ObservableOptional: Producer { private let optional: Element? init(optional: Element?) { self.optional = optional } override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { if let element = self.optional { observer.on(.next(element)) } observer.on(.completed) return Disposables.create() } } ``` - `self.optional = optional`底层初始化可选项保存,订阅流程判断是否匹配我们的可选项 - 发送`observer.on(.next(element))`序列,随即自动`observer.on(.completed)`完成序列发送 ### 5、deferred - 返回一个可观察序列,该序列在新观察者订阅时调用指定的工厂函数 - 这里有一个需求:**动态序列 - 根据外界的标识 - 动态输出** - 使用`deferred()`方法延迟`Observable`序列的初始化,通过传入的`block`来实现`Observable`序列的初始化并且返回 ``` /// deferred: 延迟序列初始化 private func deferred() { var isOdd = true _ = Observable.deferred({ isOdd = !isOdd if isOdd { return Observable.of(1, 3, 5, 7, 9) } else { return Observable.of(2, 4, 6, 8, 10) } }) .subscribe({ event in print("event: \(event)") }) .disposed(by: DisposeBag()) } 打印: event: next(2) event: next(4) event: next(6) event: next(8) event: next(10) event: completed ``` - `self.observableFactory = observableFactory`初始化保存了这段工厂闭包 ``` func run() -> Disposable { do { let result = try self.observableFactory() return result.subscribe(self) } catch let e { self.forwardOn(.error(e)) self.dispose() return Disposables.create() } } ``` - 在订阅流程到`sink`的时候,把这段工厂闭包执行,有种中间层被包装的感觉 ### 6、range - 使用**指定的调度程序生成并发送观察者消息**,生成指定范围内的可观察整数序列 ``` /// range: 使用指定的调度程序生成生成指定范围内的可观察整数序列 private func rang() { Observable.range(start: 2, count: 5) .subscribe { event in print("event: \(event)") } .disposed(by: DisposeBag()) } // 打印: event: next(2) event: next(3) event: next(4) event: next(5) event: next(6) event: completed ``` > 源码解析查看 ``` final private class RangeProducer: Producer { fileprivate let start: Element fileprivate let count: Element fileprivate let scheduler: ImmediateSchedulerType init(start: Element, count: Element, scheduler: ImmediateSchedulerType) { guard count >= 0 else { rxFatalError("count can't be negative") } guard start &+ (count - 1) >= start || count == 0 else { rxFatalError("overflow of count") } self.start = start self.count = count self.scheduler = scheduler } override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = RangeSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } } ``` - 保存序列中**第一个整数的值、要生成的顺序整数的数目、调度环境** ``` final private class RangeSink: Sink where Observer.Element: RxAbstractInteger { ... func run() -> Disposable { return self.parent.scheduler.scheduleRecursive(0 as Observer.Element) { i, recurse in if i < self.parent.count { self.forwardOn(.next(self.parent.start + i)) recurse(i + 1) } else { self.forwardOn(.completed) self.dispose() } } } } ``` - 根据之前保存的信息,数据的状态也不断攀升,然后递归到规定的要求 ### 7、generate 通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。 该方法创建一个只有当提供的所有的判断条件都为`true`的时候,才会给出动作的`Observable`序列。 - 初始值给定,然后**判断条件1**,再**判断条件2**会一直递归下去,直到**条件1或者条件2不满足**,类似数组遍历循环。 - **参数:** - 1、`initialState`:给定初始值 - 2、`condition`:终止生成的条件(返回`false`时) - 3、调度器:用来运行生成器循环的调度器,默认:`CurrentThreadScheduler.instance` - 4、`iterate`:迭代步骤函数 - 返回生成的序列 ``` /// generate: 给定条件循环运行,生成序列 private func generate() { Observable.generate(initialState: 0, // 初始值 condition: { $0 < 10 }, // 条件 1 iterate: { $0 + 2 }) // 条件 2 .subscribe { event in print(event) } .disposed(by: DisposeBag()) // 数组遍历 let array = ["a", "b", "c", "d", "e"] Observable.generate(initialState: 0, condition: { $0 < array.count }, iterate: { $0 + 1 }) .subscribe(onNext: { print(array[$0]) }) .disposed(by: DisposeBag()) } // 打印: next(0) next(2) next(4) next(6) next(8) completed a b c d e ``` ### 8、timer 返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值。 ``` /// timer: 在指定的初始相对到期时间过后定期生成一个值 private func timer() { Observable.timer(RxTimeInterval.milliseconds(5000), period: RxTimeInterval.seconds(2), scheduler: MainScheduler.instance) .subscribe { event in print("event: \(event)") } .disposed(by: disposeBag) // 因为没有指定期限period,故认定为一次性 Observable.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance) .subscribe { event in print("++++ \(event)") } .disposed(by: disposeBag) } ``` - 参数一:第一次响应距离现在的时间 - 参数二`period`:时间间隔 - 参数三`scheduler`:线程 - 状态码的不断攀升,间隔时间不断发送响应 ### 9、interval 返回一个可观察序列,该序列在每个周期之后生成一个值,使用指定的调度程序运行计时器并发送观察者消息。 ``` /// interval: 定时生成序列,发送响应 private func interval() { Observable.interval(RxTimeInterval.seconds(3), // 间隔 scheduler: MainScheduler.instance) // 线程 .subscribe { event in print("event: \(event)") } .disposed(by: disposeBag) } ``` ### 10、repeatElement 使用指定的调度程序发送观察者消息,生成无限重复给定元素的可观察序列。 ``` /// repeatElement: 生成无限重复给定元素可观察序列 private func repeatElement() { Observable.repeatElement(5) .subscribe { event in print("event: \(event)") } .disposed(by: disposeBag) } ``` ### 11、error - 返回一个以`error`结束的可观察序列 - 这个序列平时在开发也比较常见,请求网络失败也会发送失败信号! ``` /// error:生成一个以 error 结束的序列 private func error() { Observable.error(NSError(domain: "请求错误", code: 10086, userInfo: ["reason": "unknown"])) .subscribe { event in print("evnet: \(event)") } .disposed(by: disposeBag) } // 打印: evnet: error(Error Domain=请求错误 Code=10086 "(null)" UserInfo={reason=unknown}) ``` ### 12、never - 该方法创建一个永远不会发出 `Event(也不会终止)`的 `Observable` 序列 - 这种类型的响应源在测试或者在组合操作符中禁用确切的源非常有用 ``` /// never: 生成一个永远不会发出 event,也不会结束的序列 private func never() { Observable.never() .subscribe { event in print("event: \(event)") } .disposed(by: disposeBag) } ``` ### 13、create - 该方法接受一个**闭包形式的参数**,任务是对每一个过来的订阅进行处理 - 下面是一个简单的demo,为方便演示,这里增加了订阅相关代码 - 这也是序列创建的一般方式,应用非常之多 ``` private func create() { let observable = Observable.create { observer in // 对订阅者发出了.next事件,且携带了一个数据"www.baidu.com" observer.onNext("www.baidu.com") // 对订阅者发出了.completed事件 observer.onCompleted() // 因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable return Disposables.create() } // 订阅 observable.subscribe { print("event: \($0)") }.disposed(by: disposeBag) } // 打印 event: next(www.baidu.com) event: completed ``` ## 高阶函数 ### 1、组合操作符 #### 1.1、startWith 将一系列值添加到可观察序列。 ``` private func startWith() { Observable.of("1", "2", "3", "4") .startWith("A") .startWith("B") .startWith("C", "a", "b") .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:CabBA1234 ``` #### 1.2、merge 将给定可枚举序列中所有可观察序列的元素合并为单个可观察序列。 ``` private func merge() { let subject1 = PublishSubject() let subject2 = PublishSubject() Observable.of(subject1, subject2) .merge() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) subject1.onNext("G") subject1.onNext("e") subject2.onNext("r") subject2.onNext("m") subject1.onNext("t") subject2.onNext("a") subject1.onNext("o") } // 打印:Germtao ``` #### 1.3、zip 每当所有可观察序列都在相应的索引处产生元素时,使用选择器函数将指定的可观察序列合并为一个可观察序列。 ``` /// zip: 只有两个序列同时有值的时候才会响应,否则存值 private func zip() { let stringSubject = PublishSubject() let intSubject = PublishSubject() Observable.zip(stringSubject, intSubject) { stringElement, intElement in "\(stringElement) \(intElement)" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) stringSubject.onNext("A") stringSubject.onNext("O") // 到这里存储了 A O 但是不会响应除非;另一个响应 intSubject.onNext(1) // 响应一个 intSubject.onNext(2) // 响应一个 stringSubject.onNext("T") // 存一个 intSubject.onNext(3) // 响应一个 } ``` #### 1.4、combineLatest 每当任何可观察序列产生元素时,使用选择器函数将指定的可观察序列合并为一个可观察序列。 ``` private func combineLatest() { let stringSubject = PublishSubject() let intSubject = PublishSubject() Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in "\(stringElement) \(intElement)" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) stringSubject.onNext("T") // 存一个 stringSubject.onNext("A") // 存一个 覆盖上一个,与 zip 不同 intSubject.onNext(1) // 发现stringSubject也有A 响应 A 1 intSubject.onNext(2) // 覆盖1 -> 2 发现stringSubject 有值A 响应 A 2 stringSubject.onNext("O") // 覆盖A -> O 发现intSubject 有值2 响应 O 2 // 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关系账户密码怎么变化的只要查看最后有值就可以 loginEnable } // 打印: A 1 A 2 O 2 ``` #### 1.5、switchLatest - 将可观察序列的可观察序列转换为可观察序列,仅从最近的可观察序列产生值。 - 每次收到新的内部可观察序列时,取消订阅先前的内部可观察序列。 ``` private func switchLatest() { let subject1 = BehaviorSubject(value: "T") let subject2 = BehaviorSubject(value: "1") // 选择了 subject1 就不会监听 subject2 let subject = BehaviorSubject(value: subject1) subject.asObservable() .switchLatest() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) subject1.onNext("G") subject1.onNext("e") // TGe subject2.onNext("2") subject2.onNext("3") // 2 3 都不会监听,但是默认保存 2覆盖1 3覆盖2 subject.onNext(subject2) // 切换到 subject2 TGe3 subject1.onNext("r") // 不会监听,保存,切换到 subject1 后会打印 subject2.onNext("4") // TGe34 } ``` ### 2、映射操作符 #### 2.1、map 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。 ``` private func map() { let observable = Observable.of(1, 2, 3, 4) observable.map { num in num + 2 } .subscribe { print($0) } .disposed(by: disposeBag) } // 打印: next(3) next(4) next(5) next(6) completed ``` #### 2.2、flatMap、flatMapLatest - `flatMap`:将可观察序列的每个元素投影到可观察序列,并将生成的可观察序列合并为一个可观察序列。 - `flatMapLatest`:将可观察序列的每个元素投影到新的可观察序列序列中,然后将可观察序列的可观察序列转换为仅**从最近的**可观察序列产生值的可观察序列。是 `map` + `switchLatest` 运算符的组合。 ``` private func flatMap() { let boy = TTPlayer(score: 100) let girl = TTPlayer(score: 90) // BehaviorSubject:订阅后,获取Subject发出的最新值,然后是订阅后发出的值 let player = BehaviorSubject(value: boy) player.asObservable() // .flatMap { $0.score.asObservable() } .flatMapLatest { $0.score.asObservable() } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) boy.score.onNext(60) player.onNext(girl) boy.score.onNext(50) boy.score.onNext(40) // flatMapLatest 就不会监听,也就是 50 40 不会打印 girl.score.onNext(10) girl.score.onNext(0) } ``` #### 2.3、scan 从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果。 ``` private func scan() { Observable.of(10, 100, 1000) .scan(2) { aggregateValue, newValue in aggregateValue + newValue // 10 + 2, 100 + 10 + 2, 1000 + 100 + 10 + 2 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:12 112 1112 ``` ### 3、过滤条件操作符 #### 3.1、filter - 给定条件过滤可观察序列的元素。 ``` private func filter() { Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) .filter { $0 % 2 == 0 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:2 4 6 8 0 ``` #### 3.2、distinctUntilChanged - 根据相等运算符返回一个仅包含**不同连续元素**的可观察序列。 ``` private func distinctUntilChanged() { Observable.of(1, 2, 2, 2, 2, 3, 3, 3, 4, 4) .distinctUntilChanged() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:1 2 3 4 ``` #### 3.3、elementAt - 仅在可观察序列发出的所有元素的**指定索引处**发出元素。 ``` private func elementAt() { Observable.of("G", "e", "r", "m", "t", "a", "o") .element(at: 3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:m ``` #### 3.4、single - 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。 ``` private func single() { Observable.of("1", "2", "3", "4") // 1 Unhandled error happened: Sequence contains more than one element. // .single() .single { $0 == "3" } // 3 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } ``` #### 3.5、take、takeLast - `take`:从可观察序列的**开头**返回指定数量的连续元素。 - `takeLast`:从可观察序列的**末尾**返回指定数量的连续元素。 ``` private func take() { Observable.of("1", "2", "3", "4") // .take(2) // 打印:1 2 .takeLast(2) // 打印:3 4 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } ``` #### 3.6、takeWhile - 只要指定条件为`true`,就从可观察序列中返回元素。 ``` private func takeWhile() { Observable.of(1, 2, 3, 4, 5) .take(while: { $0 < 3 }) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:1 2 ``` #### 3.7、takeUntil - 返回`源observable序列`中的元素,直到`另一个observable序列`产生一个元素。 - 这个要重点,应用非常频繁:比如页面销毁了,就不能获取值了(cell重用运用) ``` private func takeUntil() { let sourceSequence = PublishSubject() let referenceSequence = PublishSubject() sourceSequence .take(until: referenceSequence) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) sourceSequence.onNext("Germ") sourceSequence.onNext("TAO") sourceSequence.onNext("zzz") referenceSequence.onNext("---") // 条件一出来,下面就不走了 sourceSequence.onNext("sss") sourceSequence.onNext("ccc") sourceSequence.onNext("xxx") } // 打印:Germ TAO zzz ``` #### 3.8、skip、skipWhile - `skip`:绕过可观察序列中指定数量的元素,然后返回剩余的元素。 - `skip(while:)`:绕过可观察序列中指定条件为`true`的元素,然后返回剩余的元素。 ``` private func skip() { Observable.of(1, 2, 3, 4, 5, 6) // .skip(3) .skip(while: { $0 < 4 }) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:4 5 6 ``` #### 3.9、skipUntil - 返回`源 observable 序列`中的元素在`另一个 observable 序列`产生一个元素后发出。 ``` private func skipUntil() { let sourceSequence = PublishSubject() let referenceSequence = PublishSubject() sourceSequence .skip(until: referenceSequence) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) // 没有条件命令,下面走不了 sourceSequence.onNext("Germ") sourceSequence.onNext("TAO") sourceSequence.onNext("zzz") referenceSequence.onNext("---") // 条件一出来,下面就可以走了 sourceSequence.onNext("sss") sourceSequence.onNext("ccc") sourceSequence.onNext("xxx") } // 打印:sss ccc xxx ``` ### 4、集合控制操作符 #### 4.1、toArray - 将 `Observable` 转换为 `Single`,将整个序列作为单个数组发出,然后终止。 ``` private func toArray() { Observable.range(start: 1, count: 10) .toArray() .subscribe { print($0) } .disposed(by: disposeBag) } // 打印:success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) ``` #### 4.2、reduce - 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似`scan`。 ``` private func reduce() { Observable.of(10, 100, 1000) .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印:1111 ``` #### 4.3、concat - 连接所有内部可观察序列,只要之前的 `observable` 序列成功终止。 ``` private func concat() { let subject1 = BehaviorSubject(value: "TAO") let subject2 = BehaviorSubject(value: "1") let subject = BehaviorSubject(value: subject1) subject.asObservable() .concat() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) subject1.onNext("Germ") subject1.onNext("xxx") // TAO Germ xxx subject.onNext(subject2) subject2.onNext("不打印") subject2.onNext("zzz") // zzz 覆盖 不打印 subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序,网络数据的异步 subject2.onNext("aaa") } // 打印:TAO Germ xxx zzz aaa ``` ### 5、从可观察对象的错误通知中恢复的操作符 #### 5.1、catchAndReturn - 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止。 ``` private func catchAndReturn() { let failSubject = PublishSubject() failSubject .catchAndReturn("TAO") .subscribe { print($0) } .disposed(by: disposeBag) failSubject.onNext("Germ") failSubject.onNext("xxx") // 正常序列发送成功的 // 发送失败的序列,一旦订阅到,返回我们之前设定的错误的预案 failSubject.onError(NSError(domain: "网络错误", code: 10086, userInfo: nil)) } ``` #### 5.2、catch - 继续一个 `observable` 序列,该序列因处理程序生成的 `observable` 序列出现错误而终止。 ``` private func catchError() { let subject = PublishSubject() subject .catch { print("error: \($0)") // 获取到了错误序列,我们在中间的闭包操作处理完毕,返回给用户需要的序列(showAlert) return subject } .subscribe { print($0) } .disposed(by: disposeBag) subject.onNext("aaa") subject.onNext("bbb") // 正常序列发送成功的 // 发送失败的序列 subject.onError(NSError(domain: "网络错误", code: 10086, userInfo: nil)) subject.onNext("ccc") } // 打印: next(aaa) next(bbb) error: Error Domain=网络错误 Code=10086 "(null)" error(Error Domain=网络错误 Code=10086 "(null)") ``` #### 5.3、retry - `retry`、`retry(_ maxAttemptCount: Int)`:重复源 `observable` 序列指定的次数,以防出现错误或直到它成功终止。 ``` private func retry() { var count = 1 // 外界变量控制流程 let observable = Observable.create { observer in observer.onNext("Germ") observer.onNext("TAO") observer.onNext("AAA") if count == 1 { // 流程进来之后就会过度-这里的条件可以作为出口,失败的次数 // 接收到错误序列,重试 observer.onError(NSError(domain: "错误", code: 10086, userInfo: nil)) print("错误序列来了") count += 1 } observer.onNext("aaa") observer.onNext("bbb") observer.onNext("ccc") observer.onCompleted() return Disposables.create() } observable .retry() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } ``` ### 6、Rx流程操作符 #### 6.1、debug - 打印所有订阅、事件和处理。 ``` private func debug() { var count = 1 let observable = Observable.create { observer in observer.onNext("AAA") observer.onNext("BBB") observer.onNext("CCC") if count < 5 { observer.onError(NSError(domain: "错误", code: -1, userInfo: nil)) print("错误序列发送") count += 1 } observer.onNext("aaa") observer.onNext("bbb") observer.onNext("ccc") observer.onCompleted() return Disposables.create() } observable .retry(3) .debug() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) } // 打印: 2021-07-27 17:10:46.916: ViewController.swift:687 (debug()) -> subscribed 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(AAA) AAA 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(BBB) BBB 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(CCC) CCC 错误序列发送 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(AAA) AAA 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(BBB) BBB 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(CCC) CCC 错误序列发送 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(AAA) AAA 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(BBB) BBB 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event next(CCC) CCC 错误序列发送 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> Event error(Error Domain=错误 Code=-1 "(null)") Unhandled error happened: Error Domain=错误 Code=-1 "(null)" 2021-07-27 17:10:47.048: ViewController.swift:687 (debug()) -> isDisposed ``` #### 6.2、 RxSwift.Resources.total - 提供所有`Rx`资源分配的计数,这对于在开发期间检测泄漏非常有用。 > 首先,Podfile文件: ``` post_install do |installer| installer.pods_project.targets.each do |target| if target.name == 'RxSwift' target.build_configurations.each do |config| if config.name == 'Debug' config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES'] end end end end end ``` ``` private func total() { print(RxSwift.Resources.total) let subject = BehaviorSubject(value: "Germ") let subscription1 = subject.subscribe(onNext: { print($0) }) print(RxSwift.Resources.total) let subscription2 = subject.subscribe(onNext: { print($0) }) print(RxSwift.Resources.total) subscription1.dispose() print(RxSwift.Resources.total) subscription2.dispose() print(RxSwift.Resources.total) } ``` ### 7、链接操作符 #### 7.1、publish - 返回一个可连接的可观察序列,该序列共享对基础序列的单个订阅。 - 该运算符是使用`PublishSubject`的`multicast`的特化。 ``` private func publish() { let netObservable = Observable.create { observer in sleep(2) // 模拟网络延迟 print("开始网络请求") observer.onNext("请求到的网络数据") observer.onNext("请求到的本地数据") observer.onCompleted() return Disposables.create { print("销毁回调了") } }.publish() netObservable .subscribe(onNext: { print("订阅-1:\($0)") }) .disposed(by: disposeBag) // 有时候不止一次网络订阅,因为数据可能用在不同的地方 // 所以再订阅一次,会出现什么问题? // netObservable // .subscribe(onNext: { print("订阅-2:\($0)") }) // .disposed(by: disposeBag) _ = netObservable.connect() } ``` - 底层逻辑:中间变量`ConnectableObservableAdapter`保存了`源序列source`、`中间序列makeSubject`。 - 订阅流程:`self.lazySubject.subscribe(observer)`一个懒加载序列,保证了中间变量`ConnectableObservableAdapter`每一次都是同一个响应序列,剩下就是`PublishSubject`的订阅。 - 等待源序列的响应,源序列的订阅是在`connect`函数里面!如果没有调用`connect`函数,意味着就永远不会发送响应。背后的逻辑就是,前面所有的发送响应在`connect`函数之前的都没有任何的意义! - 以上也就说明了`publish`就是**状态共享**的:`connnect`一次序列发送一次响应(响应所有订阅)。 #### 7.2、replay - 将源可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前存放的缓冲大小。 - 首先拥有和`publish`一样的能力,`共享 Observable sequence`, 其次使用`replay`还需要传入一个参数(`buffer size)`来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者。 ``` private func replay() { let interval = Observable.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5) interval .subscribe(onNext: { print(Date.currentTime, "订阅-1, 事件: \($0)") }) .disposed(by: disposeBag) delay(2) { _ = interval.connect() } delay(4) { interval .subscribe(onNext: { print(Date.currentTime, "订阅-2, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(8) { interval .subscribe(onNext: { print(Date.currentTime, "订阅-3, 事件: \($0)") }) .disposed(by: self.disposeBag) } delay(20) { self.disposeBag = DisposeBag() } } ```