Resetting Replaysubject In Rxjs 6
Solution 1:
If you want to be able to reset a subject without having its subscribers explicitly unsubscribe and resubscribe, you could do something like this:
import { Observable, Subject } from"rxjs";
import { startWith, switchMap } from"rxjs/operators";
function resettable<T>(factory: () =>Subject<T>): {
observable: Observable<T>,
reset(): void,
subject: Subject<T>
} {
const resetter = newSubject<any>();
const source = newSubject<T>();
let destination = factory();
let subscription = source.subscribe(destination);
return {
observable: resetter.asObservable().pipe(
startWith(null),
switchMap(() => destination)
),
reset: () => {
subscription.unsubscribe();
destination = factory();
subscription = source.subscribe(destination);
resetter.next();
},
subject: source
};
}
resettable
will return an object containing:
- an
observable
to which subscribers to the re-settable subject should subscribe; - a
subject
upon which you'd callnext
,error
orcomplete
; and - a
reset
function that will reset the (inner) subject.
You'd use it like this:
import { ReplaySubject } from"rxjs";
const { observable, reset, subject } = resettable(() =>newReplaySubject(3));
observable.subscribe(value =>console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
observable.subscribe(value =>console.log(`b${value}`)); // b2, b3, b4, b5, b6reset();
observable.subscribe(value =>console.log(`c${value}`)); // c5, c6
subject.next(5);
subject.next(6);
Solution 2:
The problem becomes easier if you can use the fact that the buffer consumes data from the original source, and that subscribers to buffered data can switch to the original source after receiving all the old values.
Eg.
let data$ = newSubject<any>() // Data sourcelet buffer$ = newReplaySubject<any>()
let bs = data$.subscribe(buffer$) // Buffer subscribes to data// Observable that returns values until nearest resetletgetRepeater = () => {
returnconcat(buffer$.pipe(
takeUntil(data$), // Switch from buffer to original source when data comes in
), data$)
}
To clear, replace the buffer
// Begin Buffer Clear Sequence
bs.unsubscribe()
buffer$.complete()
buffer$ = newReplaySubject()
bs = data$.subscribe(buffer$)
buffObs.next(buffer$)
To make the code more functional, you can replace the function getRepeater() with a subject that reflects the latest reference
let buffObs = newReplaySubject<ReplaySubject<any>>(1)
buffObs.next(buffer$)
let repeater$ = concat(buffObs.pipe(
takeUntil(data$),
switchMap((e) => e),
), data$)
The following
let data$ = newSubject<any>()
let buffer$ = newReplaySubject<any>()
let bs = data$.subscribe(buffer$)
let buffObs = newReplaySubject<ReplaySubject<any>>(1)
buffObs.next(buffer$)
let repeater$ = concat(buffObs.pipe(
takeUntil(data$),
switchMap((e) => e),
), data$)
// Begin Test
data$.next(1)
data$.next(2)
data$.next(3)
console.log('rep1 sub')
let r1 = repeater$.subscribe((e) => {
console.log('rep1 ' + e)
})
// Begin Buffer Clear Sequence
bs.unsubscribe()
buffer$.complete()
buffer$ = newReplaySubject()
bs = data$.subscribe(buffer$)
buffObs.next(buffer$)
// End Buffer Clear Sequenceconsole.log('rep2 sub')
let r2 = repeater$.subscribe((e) => {
console.log('rep2 ' + e)
})
data$.next(4)
data$.next(5)
data$.next(6)
r1.unsubscribe()
r2.unsubscribe()
data$.next(7)
data$.next(8)
data$.next(9)
console.log('rep3 sub')
let r3 = repeater$.subscribe((e) => {
console.log('rep3 ' + e)
})
Outputs
rep1 sub
rep1 1
rep1 2
rep1 3
rep2 sub
rep1 4
rep2 4
rep1 5
rep2 5
rep1 6
rep2 6
rep3 sub
rep3 4
rep3 5
rep3 6
rep3 7
rep3 8
rep3 9
Solution 3:
Here is a class that is using the resettable factory posted here before, so you can use
const myReplaySubject = new ResettableReplaySubject<myType>()
import { ReplaySubject, Subject, Observable, SchedulerLike } from"rxjs";
import { startWith, switchMap } from"rxjs/operators";
exportclassResettableReplaySubject<T> extendsReplaySubject<T> {
reset: () =>void;
constructor(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
super(bufferSize, windowTime, scheduler);
const resetable = this.resettable(() =>newReplaySubject<T>(bufferSize, windowTime, scheduler));
Object.keys(resetable.subject).forEach(key => {
this[key] = resetable.subject[key];
})
Object.keys(resetable.observable).forEach(key => {
this[key] = resetable.observable[key];
})
this.reset = resetable.reset;
}
private resettable<T>(factory: () =>Subject<T>): {
observable: Observable<T>,
reset(): void,
subject: Subject<T>,
} {
const resetter = newSubject<any>();
const source = newSubject<T>();
let destination = factory();
let subscription = source.subscribe(destination);
return {
observable: resetter.asObservable().pipe(
startWith(null),
switchMap(() => destination)
) asObservable<T>,
reset: () => {
subscription.unsubscribe();
destination = factory();
subscription = source.subscribe(destination);
resetter.next();
},
subject: source,
};
}
}
Solution 4:
I had kind of same problem: One of my components subscribed to an ReplaySubject of a shared service. Once navigated away and coming back the former values where still delivered to the component. Just completing the subject was not enough.
The solutions above seemed to complicated for this purpose but I found another real simple solution in just completing the subject and assigning a newly created one in the shared service like so:
constructor() {
this.selectedFeatures = new ReplaySubject()
this.selectedFeaturesObservable$ = this.selectedFeatures.asObservable()
}
completeSelectedFeatures() {
this.selectedFeatures.complete()
this.selectedFeatures = new ReplaySubject()
this.selectedFeaturesObservable$ = this.selectedFeatures.asObservable()
}
I also printed the constructor of the shared service to show the types I used. That way any time I move away from my component I just call that method on my shared service and hence get a new fresh and empty ReplaySubject anytime I navigate back to my component thats consuming the shared services observable. I call that method inside ngOnDestroy Angular lifecycle hook:
ngOnDestroy() {
console.log('unsubscribe')
this.featureSub.unsubscribe()
this.sharedDataService.completeSelectedFeatures()
}
Post a Comment for "Resetting Replaysubject In Rxjs 6"