4
我有一个Hyper HTTP请求期货的大型矢量,并且想要将它们解析为结果向量。由于有最大打开文件的限制,我想限制并发到N期货。加入期货有限的并发
我已经用Stream::buffer_unordered
进行了实验,但似乎是一个接一个地执行期货。
我有一个Hyper HTTP请求期货的大型矢量,并且想要将它们解析为结果向量。由于有最大打开文件的限制,我想限制并发到N期货。加入期货有限的并发
我已经用Stream::buffer_unordered
进行了实验,但似乎是一个接一个地执行期货。
我们在项目中使用了code like this以避免打开太多的TCP套接字。这些期货内有Hyper期货,所以看起来正好是同样的情况。
// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
futures::stream::iter(iterator_of_futures.map(Ok))
.buffer_unordered(PARALLELISM);
// Everything after here is just using the stream in
// some manner, not directly related
let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);
// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
match core.run(all_done) {
Ok((None, _)) => break,
Ok((Some(v), next_all_done)) => {
successes.push(v);
all_done = next_all_done.into_future();
}
Err((v, next_all_done)) => {
failures.push(v);
all_done = next_all_done.into_future();
}
}
}
这在一块示例代码被使用,所以事件循环(core
)被明确地驱动。观看程序使用的文件句柄的数量表明它已被封顶。另外,在添加这个瓶颈之前,我们很快耗尽了允许的文件句柄,而之后我们没有。
你可以发布你已经有的代码吗? –
请[编辑]你的问题来解释你为什么说“似乎是一个接一个地执行期货”。我已经使用'buffer_unordered'来达到**这个确切的目的**,并且它对我有用。 – Shepmaster