2014-01-29 59 views
5

我正在学习有关F#代理(MailboxProcessor)工作的MailboxProcessor。与一个LIFO逻辑

我处理的相当非常规的问题。

  • 我有一个代理(dataSource),它是流数据的来源。数据必须由一系列代理处理(dataProcessor)。我们可以将dataProcessor视为某种跟踪设备。
  • 数据可以在除了与该dataProcessor可以是能够处理其输入端的转速更快流动。
  • 它是确定有一定的延迟。但是,我必须确保代理处于其工作之上,并且不会在过时的观察结果下堆积起来

我正在探索解决此问题的方法。

第一个想法是在dataSource中实施stack(LIFO)。当dataProcessor变得可用于接收和处理数据时,dataSource将发送最新可用观察值。此解决方案可能会工作,但可能会变得复杂,因为dataProcessor可能需要被阻止并重新激活;并将其状态通知给dataSource,导致双向通信问题。这个问题可以归结为在consumer-producer problem一个blocking queue,但我不知道..

第二个想法是有dataProcessor电话留言的照顾排序。在这个架构中,dataSource只会在dataProcessor的队列中发布更新。 dataProcessor将使用Scan来获取队列中可用的最新数据。这可能是要走的路。但是,我不确定在MailboxProcessor的当前设计中是否可以清除消息队列,删除较旧的旧消息队列。此外,here,写的是:

不幸的是,在F#的当前版本TryScan功能 破两种方式。首先,整点是指定一个超时时间 ,但实现并没有真正遵守它。具体来说, 不相关的消息重置计时器。其次,作为与其它扫描 功能,消息队列,以防止任何 其他线程可以发布对于扫描的持续时间,其可以是 任意长的时间的锁下检查。因此,TryScan函数本身 倾向于锁定并发系统,甚至可能引入死锁 ,因为调用者的代码是在锁内部进行评估的(例如,从函数参数中发布 到Scan或TryScan可以使代理程序 死锁等待获取锁的锁块已经在 之下)。

拥有最新的观测反弹可能是一个问题。 这篇文章的作者,@Jon Harrop建议,

我设法围绕它建筑,所产生的架构实际上更好。实质上,我急切地使用我自己的本地队列Receive所有消息和过滤器。

这个想法肯定值得探索,但是在开始使用代码之前,我会欢迎一些关于如何构建我的解决方案的意见。

谢谢。

+1

FWIW,我只测试了F#3.1.1'TryScan'超时错误,它已得到修复。 –

回答

1

tl; dr我会试试这个:从FSharp.Actor或Zach Bray的博客文章中取出邮箱实现,用ConcurrentStack替换ConcurrentQueue(加上一些有界的容量逻辑),并使用这个已更改的代理作为调度程序将消息从dataSource传递给数据处理器部队作为普通MBP或演员执行。

TL; DR2如果工人是稀缺的和缓慢的资源,我们需要处理的消息是目前最新的当工人准备好了,那么这一切都归结到一个代理用栈代替一个队列(有一定的容量逻辑)加上工人的阻塞队列。分派器将已准备好的工作人员出列,然后从堆栈中弹出消息并将此消息发送给工作人员。作业完成后,工人在准备就绪时排队等待队列(例如在let! msg = inbox.Receive()之前)。然后调度程序消费者线程阻塞,直到任何工人准备好,而生产者线程保持更新有界的堆栈。 (界定堆叠可以以与阵列来完成+偏移的锁内+大小,下面是太复杂之一)

详细

MailBoxProcessor被设计成仅具有一个消费者。这甚至在MBP here(搜索单词'DRAGONS':)的源代码中被评论)

如果您将数据发布到MBP,那么只有一个线程可以将其从内部队列或堆栈中取出。 在你特定用途的情况下,我会直接或更好的包裹使用ConcurrentStackBlockingCollection

  • 它将使许多并发消费者
  • 这是非常快和线程安全
  • BlockingCollection具有BoundedCapacity属性,可以让你限制集合的大小。它抛出Add,但你可以捕捉它或使用TryAdd。如果A是主堆栈,B是备用数据库,则将TryAdd复制到A,将虚拟0​​复制到B并将两者与Interlocked.Exchange交换,然后在A中处理所需的消息,清除它,创建新的备用数据库 - 或者在处理时使用三个堆栈A可能会比B再长一些;通过这种方式,你不会阻止并且不会丢失任何消息,但是可以丢弃不需要的消息是一种受控制的方式。

BlockingCollection具有像AddToAny/TakeFromAny这样的方法,这些方法可用于BlockingCollections数组。这可以帮助,例如:

  • 的dataSource产生消息,以与实施ConcurrentStack(BCCS)
  • 另一个线程从BCCS接收消息并将其发送到处理BCCSs阵列的BlockingCollection。你说有很多数据。您可能会牺牲一个线程阻塞并无限期地发送消息
  • 每个处理代理都有其自己的BCCS,或者实现为调度员发送消息的代理/ Actor/MBP。在你的情况下,你只需要发送一条消息给一个processorAgent,这样你就可以将处理代理存储在一个循环缓冲区中,以便总是向最近最少使用的处理器发送消息。

