【美团技术团队博客】RACSignal的Subscription深入分析

ReactiveCocoa是一个FRP的思想在Objective-C中的实现框架,目前在美团的项目中被广泛使用。对于ReactiveCocoa的基本用法,网上有很多相关的资料,本文不再讨论。RACSignal是ReactiveCocoa中一个非常重要的概念,而本文主要关注RACSignal的实现原理。在阅读之前,你需要基本掌握RACSignal的基本用法

本文主要包含2个部分,前半部分主要分析RACSignal的subscription过程,后半部分是对前半部分的深入,在subscription过程的基础上分析ReactiveCocoa中比较难理解的两个操作:multicast && replay。 PS:为了解释清楚,我们下面只讨论next,不讨论error以及completed,这二者与next类似。本文基于ReactiveCocoa 2.x版本。 我们先刨析RACSignal的subscription过程

RACSignal的常见用法

-(RACSignal *)signInSignal {
// part 1:[RACSignal createSignal]来获得signal
  return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    [self.signInService
     signInWithUsername:self.usernameTextField.text
     password:self.passwordTextField.text
     complete:^(BOOL success) {

    // part 3: 进入didSubscribe,通过[subscriber sendNext:]来执行next block
       [subscriber sendNext:@(success)];
       [subscriber sendCompleted];
     }];
    return nil;
  }];
}

// part 2 : [signal subscribeNext:]来获得subscriber,然后进行subscription
[[self signInSignal] subscribeNext:^(id x) { 
    NSLog(@"Sign in result: %@", x); 
}];

Subscription过程概括

RACSignal的Subscription过程概括起来可以分为三个步骤:

  1. [RACSignal createSignal]来获得signal
  2. [signal subscribeNext:]来获得subscriber,然后进行subscription
  3. 进入didSubscribe,通过[subscriber sendNext:]来执行next block

步骤一:[RACSignal createSignal]来获得signal

RACSignal.m中:
+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {
  return [ RACDynamicSignal   createSignal :didSubscribe];
}
RACDynamicSignal.m中
+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {
  RACDynamicSignal *signal = [[ self   alloc ] init ];
 signal-> _didSubscribe = [didSubscribe copy ];
  return [signal setNameWithFormat : @"+createSignal:" ];
}

[RACSignal createSignal]会调用子类RACDynamicSignal的createSignal来返回一个signal,并在signal中保存后面的 didSubscribe这个block

步骤二:[signal subscribeNext:]来获得subscriber,然后进行subscription

RACSignal.m中:
- ( RACDisposable *)subscribeNext:( void (^)( id x))nextBlock {
  RACSubscriber *o = [ RACSubscriber   subscriberWithNext :nextBlock error : NULL   completed : NULL ];
  return [ self  subscribe :o];
}
RACSubscriber.m中:

+ ( instancetype )subscriberWithNext:( void (^)( id x))next error:( void (^)( NSError *error))error completed:( void (^)( void ))completed {
  RACSubscriber *subscriber = [[ self   alloc ] init ];
 subscriber-> _next = [next copy ];
 subscriber-> _error = [error copy ];
 subscriber-> _completed = [completed copy ];
  return subscriber;
}
RACDynamicSignal.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];
        [disposable addDisposable:schedulingDisposable];
    }
    return disposable;
}
  1. [signal subscribeNext]先会获得一个subscriber,这个subscriber中保存了nextBlock、errorBlock、completedBlock
  2. 由于这个signal其实是RACDynamicSignal类型的,这个[self subscribe]方法会调用步骤一中保存的didSubscribe,参数就是1中的subscriber

步骤三:进入didSubscribe,通过[subscriber sendNext:]来执行next block

RACSubscriber.m中:
- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;
        nextBlock(value);
    }
}

任何时候这个[subscriber sendNext:],就直接调用nextBlock

signal的subscription过程回顾

从上面的三个步骤,我们看出:

  • 先通过createSignal和subscribeNext这两个调用,声明了流中value到来时的处理方式
  • didSubscribe block块中异步处理完毕之后,subscriber进行sendNext,自动处理

搞清楚了RAC的subscription过程,接着在此基础上我们讨论一个RACSignal中比较容易混淆的两个操作:multicast和replay。

为什么要清楚这两者的原理

RACSignal+Operation.h中
- (RACMulticastConnection *)publish;

- (RACMulticastConnection *)multicast:(RACSubject *)subject;

- (RACSignal *)replay;

- (RACSignal *)replayLast;

- (RACSignal *)replayLazily;
  • 在RACSignal+Operation.h中,连续定义了5个跟我们这个主题有关的RACSignal的操作,这几个操作的区别很细微,但用错的话很容易出问题。只有理解了原理之后,才明白它们之间的细微区别
  • 很多时候我们意识不到需要用这些操作,这就可能因为side effects执行多次而导致程序bug

