2017-01-23 47 views
1

我有一个项目列表,通过期货:: Sink派:如何通过期货发送物品清单:: Sink?

let mut list = VecDeque::new(); 
/* add a bunch of Packet items to list */ 
let (sink, stream) = tcp_stream.framed(PacketCodec).split(); 

我可以给使用

if let Some(first) = list.pop_front() { 
    sink.send(first); 
} 

如何发送整个列表中的一个包?

+2

*“以下显然不起作用*” - 怎么样?你可以包括错误,或描述你看到的行为? –

+2

在提出有关堆栈溢出的问题时,请生成一个[MCVE]。 – Shepmaster

+1

我敢打赌,**缺少的错误信息**说了一些关于*期待'T',找到了'&T' *。如果是这样的话,我强烈建议你回头阅读[* The Rust Programming Language *](https://doc.rust-lang.org/stable/book/),因为这不会是关于Tokio的问题,而是Rust的基本所有权。 – Shepmaster

回答

7

当使用一个新的软件时,我发现在潜入太深之前阅读文档非常有用。一般来说,Rust社区提供了相当不错的资源。

例如,除了selection of working examples之外,Tokio项目还有一个entire page talking about Streams and Sinksgenerated API documentation for Sink也是非常宝贵的。

我推荐所有新程序员学会做的另一件事是创建一个MCVE的问题。这使他们能够专注于问题的核心,同时还能够消除问题背后的言辞和磨砺。这里有一个这种情况:

extern crate futures; 

fn thing<S>(sink: S) 
    where S: futures::Sink<SinkItem = i32>, 
{ 
    let mut all_the_things = vec![1, 2, 3, 4, 5]; 

    while let Some(v) = all_the_things.pop() { 
     sink.send(v); 
    } 
} 

fn main() {} 

我们得到错误信息

error[E0382]: use of moved value: `sink` 
--> src/main.rs:9:9 
    | 
9 |   sink.send(v); 
    |   ^^^^ value moved here in previous iteration of loop 
    | 
    = note: move occurs because `sink` has type `S`, which does not implement the `Copy` trait 

让我们回头API文档...

fn send(self, item: Self::SinkItem) -> Send<Self> 
    where Self: Sized 

从这一点,我们可以看到,send消耗接收器并返回一个Send对象。这很奇怪,不是吗!实际上并不奇怪,一旦你看到Send实现Future - 将某些内容推入流中可能会阻塞流是否已满;这是背压的概念。结果为Future是您如何知道该项目何时实际添加到流中。 Future的执行SendItem = SSink的具体类型。因此一个解决方案是推动未来完成获得汇回来,这样做的一个方法是调用wait

use std::fmt; 
use futures::Future; 

fn thing<S>(mut sink: S) 
    where S: futures::Sink<SinkItem = i32>, 
      S::SinkError: fmt::Debug, 
{ 
    let mut all_the_things = vec![1, 2, 3, 4, 5]; 

    while let Some(v) = all_the_things.pop() { 
     let f = sink.send(v); 
     sink = f.wait().expect("Something bad happened while sending"); 
    } 
} 

这编译,但不是超级靓。回到文档,我们也可以看到Sink::send_all,这需要Stream。我们可以通过使用stream::iter创建一个迭代器Stream

fn thing<S>(sink: S) 
    where S: futures::Sink<SinkItem = i32>, 
      S::SinkError: fmt::Debug, 
{ 
    let all_the_things = vec![1, 2, 3, 4, 5]; 
    let wrapped = all_the_things.into_iter().map(Result::Ok); 

    sink.send_all(futures::stream::iter(wrapped)).wait().expect("Something bad happened while sending"); 
} 

我们再次使用wait带动未来完成。如果我们忘记了一个很好的错误:“期货什么也不做,除非被调查”。但是,wait并不是真正处理此问题的正确方法。通常,您想要将Future返回给调用者,因为他们想要知道什么时候所有东西都被推入水槽。

+2

很好的答案! https://tokio.rs也是一个梦幻般的指针。它肯定需要在谷歌中排名靠前。我一直在使用源代码和文档以及一些博客来找出tokio。 –

+0

我相信Google的排名会稳步攀升; Tokio 0.1仅在2周前发布! – Shepmaster

+0

感谢您的详细解答。它为我点了点东西,说明为什么你会故意设计一个API来利用Rust的移动语义。 –