With Tokio, suppose I have a StreamReader which implements AsyncRead, and I want to do something with it, e.g. save it to a file:
async fn save(save_to: &Path, mut stream_reader: impl AsyncRead + Unpin) -> Result<()> {
    let mut file = tokio::fs::File::create(&save_to).await?;
    tokio::io::copy(&mut stream_reader, &mut file).await?;
    Ok(())
}
async fn download_response(
    response: reqwest::Response,
    save_to: &Path,
) {
    // bytes_stream() yields Result<Bytes, reqwest::Error>; map the error into
    // io::Error so StreamReader accepts it (map_err is futures_util::TryStreamExt).
    let bytes_stream = response.bytes_stream().map_err(std::io::Error::other);
    let stream_reader = tokio_util::io::StreamReader::new(bytes_stream);
    save(save_to, stream_reader).await.unwrap();
}
That works fine. Now what if I want to do two things with it simultaneously, e.g. saving it to two different files (not my actual use case but for simplicity):
async fn download_response(
    response: reqwest::Response,
    save_to_0: &Path,
    save_to_1: &Path,
) {
    let bytes_stream = response.bytes_stream().map_err(std::io::Error::other);
    let stream_reader = tokio_util::io::StreamReader::new(bytes_stream);
    let f0 = save(save_to_0, stream_reader);
    let f1 = save(save_to_1, stream_reader);
    join!(f0, f1);
}
This obviously doesn't work because there's only one stream_reader. How do I "duplicate" it so that it can be sent to multiple consumers? Ideally with a limit on the size of its internal buffer, to avoid the slow consumer problem (i.e. if saving to one file is very slow then you'll end up with the entire stream in memory).
Googling suggests maybe tokio_stream::wrappers::BroadcastStream could help, but I'm not sure how. I also found fork_stream, which seems like it might help, but again I'm not sure (also I'd prefer a first-party solution if there is one).
1 Answer
You can try a function like this to pass the result of a reader to multiple writers:
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn multiplex(
    mut reader: impl tokio::io::AsyncRead + Unpin,
    mut writers: Vec<impl tokio::io::AsyncWrite + Unpin>,
) -> std::io::Result<()> {
    let mut buffer = [0u8; 1024];
    loop {
        let size = reader.read(&mut buffer).await?;
        if size == 0 {
            break;
        }
        // Write the same chunk to every writer, one after the other.
        for writer in &mut writers {
            writer.write_all(&buffer[..size]).await?;
        }
    }
    Ok(())
}
async fn download_response(
    response: reqwest::Response,
    save_to_0: &Path,
    save_to_1: &Path,
) {
    let bytes_stream = response.bytes_stream().map_err(std::io::Error::other);
    let stream_reader = tokio_util::io::StreamReader::new(bytes_stream);
    // Create the output files concurrently (JoinSet::join_all needs a recent tokio),
    // then feed the single reader to both writers.
    let writers = [save_to_0, save_to_1]
        .into_iter()
        .map(|path| tokio::fs::File::create(path.to_path_buf()))
        .collect::<tokio::task::JoinSet<_>>()
        .join_all()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    multiplex(stream_reader, writers).await.unwrap();
}
One major disadvantage of this method is that it forces all writers to run at the same speed: if one writer is slow, neither the other writers nor the reader will make progress until the slow one catches up. The advantage is low memory use.
You can also issue the writes concurrently instead of one after the other, by replacing the for loop in multiplex with something that drives all the write futures at once. A JoinSet won't work here because the write futures borrow the writers and the buffer, but futures::future::try_join_all polls them inside the current task, so no 'static bound is needed:
// Replaces the for loop inside multiplex; needs the futures crate.
futures::future::try_join_all(
    writers
        .iter_mut()
        .map(|writer| writer.write_all(&buffer[..size])),
)
.await?;
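Neither variant gives you the bounded per-consumer buffer asked about in the question. Below is a minimal sketch of one way to get that, assuming tokio's bounded mpsc channels and one spawned task per writer; fan_out and the capacity of 16 chunks are illustrative choices, not an existing API. A slow writer only stalls the reader once its own queue fills up, so memory stays capped at roughly (channel capacity × chunk size) per writer.
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;

// Sketch (not from the answer above): fan a single reader out to several
// writers through bounded channels, one spawned task per writer.
async fn fan_out(
    mut reader: impl AsyncRead + Unpin,
    writers: Vec<impl AsyncWrite + Unpin + Send + 'static>,
) -> std::io::Result<()> {
    let mut senders = Vec::new();
    let mut tasks = tokio::task::JoinSet::new();
    for mut writer in writers {
        // Each writer gets its own bounded queue of at most 16 chunks.
        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(16);
        senders.push(tx);
        tasks.spawn(async move {
            while let Some(chunk) = rx.recv().await {
                writer.write_all(&chunk).await?;
            }
            writer.flush().await?;
            Ok::<_, std::io::Error>(())
        });
    }
    let mut buffer = [0u8; 1024];
    loop {
        let size = reader.read(&mut buffer).await?;
        if size == 0 {
            break;
        }
        for tx in &senders {
            // send() waits when a queue is full, which is the backpressure limit;
            // a closed channel (a writer task that already failed) is ignored here
            // and its error surfaces below when the task is joined.
            let _ = tx.send(buffer[..size].to_vec()).await;
        }
    }
    drop(senders); // close the channels so the writer tasks can finish
    while let Some(joined) = tasks.join_next().await {
        joined.expect("writer task panicked")?;
    }
    Ok(())
}
The trade-off versus multiplex is an extra copy of each chunk per writer and the Send + 'static bounds needed to move the writers into spawned tasks; tokio::fs::File satisfies both.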