multicast && replay的应用场景

"Side effects occur for each subscription by default, but there are certain situations where side effects should only occur once – for example, a network request typically should not be repeated when a new subscriber is added."

// 引用ReactiveCocoa源码的Documentation目录下的一个例子
// This signal starts a new request on each subscription.
RACSignal *networkRequest = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    AFHTTPRequestOperation *operation = [client
        HTTPRequestOperationWithRequest:request
        success:^(AFHTTPRequestOperation *operation, id response) {
            [subscriber sendNext:response];
            [subscriber sendCompleted];
        }
        failure:^(AFHTTPRequestOperation *operation, NSError *error) {
            [subscriber sendError:error];
        }];

    [client enqueueHTTPRequestOperation:operation];
    return [RACDisposable disposableWithBlock:^{
        [operation cancel];
    }];
}];

// Starts a single request, no matter how many subscriptions `connection.signal`
// gets. This is equivalent to the -replay operator, or similar to
// +startEagerlyWithScheduler:block:.
RACMulticastConnection *connection = [networkRequest multicast:[RACReplaySubject subject]];
[connection connect];

[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber one: %@", response);
}];

[connection.signal subscribeNext:^(id response) {
    NSLog(@"subscriber two: %@", response);
}];
  1. 在上面的例子中,如果我们不用RACMulticastConnection的话,那就会因为执行了两次subscription而导致发了两次网络请求。
  2. 从上面的例子中,我们可以看到对一个Signal进行multicast之后,我们是对connection.signal进行subscription而不是原来的networkRequest。这点是"side effects should only occur once"的关键,我们将在后面解释

multicast原理分析

replay是multicast的一个特殊case而已,而multicast的整个过程可以拆分成两个步骤,下面进行详细讨论

multicast的机制Part 1:

RACMulticastConnection.m中:
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);
    self = [super init];
    if (self == nil) return nil;
    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}
  • 结合上面的例子来看,RACMulticastConnection的init是以networkRequest作为sourceSignal,而最终connnection.signal指的是[RACReplaySubject subject]
RACMulticastConnection.m中:
- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }
    return self.serialDisposable;
}
  • 结合上面的RACSignal分析的Subscription过程,[self.sourceSignal subscribe:_signal]会执行self.sourceSignal的didSubscribe这个block。再结合上面的例子,也就是说会把_signal作为subscriber,发网络请求,success的时候,_signal会sendNext,这里的这个signal就是[RACReplaySubject subject]。可以看出,一旦进入到这个didSubscribe中,后续的不管是sendNext还是subscription,都是对这个[RACReplaySubject subject]进行的,与原来的sourceSignal彻底无关了。这就解释了为什么"side effects only occur once"。

multicast的机制Part 2:

在进行multicast的步骤二之前,需要介绍一下RACSubject以及RACReplaySubject

---------------------恼人的分隔线 start------------------

RACSubject

"A subject can be thought of as a signal that you can manually control by sending next, completed, and error."

RACSubject的一个用法如下:

RACSubject *letters = [RACSubject subject];
// Outputs: A B
[letters subscribeNext:^(id x) {
    NSLog(@"%@ ", x);
}];
[letters sendNext:@"A"];
[letters sendNext:@"B"];

接下来分析RACSubject的原理

RACSubject.m中:
- (id)init {
    self = [super init];
    if (self == nil) return nil;
    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];    

    return self;
}
  • RACSubject中有一个subscribers数组
RACSubject.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    return [RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            // Since newer subscribers are generally shorter-lived, search
            // starting from the end of the list.
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];
            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }];
}
  • 从subscribe:的实现可以看出,对RACSubject对象的每次subscription,都是将这个subscriber加到subscribers数组中而已
RACSubject.m中:
- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendNext:value];
    }];
}
  • 从sendNext:的实现可以看出,每次RACSubject对象sendNext,都会对其中保留的subscribers进行sendNext,如果这个subscriber是RACSignal的话,就会执行Signal的next block。

RACReplaySubject

"A replay subject saves the values it is sent (up to its defined capacity) and resends those to new subscribers.",可以看出,replaySubject是可以对它send next(error,completed)的东西进行buffer的。 RACReplaySubject是继承自RACSubject的,它的内部的实现例如subscribe:、sendNext:的实现也会调用super的实现

RACReplaySubject.m中:
- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    if (self == nil) return nil;

    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);

    return self;
}
  • 从init中我们看出,RACReplaySubject对象持有capacity变量(用于决定valuesReceived缓存多少个sendNext:出来的value,这在区分replay和replayLast的时候特别有用)以及valuesReceived数组(用来保存sendNext:出来的value),这二者接下来会重点涉及到
