I have a bunch of events to send up to a service. But the requests are rate limited and each request has a count limit:
- 1 request per second: bufferTime(1000)
- 100 event items per request: bufferCount(100)
The problem is, I am not sure how to combine them in a way that makes sense.
Allowing pass-through
Complicating this further, I need to make sure that events go through instantaneously if we don't hit either limit.
For example, I don't want it to actually wait for 100 event items before letting it go through if it's only one single event during a non-busy time.
Legacy API
I also found that there was a bufferWithTimeOrCount that existed in RxJS v4, although I am not sure how I'd use that even if I had it.
Test playground
Here is a JSBin I made for you to test your solution:
http://jsbin.com/fozexehiba/1/edit?js,console,output
Any help would be greatly appreciated.
asked Mar 30, 2017 at 23:29 by adrianmcli
3 Answers
The bufferTime() operator takes three parameters, which lets it combine the functionality of bufferTime and bufferCount. See http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-bufferTime.
With .bufferTime(1000, null, 3) you get a buffer every 1000ms or as soon as it reaches 3 items, whichever comes first. However, this means that it doesn't guarantee a 1000ms delay between buffers.
So you could use something like this, which is pretty easy to use (it buffers at most 3 items for at most 1000ms):
click$
  .scan((a, b) => a + 1, 0)
  .bufferTime(1000, null, 3)
  .filter(buffer => buffer.length > 0)
  .concatMap(buffer => Rx.Observable.of(buffer).delay(1000))
  .timestamp()
  .subscribe(console.log);
See live demo: http://jsbin.com/libazer/7/edit?js,console,output
The only difference from what you probably wanted is that the first emission might be delayed by more than 1000ms. This is because both the bufferTime() and delay(1000) operators add a delay, to ensure that there's always a gap of at least 1000ms between emissions.
I hope this works for you.
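To make the timing claims above concrete, here is a rough plain-JavaScript model with no RxJS dependency. It is only a sketch: the function names bufferTimeOrCount and deliveryTimes are made up for illustration, arrival times are given in milliseconds since subscription, and it assumes that a full buffer is emitted immediately and restarts the time span. It shows that two full buffers can be produced only a few milliseconds apart, that the concatMap/delay step spaces deliveries out again, and that a single lone event is delivered after roughly 2000ms.

```javascript
// Simplified model of bufferTime(span, null, maxSize); not RxJS's actual code.
// `arrivals` are event times in ms (sorted ascending), measured from subscription.
function bufferTimeOrCount(arrivals, span, maxSize) {
  const emitted = [];
  let openedAt = 0; // time at which the current buffer was opened
  let buffer = [];
  for (const t of arrivals) {
    // Close every buffer whose time span elapsed before this arrival.
    while (openedAt + span <= t) {
      openedAt += span;
      // Empty buffers are skipped, like the .filter() in the answer.
      if (buffer.length > 0) emitted.push({ at: openedAt, items: buffer });
      buffer = [];
    }
    buffer.push(t);
    if (buffer.length === maxSize) {
      // Assumption: a full buffer is emitted at once and a new span starts.
      emitted.push({ at: t, items: buffer });
      buffer = [];
      openedAt = t;
    }
  }
  if (buffer.length > 0) emitted.push({ at: openedAt + span, items: buffer });
  return emitted;
}

// Model of .concatMap(buffer => Rx.Observable.of(buffer).delay(gap)):
// each buffer waits for the previous delivery, then `gap` more milliseconds.
function deliveryTimes(buffers, gap) {
  let previous = -Infinity;
  return buffers.map(({ at }) => (previous = Math.max(at, previous) + gap));
}

const burst = bufferTimeOrCount([0, 1, 2, 3, 4, 5], 1000, 3);
console.log(burst.map((b) => b.at));     // [ 2, 5 ]
console.log(deliveryTimes(burst, 1000)); // [ 1002, 2002 ]
console.log(deliveryTimes(bufferTimeOrCount([0], 1000, 3), 1000)); // [ 2000 ]
```

The last line is the trade-off mentioned above: a lone event waits for the buffer's time span and then for the extra delay, so it does not pass through instantly.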
Operator
events$
  .windowCount(10)
  .mergeMap(m => m.bufferTime(100))
  .concatMap(val => Rx.Observable.of(val).delay(100))
  .filter(f => f.length > 0)
Doc
.windowCount(number): [ Rx Doc ]
.bufferTime(number): [ Rx Doc ]
Demo
// test case: mock[i] events arrive during the i-th interval
const mock = [8, 0, 2, 3, 30, 5, 6, 2, 2, 0, 0, 0, 1]
const tInterval = 100
const tCount = 10
Rx.Observable.interval(tInterval)
  .take(mock.length)
  .mergeMap(mm => Rx.Observable.range(0, mock[mm]))
  // start
  .windowCount(tCount)
  .mergeMap(m => m.bufferTime(tInterval))
  .concatMap(val => Rx.Observable.of(val).delay(tInterval))
  .filter(f => f.length > 0)
  // end
  .subscribe({
    next: (n) => console.log('Next: ', n),
    error: (e) => console.log('Error: ', e),
    complete: () => console.log('Completed'),
  })
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
Updated
After more testing, I found that the answer above has some problems under extreme conditions. I think they are caused by .window() and .concat(), and I then found this warning in the documentation for concatMap:
Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
However, I think the right way to limit the request rate may be to limit the cycle time of requests. In your case, that means allowing only 1 request per 10 milliseconds. This is simpler and may be a more efficient way to control the requests.
Operator
const tInterval = 100
const tCount = 10
const tCircle = tInterval / tCount
const rxTimer = Rx.Observable.timer(tCircle).ignoreElements()
events$
  .concatMap(m => Rx.Observable.of(m).merge(rxTimer)) // more accurate than `.delay()`
  // .concatMap(m => Rx.Observable.of(m).delay(tCircle))
or
events$
  .zip(Rx.Observable.interval(tCircle), (x, y) => x)
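The effect of the concatMap variant can be checked with a small plain-JavaScript model that needs no RxJS. This is a sketch under assumptions: minGapSchedule is a hypothetical helper name, and arrival times are supplied by hand in milliseconds. It computes when each event would be emitted if an event passes through immediately unless the previous one was emitted less than tCircle milliseconds earlier.

```javascript
// Model of events$.concatMap(m => Rx.Observable.of(m).merge(rxTimer)):
// an event is emitted as soon as it arrives, unless the previous event was
// emitted less than `gap` ms earlier, in which case it waits out the gap.
function minGapSchedule(arrivals, gap) {
  let previous = -Infinity;
  return arrivals.map((t) => (previous = Math.max(t, previous + gap)));
}

const tInterval = 100;
const tCount = 10;
const tCircle = tInterval / tCount; // 10 ms between consecutive events

// Three events arrive together at 0 ms, one more arrives alone at 50 ms.
console.log(minGapSchedule([0, 0, 0, 50], tCircle)); // [ 0, 10, 20, 50 ]
```

The burst is spread over 0, 10 and 20ms, while the lone event at 50ms goes through without waiting. Note that this approach emits events one at a time rather than grouping them into batches.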
I've modified the answer I gave to a related question to support your use case of adding a limited number of values (i.e. events) to pending requests.
The comments within should explain how it works.
Because you need to keep a record of the requests that have been made within the rate limit period, I don't believe that it's possible to use the bufferTime and bufferCount operators to do what you want - a scan is required so that you can maintain that state within the observable.
function rateLimit(source, period, valuesPerRequest, requestsPerPeriod = 1) {
  return source
    .scan((requests, value) => {
      const now = Date.now();
      const since = now - period;
      // Keep a record of all requests made within the last period. If the
      // number of requests made is below the limit, the value can be
      // included in an immediate request. Otherwise, it will need to be
      // included in a delayed request.
      requests = requests.filter((request) => request.until > since);
      if (requests.length >= requestsPerPeriod) {
        const leastRecentRequest = requests[0];
        const mostRecentRequest = requests[requests.length - 1];
        // If there is a request that has not yet been made, append the
        // value to that request if the number of values in that request
        // is below the limit. Otherwise, another delayed request will be
        // required.
        if (
          (mostRecentRequest.until > now) &&
          (mostRecentRequest.values.length < valuesPerRequest)
        ) {
          mostRecentRequest.values.push(value);
        } else {
          // until is the time until which the value should be delayed.
          const until = leastRecentRequest.until + (
            period * Math.floor(requests.length / requestsPerPeriod)
          );
          // concatMap is used below to guarantee the values are emitted
          // in the same order in which they are received, so the delays
          // are cumulative. That means the actual delay is the difference
          // between the until times.
          requests.push({
            delay: (mostRecentRequest.until < now) ?
              (until - now) :
              (until - mostRecentRequest.until),
            until,
            values: [value]
          });
        }
      } else {
        requests.push({
          delay: 0,
          until: now,
          values: [value]
        });
      }
      return requests;
    }, [])
    // Emit only the most recent request.
    .map((requests) => requests[requests.length - 1])
    // If multiple values are added to the request, it will be emitted
    // multiple times. Use distinctUntilChanged so that concatMap receives
    // the request only once.
    .distinctUntilChanged()
    .concatMap((request) => {
      const observable = Rx.Observable.of(request.values);
      return request.delay ? observable.delay(request.delay) : observable;
    });
}
const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 250),
  1000,
  100,
  1
).subscribe((values) => console.log(
  `Request with ${values.length} value(s) at T+${Date.now() - start}`
));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
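The bookkeeping done inside the scan can also be followed without timers. The sketch below is a plain-JavaScript port of the reducer, written under some assumptions: the names addValue and planRequests are hypothetical, the current time is passed in explicitly instead of being read from Date.now(), and the delay field is left out because only the scheduled time (until) and the batch size are inspected.

```javascript
// Plain-JavaScript port of the scan reducer above. `now` is passed in
// explicitly (an assumption made for testability) instead of Date.now().
function addValue(requests, value, now, period, valuesPerRequest, requestsPerPeriod = 1) {
  const since = now - period;
  // Forget requests that fall outside the rate limit period.
  const recent = requests.filter((request) => request.until > since);
  if (recent.length < requestsPerPeriod) {
    // Below the limit: the value goes out in an immediate request.
    recent.push({ until: now, values: [value] });
    return recent;
  }
  const leastRecent = recent[0];
  const mostRecent = recent[recent.length - 1];
  if (mostRecent.until > now && mostRecent.values.length < valuesPerRequest) {
    // A pending request still has room: append the value to it.
    mostRecent.values.push(value);
  } else {
    // Otherwise schedule another delayed request.
    const until = leastRecent.until +
      period * Math.floor(recent.length / requestsPerPeriod);
    recent.push({ until, values: [value] });
  }
  return recent;
}

// Feeds one value per arrival time and lists each distinct request once,
// mirroring the map + distinctUntilChanged steps of the answer.
function planRequests(arrivals, period, valuesPerRequest) {
  const seen = new Set();
  const plan = [];
  let requests = [];
  arrivals.forEach((now, index) => {
    requests = addValue(requests, index + 1, now, period, valuesPerRequest);
    const latest = requests[requests.length - 1];
    if (!seen.has(latest)) {
      seen.add(latest);
      plan.push(latest);
    }
  });
  return plan.map((r) => ({ at: r.until, count: r.values.length }));
}

// 250 values arriving at the same instant, 1 request per 1000 ms, 100 per request.
console.log(planRequests(new Array(250).fill(0), 1000, 100));
// [ { at: 0, count: 1 }, { at: 1000, count: 100 },
//   { at: 2000, count: 100 }, { at: 3000, count: 49 } ]
```

In this model the first value of a burst goes out alone straight away, and the remaining values are grouped into delayed requests of up to 100. Two values that arrive more than one period apart, for example planRequests([0, 5000], 1000, 100), are each scheduled at their own arrival time, which is the pass-through behaviour asked for in the question.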