2014-02-14 33 views
1

我正在学习TPL数据流,我已经通过我的一些朋友看到了它的强大功能,我遇到了一个与我的实现有关的问题。以无限并行方式多次运行相同的ActionBlock

我想/需要的是尽可能快地发送消息。我在做一些LinqPad原型,这是我迄今:

// Holds all the messages for my loadMessage ActionBlock to grab its data from 
var bufferBlock = new BufferBlock<string>(); 

// Sends message to where it needs to go as fast as it can. 
var loadMessage = new ActionBlock<string>(msg => 
{ 
    msg.Dump(); 
}, 
new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
}); 

// Links the blocks together 
bufferBlock.LinkTo(loadMessage); 

// Loads the Buffer 
for (int i = 0; i < 10; i++) 
{ 
    bufferBlock.Post("This is a message"); 
} 

//Calls completion to stop threads 
bufferBlock.Complete(); 
loadMessage.Complete(); 

的问题是,loadMessageBlock没有在例子倾销上述消息。我一直在寻找一些有点运气的见解。我想我错过了TPL的基本要求。我的理解是,BufferBlock保存要由其他块处理的信息,并且ActionBlocked(与BufferBlock链接)应该从缓冲区中获取数据并执行其所需的操作。在将信息放到缓冲区的For循环之后,停止完成被调用以停止线程。

在实现中,我有一个Parallel.For,它运行我的loadMessage内的代码就好了。我只是不能实现TPL来做我想做的事情,我的理解是TPL将比Parallel.For更快。

我在这里如何认为这是假设工作?我错误地使用TPL吗?我将继续研究一个答案,任何指针将受到高度赞赏。谢谢!

回答

-2
  1. 我想/需要的是和我一样快可以发送邮件:

    为了实现这一目标,就需要发布/同时接收数据,并从缓冲块。下面是摘录:

    var bufferBlock = new BufferBlock<string>(); 
    // Write to and read from the message block concurrently. 
    var post01 = Task.Run(() => 
    { 
        // Loads the Buffer 
        for (int i = 0; i < 10; i++) 
        { 
         bufferBlock.Post(string.Format("This is a message {0}",i)); 
        } 
    }); 
    var receive = Task.Run(() => 
    { 
        for (int i = 0; i < 10; i++) 
        { 
         var message = bufferBlock.Receive(); 
         message.Dump(); 
        } 
    }); 
    
    
    Task.WaitAll(post01, receive); 
    

    更多关于这一点,你可以看到在MSDN link

  2. 我的理解是,TPL会比更快的Parallel.For。

    这不正确,因为它们使用相同的基础结构。它们属于同一个名称空间System.Threading.Tasks

+0

感谢您的帮助。我没有最终使用TPL来处理这种特殊情况,但我正在阅读有关Task和Threading的更多信息以供将来实现。 – Schanckopotamus

+0

这里没有必要使用'Task'接收,这正是'ActionBlock'的作用。 – svick

+0

@svick无论你选择什么方式都很好,甚至在我发布的MSDN链接的例子中,他们都使用任务。没有理由认为吃面包比吃米饭更好! –

1

首先,一个关于术语的解释:TPL(以下简称任务并行库)是不一样的TPL数据流,这只是一个子集。整个TPL包括诸如Parallel.For()Task类型的东西。

现在,您的代码的问题是,您正在尽快完成loadMessage块。致电Complete()后,该块不会再接受任何消息,因此您发布到bufferBlock的消息永远不会达到loadMessage

你需要的只是在bufferBlock发送完所有消息后才需要完成loadMessage。这正是PropagateCompletion做:

bufferBlock.LinkTo(
    loadMessage, new DataflowLinkOptions { PropagateCompletion = true }); 

// post your data to bufferBlock here 

bufferBlock.Complete(); 
await loadMessage.Completion; 

而且,在这种特定的情况下,是不是需要在所有的bufferBlock,你可能只是直接发布消息loadMessage

我的理解是,太平人寿将快于的Parallel.For

我不明白为什么应该在一般的快。一般情况下,他们的表现应该是可比的。所以你应该更好地使用适合你的问题的方法,而不是选择一个,因为“更快”。如果你真的关心性能,那就用两种方式编写代码,然后测量哪一个更好。