I'm implementing an analytics service into my Angular app. I create and append the 3rd party script in the service's constructor. There can be a race condition where other services try to fire telemetry events before the script is loaded. I wanted to create a buffer on the telemetry methods to hold messages until the script loads and then to flush the buffer and then push as normal after that.
Pseudo code:
// Global variable set by 3rd party library after it loads
declare let analytics: any;
class AnalyticsService {
isLoaded$ = new BehaviorSubject<boolean>(false);
identify$ = new BehaviorSubject<user>(null);
constructor() {
this.loadScript();
// Here, I want to buffer messages until the 3rd party script initializes
// Once isLoaded$ fires, forEach the queue and pass to the 3rd party library
// Then for every message after, send one at a time as normal like
// the buffer wasn't there
this.identify$
.pipe(buffer(this.isLoaded$.pipe(skip(1)) // Essentially want to remove this after it fires
.subscribe((user) => analytics.identify(user));
}
loadScript(): void {
const script = document.createElement('script');
script.innerHTML = `
// setup function from 3rd party
`;
document.querySelector('head').appendChild(script);
interval(1000)
.pipe(take(5), takeUntil(this.isLoaded$))
.subscribe(_ => {
if (analytics) this.isLoaded$.next(true);
})
}
identify(user): void {
this.identify$.next(user);
}
}
If I were to use two subscriptions, it would be like
identify$
.pipe(
takeUntil(this.isLoaded$),
buffer(this.isLoaded$),
).subscribe(events => events.forEach(user => analytics.identify(user)));
identify$
.pipe(
filter(_ => this.isLoaded$.value),
).subscribe(user => analytics.identify(user))
Is there a way to do this with one subscription?
Here might be a way to achieve what you're looking for:
constructor () {
this.loadScript();
this.identify$
.pipe(
buffer(
concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify.pipe(skip(1)),
)
)
)
.subscribe((user) => analytics.identify(user));
}
The gist resides in
concat(
this.isLoaded$.pipe(skip(1), take(1)),
this.identify$.pipe(skip(1)),
)
So, we first wait for isLoaded$
to emit true
, then this.identify.pipe(skip(1))
will be subscribed.
After the script is loaded, you want to proceed immediately when this.identify$
emits. And here is why we subscribe to it once more, from the buffer
's closing notifier. Basically, this.identify$
will have 2 subscribers now. The first one is subscribe((user) => analytics.identify(user))
and the second one is the one from concat
(which is buffer
's closing notifier). When this.identify$
emits, the value will be sent to its subscribers in order. So, the value will end up being added into the buffer and then immediately passed along to the next subscriber in the chain, because this.identify$
's second subscriber will receive the value synchronously.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With