2013-06-25 24 views
0

我正在使用原始TCP套接字与中央服务器进行通信的客户端应用程序。应用程序消息被序列化,然后以长度为前缀创建传递到TCP流中的帧。使用基于任务的异步模式(TAP)的高效套接字I/O

处理此问题的一个经典方法是直接调用socket类上的Receive或BeginReceive,将回调中的消息反序列化,将消息传递到另一个线程处理的单独队列中,然后让回调开始另一个接收再次插座。

这种方法的天真实现对我来说并不理想 - 它紧密地将消息序列化和反序列化耦合到套接字,并需要相当多的“管道”才能让队列在不同的线程/回调中良好地发挥作用。它也是一种漏洞抽象 - 它需要调用代码掌握底层套接字的知识,而不是输入和输出消息的“数据流”。

鉴于我完全在.NET 4.5中工作,使用TPL(TaskFactory.FromAsync)封装Socket的Begin和End异步方法是一个明显的选择。但是,我不清楚如何从这一点出发,原因有很多:

  1. 我需要一个从未完成接收数据的异步“任务”。只要套接字已连接,我想要处理一个消息流。任何中断(断开连接,套接字错误或取消请求)都是例外情况,而不是传统的任务完成。根据Stephen Toub(http://blogs.msdn.com/b/pfxteam/archive/2011/10/02/10218999.aspx),我应该总是完成我的任务。这造成了一些问题 - 套接字接收在传统意义上从未完成。 Stephen似乎在他的“等待套接字操作”的帖子里稍微有点矛盾,他在那里显示了一个没有套接字错误永不完成的套接字读取(http://blogs.msdn.com/b/pfxteam/archive/2011/12/15/10248293.aspx)。
  2. 我需要一种同步“排队”数据发送的方法。呼叫者应该能够发送消息而不被阻塞,并且消息应该通过套接字顺序传输。换句话说,由于消息成帧,一次只有一个发送给套接字本身。 TPL数据流是否合适,或者我应该使用不同的排队模式?
  3. 我想在消息序列化和消息传输之间清晰分离关注点。

我还没有看到这种策略类型的很多例子,只有“直接”的套接字I/O或微不足道的实现。我的直觉告诉我,TPL Dataflow非常适合,因为序列化和反序列化可以流水线化。

我不清楚如何在TPL Dataflow或类似的东西之间架设一个有效的无限接收任务链。

任何想法?

回答

2

在这个时候,没有好的方法的例子。一旦你找到了解决方案,我建议你将它发布到某个地方。

1.我需要一个异步的“任务”,从来没有完成接收数据...

我不赞同该表述“一个套接字接收传统意义上永远不会完成”。较低级别的“接收”操作(例如,,围绕BeginReceive/EndReceive或Toub的ReceiveAsync一个FromAsync包装)返回一个Task当有可用的数据,将完成,关闭套接字,或者存在错误。当套接字关闭或出现错误时,更高级别的“接收器”操作(例如,ReadAsyncfrom Toub's post)将完成。

可以Task s表示可以采取无限期地完成,但只要他们完成这很好。 Toub在this post中指出的是,您的Tasks应该最终完成(特别是处理错误情况)。这与Rx的方法不同,在这种方法中,有一个永远不会产生数据且永不结束的观察值是完全有效的。

2.我需要的同步方法“排队”待发送数据...

从技术上说,它的“序列化”(“按顺序”),而不是“同步”。理想的解决方案将是异步串行化。这有几种方法。

我会说TPL Dataflow是一个很好的选择。请注意,每个套接字实际上有两个逻辑“流”(读取和写入是独立的)。我已经在基于数据流的套接字封装上取得了一些成功,但还没有足够的时间来提高产品质量。由于两个流(我的每个可插拔“块”必须有两个TPL数据流块,一个用于输入,一个用于输出),API非常尴尬。

另一种方法是Rx。 Rx的学习曲线比普通Task s或TPL Dataflow高,但提供了相当多的功率(和效率)。几年前,我使用基于Rx的套接字玩过,但从来没有任何工作。 Rx的文档和示例现在好多了,所以我认为它现在是一个可行的选择。

还有干脆直接使用async方法的方法。您将使用同步而不是队列。例如,你可以使用SemaphoreSlim来确保只有一个SendAsync方法可以一次运行给定的套接字。然而,这确实会改变语义,将一些“序列化”推到你的调用代码上:而不是一个简单的入队和完成任务(除非你打你的发送节流,否则它总是同步完成)一个异步等待发送然后完成任务(几乎总是异步等待)。您可以通过构建一个async生产者/消费者队列(like the one I wrote)来减轻这一负担,但您需要跟踪一个单独的消费者Task,并且此时您只需重写TPL数据流。

我不清楚如何将一个有效的“无尽的”接收任务链“桥接”到TPL数据流或类似的东西。

没有一个好的内置的方式做到这一点。

一个简单的解决方案是有一个“接收器” Task即推送数据到TPL数据流管道负责。然而,如果发生错误,您必须监视Task以确保管道不会被放弃,并且您需要一种方法将其彻底关闭。

我编写了一个FuncBlock type来处理这种情况(将其用于套接字读取和其他基于I/O的数据流输入)。花费了一段时间才解决了方法和数据流块如何相互作用的所有语义(特别是关于取消/错误/完成),但我认为这会很有用。我会很感激任何反馈。

+0

神奇的信息。你觉得在API设计方面,“外部”(不确定)接收方法返回标记为“长时间运行”的任务并且永远不会转换到完成状态是可以接受的吗? (即/任务最终总是以已取消或故障结束,从未完成 - 套接字关闭是一种故障形式)。另一种选择是向外部类公开相同的“单个接收每个消息”语义,并构建一个适配器,以某种方式将其连接到TPL数据流缓冲区。适配器将管理接收的“生命周期”。 – ShadowChaser

+0

只要“任务”完成,没关系(它不必有成功完成*的机会)。 'LongRunning'不适用于'async'任务;别担心。在某些时候,你需要一个适配器(比如'FuncBlock'),你可以把它移动到尽可能接近'Socket'的位置,并且使用Dataflow模块来处理其他事情。 –