2017-04-06 25 views
4

我有一个Hyper HTTP请求期货的大型矢量,并且想要将它们解析为结果向量。由于有最大打开文件的限制,我想限制并发到N期货。加入期货有限的并发

我已经用Stream::buffer_unordered进行了实验,但似乎是一个接一个地执行期货。

+2

你可以发布你已经有的代码吗? –

+0

请[编辑]你的问题来解释你为什么说“似乎是一个接一个地执行期货”。我已经使用'buffer_unordered'来达到**这个确切的目的**,并且它对我有用。 – Shepmaster

回答

4

我们在项目中使用了code like this以避免打开太多的TCP套接字。这些期货内有Hyper期货,所以看起来正好是同样的情况。

// Convert the iterator into a `Stream`. We will process 
// `PARALLELISM` futures at the same time, but with no specified 
// order. 
let all_done = 
    futures::stream::iter(iterator_of_futures.map(Ok)) 
    .buffer_unordered(PARALLELISM); 

// Everything after here is just using the stream in 
// some manner, not directly related 

let mut successes = Vec::with_capacity(LIMIT); 
let mut failures = Vec::with_capacity(LIMIT); 

// Pull values off the stream, dividing them into success and 
// failure buckets. 
let mut all_done = all_done.into_future(); 
loop { 
    match core.run(all_done) { 
     Ok((None, _)) => break, 
     Ok((Some(v), next_all_done)) => { 
      successes.push(v); 
      all_done = next_all_done.into_future(); 
     } 
     Err((v, next_all_done)) => { 
      failures.push(v); 
      all_done = next_all_done.into_future(); 
     } 
    } 
} 

这在一块示例代码被使用,所以事件循环(core)被明确地驱动。观看程序使用的文件句柄的数量表明它已被封顶。另外,在添加这个瓶颈之前,我们很快耗尽了允许的文件句柄,而之后我们没有。