I have a strange use case where I need to keep track of all previously emitted events.
Thanks to ReplaySubject, this works perfectly so far: on every new subscription, the subject re-emits every previous event.
Now, for a specific scenario, I need to deliver only the latest published event to a new subscriber (a bit like a BehaviorSubject), while keeping the same source of events.
Here is a snippet of what I'm trying to achieve (StackBlitz):
import { ReplaySubject, BehaviorSubject, from } from "rxjs";

class EventManager {
  constructor() {
    this.mySubject = new ReplaySubject();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.mySubject.pipe(/* an operator to get the last one on new subscribe */).subscribe(next, error, complete);
  }
}

const myEventManager = new EventManager();
myEventManager.publish("Data 1");
myEventManager.publish("Data 2");
myEventManager.publish("Data 3");

myEventManager.fullSubscribe(v => {
  console.log("SUB 1", v);
});
myEventManager.subscribe(v => {
  console.log("SUB 2", v);
});
Thank you
Asked Apr 30, 2021 at 14:36 by anousss; edited Apr 30, 2021 at 14:54 by Patrick Roberts.
4 Comments
The RxJS last() operator does this. – Mrk Sef, Apr 30, 2021 at 15:40
@MrkSef Not exactly. last() won't work the way they're requesting; it just outputs the last published event before the observable completes. OP is requesting that when the ReplaySubject is subscribed to, the "hot" subject immediately emits the last event that was published (like BehaviorSubject) rather than all the events that have ever been published to it, as it does by default. – Patrick Roberts, Apr 30, 2021 at 15:45
@PatrickRoberts Good point! Okay, use RxJS debounceTime(0). I think 0 should work here as the replay happens synchronously. If not, he can up the debounce time a little bit. – Mrk Sef, Apr 30, 2021 at 17:27
I thought about debounceTime(0); the problem with that is that if subsequent emissions happen on the same tick, only the last one will be received. – BizzyBob, Apr 30, 2021 at 17:28
3 Answers
If you keep track of the number of events you've published, you could use skip:
subscribe(next, error, complete) {
  // publishCount is the number of events published so far
  return this.mySubject.pipe(
    skip(this.publishCount - 1)
  ).subscribe(next, error, complete);
}
Here's a StackBlitz demo.
Instead of forcing a ReplaySubject to behave like a BehaviorSubject, you can arrive at ReplaySubject-like behavior by manipulating a BehaviorSubject.
import { BehaviorSubject, from, concat } from 'rxjs';
import { scan, shareReplay, take, concatMap, skip } from 'rxjs/operators';

class EventManager {
  constructor() {
    // Note: with no initial value, the subject starts out holding
    // `undefined`, which subscribers and the accumulated history see first.
    this.mySubject = new BehaviorSubject();
    this.allEmittedValues = this.mySubject.pipe(
      scan((xs, x) => [...xs, x], []),
      shareReplay(1)
    );
    // Necessary since we need to start accumulating allEmittedValues
    // immediately.
    this.allEmittedValues.subscribe();
  }

  dispose() {
    // ends all subscriptions
    this.mySubject.complete();
  }

  publish(value) {
    this.mySubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    // First, take the latest value of the accumulated array of emits and
    // unroll it into an observable
    const existingEmits$ = this.allEmittedValues.pipe(
      take(1),
      concatMap((emits) => from(emits))
    );
    // Then, subscribe to the main subject, skipping the replayed value since
    // we just got it at the tail end of existingEmits$
    const futureEmits$ = this.mySubject.pipe(skip(1));
    return concat(existingEmits$, futureEmits$).subscribe(
      next,
      error,
      complete
    );
  }

  subscribe(next, error, complete) {
    return this.mySubject.subscribe(next, error, complete);
  }
}
Why not just have an instance of both ReplaySubject and BehaviorSubject on EventManager?
import { ReplaySubject, BehaviorSubject } from "rxjs";

class EventManager {
  constructor() {
    this.replaySubject = new ReplaySubject();
    this.behaviorSubject = new BehaviorSubject();
  }

  publish(value) {
    // Every event goes to both subjects so they stay in sync
    this.replaySubject.next(value);
    this.behaviorSubject.next(value);
  }

  fullSubscribe(next, error, complete) {
    return this.replaySubject.subscribe(next, error, complete);
  }

  subscribe(next, error, complete) {
    return this.behaviorSubject.subscribe(next, error, complete);
  }
}