tpl-dataflow

    0热度

    1回答

    我开发了一个Dataflow管道,它读取文件集合,并为每个文件中的每一行执行一系列Dataflow块。 之后所有步骤已完成对文件中的每一行,我想对文件本身执行进一步的块,但我不知道这是如何可能的。 通过TransformManyBlock拆分处理很简单,但是如何整合呢? 我习惯了Apache Camel的Splitter和Aggregator功能 - 或者Dataflow的意图和我想要的用法之间存

    1热度

    1回答

    据我所知,要将数据放入TPL Dataflow目标,我可以使用Post或SendAsync,如果该项目可以放入目标,则会立即返回。我明白SendAsync将等待更长时间才能尝试放入,但我不确定SendAsync返回false的含义是什么。 SendAsyncSendAsync是否返回false表示目标(特别是BufferBlock)已完成并将永不接受更多消息? 它可能稍后开始接受消息吗?

    0热度

    1回答

    我使用TPL很多,并有大量数据流管道结构。 作为管道网络的一部分,我想将一些数据写入azure blob存储。我们有很多数据,因此我们有4存储帐户,我们希望在它们之间均匀分配数据。 希望继续使用数据流管道模式,因此我想实现一个SourceBlock,如果我将它链接到几个目标模块,它将使用循环法将消息发送给它们。 BufferBlock不够好,因为他正在将消息发送到接受它的第一个块,并假设所有目标块

    4热度

    1回答

    我正在写一个小型记录器,我想打开一次日志文件,在日志消息到达时继续写入,并在程序终止时处理所有内容。 我不知道如何保持FileStream打开和反应性地写入消息到达。 我想从我以前的解决方案中更新设计,我的ConcurrentQueue充当缓冲区,并且使用队列的using语句中的循环。 具体来说,我想同时利用using语句结构的,所以我没有显式地关闭流和作家,以及无功,无回路的编程风格。目前,我只

    1热度

    1回答

    我试图将一条消息的副本从ActionBlock<int>发送给多个使用者,这些使用者也是ActionBlock<int>。这很好,但是如果其中一个目标块引发异常,看起来这不会传播到源块。在这里,我怎么尝试处理异常,但它从未进入到catch部分: static void Main(string[] args) { var t1 = new ActionBlock<int>(async i

    3热度

    1回答

    我正在使用TPL数据流构建应用程序。其实我有以下问题。我有一个transformblock var tfb1 = new TranformBlock<InMsg, IReadOnlyCollection<OutMsg>>。因此,tfb1在消息中接收并创建出站消息列表。这个out-messages列表应该链接到一个路由器数据块,它接收OutMsg作为输入(而不是IReadOnlyCollection

    0热度

    1回答

    我想要实现使用持续运行类似问题here和代码here的BufferBlock消费者/生产模式。 我试图使用类似于OP的ActionBlock,但是如果bufferblock已满并且新消息位于其队列中,则新消息永远不会添加到ConcurrentDictionary _queue中。 在ConsumeAsync方法时,一个新的消息添加到与该呼叫的bufferblock不会被调用下面的代码:_messa

    1热度

    1回答

    我有一个非常简单的问题。我需要一种方法来轻松地对需要一些时间的消息执行一些处理。在处理过程中,可能会输入新的请求,但除最后一个之外的所有请求都可以被丢弃。 所以我认为TPL Broadcastblock应该这样做,例如查看文档和帖子,以及StackExchange。我创建了以下解决方案并为其添加了一些单元测试,但在单元测试中,有时最后一项不会发送。 这不是我所期望的。如果它应该放弃任何东西,我会说

    2热度

    1回答

    我有一个BroadcastBlock链接到ActionBlock。当我在BroadcastBlock和ActionBlock上顺序呼叫“完成”时,它不起作用。而仅仅在BroadCastBlock上打电话“完成”正在工作。 public class ActionTester { private readonly ActionBlock<int> _action; private

    0热度

    1回答

    TPL Dataflow SingleProducerConstrained选项是指源块的数量还是源块的最小总并行度? 即,如果我只有一个源块连接到使用此选项的块,则源的MaxDegreeOfParallelism必须是1还是不是?