
Want to buffer an Observable until another fires, then remove the buffer and emit normally, with one subscription

I'm adding an analytics service to my Angular app. The service's constructor creates and appends the 3rd party script. This opens a race condition: other services may try to fire telemetry events before the script has loaded. I want to put a buffer on the telemetry methods that holds messages until the script loads, flushes them once, and from then on passes each message straight through.

Pseudo code:

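import { BehaviorSubject, interval } from 'rxjs';
import { buffer, skip, take, takeUntil } from 'rxjs/operators';

// Global variable set by the 3rd party library after it loads
declare let analytics: any;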

class AnalyticsService {
  isLoaded$ = new BehaviorSubject<boolean>(false);
  identify$ = new BehaviorSubject<user>(null);

  constructor() {
    this.loadScript();

    // Here, I want to buffer messages until the 3rd party script initializes.
    // Once isLoaded$ fires, forEach the queue and pass it to the 3rd party library.
    // Then, for every message after that, send one at a time as normal,
    // as if the buffer weren't there.
    this.identify$
      .pipe(buffer(this.isLoaded$.pipe(skip(1)))) // Essentially want to remove this after it fires
      .subscribe((users) => users.forEach((user) => analytics.identify(user)));
  }

  loadScript(): void {
    const script = document.createElement('script');
    script.innerHTML = `
      // setup function from 3rd party
    `;

    document.querySelector('head').appendChild(script);

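    // Poll once a second, at most 5 times, until the global appears.
    // skip(1) ignores the initial `false` that the BehaviorSubject replays;
    // without it, takeUntil would complete the interval immediately.
    interval(1000)
      .pipe(take(5), takeUntil(this.isLoaded$.pipe(skip(1))))
      .subscribe(() => {
        if (typeof analytics !== 'undefined') this.isLoaded$.next(true);
      });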
  }

  identify(user): void {
    this.identify$.next(user);
  }
}

With two subscriptions, it would look like this:

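// 1) Collect everything sent before the script loads, then flush it once.
//    skip(1) ignores the initial `false` replayed by the BehaviorSubject.
this.identify$
  .pipe(
    takeUntil(this.isLoaded$.pipe(skip(1))),
    buffer(this.isLoaded$.pipe(skip(1))),
  ).subscribe(events => events.forEach(user => analytics.identify(user)));

// 2) Once loaded, pass every event straight through.
this.identify$
  .pipe(
    filter(_ => this.isLoaded$.value),
  ).subscribe(user => analytics.identify(user));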

Is there a way to do this with one subscription?

Dan, asked Sep 13 '25 13:09


1 Answer

Here is one way that might achieve what you're looking for:

constructor () {
  this.loadScript();

  this.identify$
    .pipe(
      buffer(
        concat(
          this.isLoaded$.pipe(skip(1), take(1)),
          this.identify$.pipe(skip(1)),
        )
      )
    )
    // buffer emits arrays, so forward each buffered user individually
    .subscribe((users) => users.forEach((user) => analytics.identify(user)));
}

The key part is

concat(
  this.isLoaded$.pipe(skip(1), take(1)),
  this.identify$.pipe(skip(1)),
)

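So, we first wait for isLoaded$ to emit true; only then is this.identify$.pipe(skip(1)) subscribed. The skip(1) is there because identify$ is a BehaviorSubject, which replays its current value to every new subscriber, and that replayed value must not close the buffer.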

After the script has loaded, you want each value to go through as soon as this.identify$ emits. That is why we subscribe to it a second time, from buffer's closing notifier. this.identify$ now has two subscribers: the first is the chain that ends in the subscribe call (this is where buffer collects values), and the second is the one created by concat, which acts as buffer's closing notifier. When this.identify$ emits, the value is delivered to its subscribers synchronously, in subscription order. So the value is first added to the buffer, and then the second subscriber closes the buffer, which immediately passes the value along to the next subscriber in the chain.
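To make the ordering argument concrete, here is a minimal sketch that deliberately does not use RxJS. MiniSubject, pending, batches and flush are hypothetical names invented for this illustration; MiniSubject is only a toy stand-in that, like a Subject, notifies its handlers synchronously in subscription order. It is a model of the mechanism under those assumptions, not how RxJS implements buffer.

```typescript
// Toy stand-in for an RxJS Subject (hypothetical, not the RxJS implementation):
// it notifies handlers synchronously, in the order they subscribed.
class MiniSubject<T> {
  private handlers: Array<(value: T) => void> = [];

  subscribe(handler: (value: T) => void): void {
    this.handlers.push(handler);
  }

  next(value: T): void {
    // Copy the list so handlers added during delivery wait for the next value.
    this.handlers.slice().forEach((handler) => handler(value));
  }
}

const identify = new MiniSubject<string>();
const batches: string[][] = []; // what the buffer would emit downstream
let pending: string[] = [];     // values collected since the last flush

// Plays the role of buffer's closing notifier firing.
function flush(): void {
  batches.push(pending);
  pending = [];
}

// Subscriber 1: the buffer's own subscription to the source collects every value.
identify.subscribe((user) => {
  pending.push(user);
});

// Before the script loads, values only accumulate.
identify.next('alice');
identify.next('bob');

// Script loaded: the first notifier fires once and flushes the backlog...
flush();
// ...and only then is the source subscribed a second time (the concat step).
// Subscriber 2: each later value closes the buffer right after subscriber 1 filled it.
identify.subscribe(() => flush());

identify.next('carol');
identify.next('dave');

console.log(JSON.stringify(batches)); // [["alice","bob"],["carol"],["dave"]]
```

The backlog comes out as one batch, and every later value comes out as its own one-element batch. That matches what buffer does in the real pipeline: it always emits arrays, so the final subscriber has to iterate over each emission (or flatten it) before calling analytics.identify.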

Andrei Gătej, answered Sep 17 '25 06:09