事情是这样的:

  (data stream produces 'T) 
       | 
      [dispatcher's BCSC] 
       | 
      (a dispatcher thread consumes 'T and pushes to processors, manages capacity of BCCS and LRU queue) 
       |        | 
      [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP] 
       |        | 
       (process)       (process) 

相反ConcurrentStack的,你可能想了解heap data structure。如果您需要通过某些消息属性获取最新消息,例如时间戳,而不是按照它们到达堆栈的顺序(例如,如果可能存在传输和到达顺序的延迟<>创建顺序),则可以使用堆获取最新消息。

如果您仍然需要代理商语义/ API,你可以读除了Dave的链接几个来源,并以某种方式通过实施多个并发消费者:

  • interesting article扎克布雷高效演员的实现。你需要用async { execute true } |> Async.Start或类似的行替换(在注释// Might want to schedule this call on another thread.下)行execute true,因为否则生成线程将消耗线程 - 对于单个快速生产者来说不是好的。但是,对于像上面描述的调度员来说,这正是需要的。

  • FSharp.Actor(又名Fakkadevelopment branch和FSharp MPB源代码(上述第一个链接),这里可能是实现细节非常有用的。 FSharp.Actors库已经冻结了好几个月,但在dev分支中有一些活动。

  • 不应该错过在此上下文中Google Groups中的discussion about Fakka

我有一个类似的用例,在过去的两天里,我研究了所有可以在F#Agents/Actor上找到的东西。这个答案对我来说是一种尝试这些想法的TODO,其中一半是在写作过程中出生的。

+0

非常感谢您的意见。我有一个问题:如果消费者是以普通MBP的形式实现的,由于调度员发送的消息不确定,他们仍然可以开始并处理不是最新的观察数据,不是吗?在我的设计中,我应该尽量确保处理器的工作尽可能新鲜。 – NoIdeaHowToFixThis

+0

在此设计中,调度程序确保始终首先发送最新的可用消息。但是,调度员发送邮件的速度会非常快,而且工作人员可能会累积旧值......我应该更正文本,但不能达到最初编写的目标。如果你不能删除任何消息,但必须先处理最新消息,那么应该像调度员一样使用堆栈而不是队列来实现工人。如果您可以删除非最新的消息,则直接在调度程序中工作(如果您使用博客文章中的实现,请确保“发布和接收”位于不同的线程中)。 –

+0

......我完全不理解你的用例。在我的文本中,可能没有可行的答案来解决您的问题,但是有些构建块和其他有用的链接。 –

2

听起来像是你可能需要邮箱处理器的破坏性的扫描版本,我在博客系列,你可能有兴趣在实现了这个与TPL数据流。

我的博客目前是停机维护,但我可以点你以降价格式发布信息。

Part1
Part2
Part3

您还可以检查出github

我也在我潜伏的恐怖post

希望帮助与扫描写了关于这些问题的代码.. 。

+0

嗨。这是一个非常有趣的阅读。我会通过材料。顺便说一句,我有一个格式化的{%codeblock lang:fsharp%}'和其他标签的问题。有没有一个技巧来获得正确的布局?非常感谢。 – NoIdeaHowToFixThis

+0

其格式化为液体插件,github只是用标准标记处理器显示它。我会把他们转移到我目前的博客上,但还没有时间去做。你可以找到Wayback机器上也部分1和2: http://web.archive.org/web/20131126185321/http://moiraesoftware.com/blog/2012/01/22/FSharp-Dataflow-agents- I/ http://web.archive.org/web/20131127003757/http://moiraesoftware.com/blog/2012/01/30/FSharp-Dataflow-agents-II/ – 7sharp9

+0

@ 7sharp9感谢您的直接可读链接!昨天我尝试了Google缓存:)我在与Zach Bray文章相同的测试中测试了DataFlowAgent,DFA比我的机器上的标准MBP慢了4倍。它在DataflowBlock.ReceiveAsync/DataflowBlock.Post上花费了72%,它们在System.Threading.Tasks.Dataflow中,这正好解释了速度下降4倍。 STTD速度慢还是什么?这里的片段:http://fssnip.net/lt –

1

最简单的解决方案是,当一个人到达时,贪婪地吃掉收件箱中的所有邮件,并丢弃除最近的邮件外的所有邮件。使用TryReceive容易实现:

let rec readLatestLoop oldMsg = 
    async { let! newMsg = inbox.TryReceive 0 
      match newMsg with 
      | None -> oldMsg 
      | Some newMsg -> return! readLatestLoop newMsg } 
let readLatest() = 
    async { let! msg = inbox.Receive() 
      return! readLatestLoop msg } 

当面对我架构更加完善和高效的解决方案,我叫撤销流和F#日刊文here中描述的同样的问题。这个想法是开始处理消息,然后取消处理,如果他们被取代。如果正在进行重要的处理,这会显着提高并发性。