RACReplaySubject.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;
                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }
            if (compoundDisposable.disposed) return;
            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];
    [compoundDisposable addDisposable:schedulingDisposable];
    return compoundDisposable;
}
  • 从subscribe:可以看出,RACReplaySubject对象每次subscription,都会把之前valuesReceived中buffer的value重新sendNext一遍,然后调用super把当前的subscriber加入到subscribers数组中
RACReplaySubject.m中:
- (void)sendNext:(id)value {
    @synchronized (self) {
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}

从sendNext:可以看出,RACReplaySubject对象会buffer每次sendNext的value,然后会调用super,对subscribers中的每个subscriber,调用sendNext。buffer的数量是根据self.capacity来决定的

---------------------恼人的分隔线 end------------------

介绍完了RACReplaySubject之后,我们继续进行multicast的part 2部分。 在上面的例子中,我们对connection.signal进行了两次subscription,结合上面的RACReplaySubject的subscription的subscribe:,我们得到以下过程:

  1. [RACReplaySubject subject]会将这两次subscription过程中的subscriber都保存在subscribers数组中
  2. 当网络请求success后,会[subscriber sendNext:response],前面已经讲过这个subscriber就是[RACReplaySubject subject],这样,就会把sendNext:的value保存在valuesReceived数组中,供后续subscription使用(不知道你是否注意到RACReplaySubject的subscribe:中有个for循环),然后对subscribers中保存的每个subscriber执行sendNext。

后续思考

  1. 上面讨论的是RACReplaySubject对象先进行subscription,再进行sendNext,如果是先sendNext,再subscription呢?其实魅力就在于RACReplaySubject的subscribe:中的for循环。具体过程留作思考
  2. 在RACSignal+Operation中关于multicast && replay的,一共有5个操作:publish、multicast、replay、replayLast、replayLazily,他们之间有什么细微的差别呢?相信在我上面内容的基础上,他们之间的细微差别不难理解,这里推荐一篇帮助大家理解的blog

参考资料

ReactiveCocoa github主页 ReactiveCocoa Documentation ReactiveCocoa raywenderlich上的资料

原文发布于微信公众号 - 美团点评技术团队(meituantech)

原文发表时间:2015-07-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏HansBug's Lab

1029: [JSOI2007]建筑抢修

1029: [JSOI2007]建筑抢修 Time Limit: 4 Sec  Memory Limit: 162 MB Submit: 2382  Solve...

2836
来自专栏大数据挖掘DT机器学习

Python抓取上海各地区房价平均值

作者:司开星 http://blog.csdn.net/chroming/article/details/46471155 用Python写了一个抓取上海地区...

4485
来自专栏大数据文摘

陆金所融资12亿美元的PPT长什么样?

42312
来自专栏域名资讯

蚂蜂窝D轮融资1.33亿美元,域名保护到位

三拼域名虽然字符较长,但对于国内用户来说不难记忆。目前不少终端启用三拼域名,例如人人车(renrenche.com)、聚美丽(jumeili.cn...

2020
来自专栏王磊的博客

Express调用mssql驱动公共类dbHelper

直接上代码: /** * Created by chaozhou on 2015/9/18. */ var mssql = require('mssql')...

4047
来自专栏点滴科技资讯

金融科技—数字技术正颠覆金融世界

金融科技(FinTech)正改变金融业的发展格局,并对金融业产生颠覆性影响。本信息图对全球金融科技发展格局进行了深入分析,值得一看! ? ? ? ? ? ?...

3027
来自专栏智能计算时代

Earshot Builds with Watson APIs to Enhance its Marketing

Social Media IS Big Data. On any given day more than 500 million tweets and 55 m...

2746
来自专栏域名资讯

亿欧网启用i+系列域名完成B1轮融资

中国500强企业沂州集团旗下的沂景投资已于2017年7月完成了对产业创新服务平台亿欧公司B1轮融资。 

440
来自专栏CDA数据分析师

淘宝重口味大数据:十二星座男到底哪个不能嫁?

临近年关,各种大数据报告都出现了。今天,淘宝也玩了一出大数据,汇总了10条不能嫁的“汉子”。 具体来说,江苏狮子座的男子最败家,因为他们经常把钱花在艺术品和烟...

2058
来自专栏数据结构与算法

洛谷P4015 运输问题(费用流)

题目描述 WW 公司有 mm 个仓库和 nn 个零售商店。第 ii 个仓库有 a_iai​ 个单位的货物;第 jj 个零售商店需要 b_jbj​ 个单位的货物。...

3519

扫码关注云+社区

领取腾讯云代金券