Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RACSubject created on dispatch_queue sendComplete doesn't make it to merged signal

When in a dispatch_async block running on the DISPATCH_QUEUE_PRIORITY_DEFAULT gcd queue: I create two RACSubject objects, use RACSignal merge: and then subscribe complete. Then, for the purposes of this test (and to replicate the scenario in my actual code), I send sendComplete on both of them. The merged signal completion subscription never fires. I attached two completion subscriptions to the subjects independently, those do fire. If I do this same test on the main thread instead of the gcd queue then it works as expected.

Is there a way to make this work or am I going to have to refactor to get all of my subjects on the main thread?

#import <ReactiveCocoa/ReactiveCocoa.h>

@interface rac_signal_testTests: SenTestCase
@end

@implementation rac_signal_testTests

- (void)setUp
{
    [super setUp];

    // Set-up code here.
}

- (void)tearDown
{
    // Tear-down code here.

    [super tearDown];
}

-(void)test_merged_subjects_will_complete_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject1 sendNext:@"1"];
    [subject2 sendNext:@"2"];

    [subject1 sendCompleted];
    [subject2 sendCompleted];

    STAssertTrue(completed_fired, nil);
}

//test proving that throttling isn't breaking the merged signal (initial hypothesis).
-(void)test_merged_subjects_will_complete_if_one_of_them_has_a_throttled_subscriber_on_main_thread{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    __block NSString * hit_subject2_next = nil;
    [[subject2 throttle:.5] subscribeNext:^(NSString *value){
        hit_subject2_next = value;
    }];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    __block BOOL completed_fired = NO;

    [merged subscribeCompleted:^{
        completed_fired = YES;
    }];

    [subject2 sendNext:@"2"];
    [subject2 sendCompleted];
    [subject1 sendCompleted];
    STAssertEqualObjects(@"2", hit_subject2_next, nil);
    STAssertTrue(completed_fired, nil);
}

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    dispatch_async(global_default_queue, ^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        __block NSString * hit_subject2_next = nil;

        RACScheduler *global_default_scheduler = [RACScheduler schedulerWithQueue:global_default_queue name:@"com.test.global_default"];

        RACSignal *sig1 = [subject1 deliverOn:RACScheduler.mainThreadScheduler];
        RACSignal *sig2 = [subject2 deliverOn:RACScheduler.mainThreadScheduler];

        [sig2    subscribeNext:^(NSString *value){
            hit_subject2_next = value;
        }];

        [sig2 subscribeCompleted:^{
            NSLog(@"hit sig2 complete");
        }];

        [sig1 subscribeCompleted:^{
            NSLog(@"hit sig1 complete");
        }];

        RACSignal *merged = [[RACSignal merge:@[sig1, sig2]] deliverOn:RACScheduler.mainThreadScheduler];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject2 sendNext:@"2"];
//        if we dispatch the send complete calls to the main queue then this code works but that seems like it shoul be unnecessary.
//        dispatch_async(dispatch_get_main_queue(), ^{
            [subject1 sendCompleted];
            [subject2 sendCompleted];
//        });
    });

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

@end
like image 762
Jon Avatar asked Feb 15 '23 17:02

Jon


1 Answers

So this is a fairly shitty case, caused by the interaction of GCD and RAC. Strictly speaking, there's no bug. But it is surprising and weird. We talk about this requirement in the design guidelines at https://github.com/ReactiveCocoa/ReactiveCocoa/blob/1bd47736f306befab64859602dbdea18f7f9a3f6/Documentation/DesignGuidelines.md#subscription-will-always-occur-on-a-scheduler.

The key is that subscription must always happen on a known scheduler. This is a requirement that RAC enforces internally. If you're just using plain old GCD, there is no known scheduler so RAC has to send the subscription off to a scheduler asynchronously.

So to go to your test:

[merged subscribeCompleted:^{
    complete = YES;
}];

The actual subscription happens asynchronously because there is no known scheduler. The subscription ends up happening after the -sendCompleted calls and it misses them entirely. It's really a race condition, but realistically you're probably never going to see it succeed.

The fix is to use RACSchedulers instead of GCD if possible. If you need to use a specific GCD queue, you can use RACTargetQueueScheduler. For example, a working, simplified version of your test:

-(void)test_merged_subjects_will_complete_if_on_gcd_queue{
    __block BOOL complete = NO;

    dispatch_queue_t global_default_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

    RACScheduler *scheduler = [[RACTargetQueueScheduler alloc] initWithName:@"testScheduler" targetQueue:global_default_queue];
    [scheduler schedule:^{
        RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
        RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

        RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

        [merged subscribeCompleted:^{
            complete = YES;
        }];

        [subject1 sendCompleted];
        [subject2 sendCompleted];
    }];

    NSDate *startTime = NSDate.date;
    do{
        [NSRunLoop.mainRunLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate dateWithTimeIntervalSinceNow:.5]];
    }while(!complete && [NSDate.date timeIntervalSinceDate:startTime] <= 10.0);

    STAssertTrue(complete, nil);
}

Since the subscription happens from within a scheduler, the subscribeCompleted: is done synchronously, gets the completed events, and everything behaves as you'd expect.

If you don't need to use a specific GCD queue and just want it done on a non-main queue, then do something like:

[[RACScheduler scheduler] schedule:^{
    RACSubject *subject1 = [[RACSubject subject] setNameWithFormat:@"subject1"];
    RACSubject *subject2 = [[RACSubject subject] setNameWithFormat:@"subject2"];

    RACSignal *merged = [RACSignal merge:@[subject1, subject2]];

    [merged subscribeCompleted:^{
        complete = YES;
    }];

    [subject1 sendCompleted];
    [subject2 sendCompleted];
}];

I hope that clarifies what you're seeing. Let me know if I need to re-word something.

like image 135
joshaber Avatar answered Apr 08 '23 13:04

joshaber



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!