2012-06-12 54 views
4

我正在尝试使用TPL数据流,通过移植一些旧的套接字代码来使用TPL数据流和新的异步功能。尽管API感觉很稳定,但我的代码仍然感觉凌乱。我想知道我是否在这里错过了一些东西。TPL数据流使用异步操作

我的要求如下:一个套接字类公开:打开,关闭,发送和接收方法。所有返回一个任务,因此是异步的。打开和关闭是原子的。发送和接收可以彼此相邻,尽管两者一次只能处理1条命令。

逻辑上,这使我对内部控制的下一段代码:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with 
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair; 
private readonly ActionBlock<Action> connectionBlock; 
private readonly ActionBlock<Action> sendBlock; 
private readonly ActionBlock<Action> receiveBlock; 

// within the constructor: 
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair(); 
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler }); 
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler }); 
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler }); 

到目前为止都很好。我可以安全地将操作发送到发送和接收模块,而无需担心与此同时运行的连接相关操作。另外ActionBlock确保多个发送调用是同步的(同上接收,关闭和打开)。

问题是,没有简单的方法将任务传回给海报。现在我正在使用TaskCompletionSource传递结果。喜欢:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken) 
{ 
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>(); 

    sendBlock.Post(async() => 
    { 
     if (!tcpClient.Connected) 
      throw new InvalidOperationException("Cant send when not open"); 
     else 
     { 
      await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken); 
      resultCompletionSource.SetResult(null); 
     } 
    }); 

    return resultCompletionSource.Task; 
} 

这只是感觉丑陋和笨拙。我的问题是:有没有办法使用TPL同步工作流而不必使用TaskCompletionSource进行通信?

谢谢!

+1

我没有使用TPL数据流,所以这可能是遥远,但如果你需要的动作完成时就知道了,好像你不希望发布Post的'fire and forget',而应该使用SendAsync,以便让Task返回而不是? http://msdn.microsoft.com/en-us/library/hh194681(v=vs.110) –

+1

据我了解,当帖子被接受或明确拒绝时,SendAsync会返回一个任务。因为我使用无限制的动作块。帖子将始终被直接接受。 SendAsync不能帮助我的案例 – Polity

回答

4

首先,您根本不需要TPL Dataflow,因为您实际上没有任何数据流。

其次,使用TaskScheduler这样的s也不是正确的解决方案。 TaskScheduler的日程安排代码,但是当你有一些东西时,没有代码在运行。因此,虽然WriteAsync()正在执行其异步工作,但代码Open()可以运行。

你真正需要的是像ReaderWriterLock,但与async效果很好。框架中没有类似的东西,但是您可以使用Stephen Toub's article Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock中的代码,它完全符合您的需求。文章还详细解释了为什么使用TaskScheduler是错误的。

使用AsyncReaderWriterLock,你的代码可能是这样的:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken) 
{ 
    using (await readerWriterLock.ReaderLockAsync()) 
    { 
     if (!tcpClient.Connected) 
      throw new InvalidOperationException("Can't send when not open"); 

     await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken); 
    } 
} 
+0

在阅读OP的问题时,我的第一个想法是链接到Toub的文章并给出一些背景知识。我犹豫不决,因为我不记得Toub写过关于这个问题的博客文章,白皮书等。然后我看到这个答案已经熟练地解释了它,并链接到了确切的文章!每隔一段时间,您都会阅读一个答案,希望您可以多次赞扬! – payo

+0

谢谢你的回答,尽管它还没有让我满意。打开和关闭应完全由发送和接收以及另一个来运行。尽管在使用其中的2个时异步rwlock会起作用,但这将是一个难以理解的解决方案。基于ConcurrentExclusiveSchedulerPair调度任务似乎是简单而易于同步的。我认为你错误的假设是Open应该能够在Send运行时运行,而不是这种情况。当打开被调用时,任何正在运行的发送/接收应该在实际打开之前完成。 – Polity

+0

@Polity我的观点是,如果你有'async'方法并且使用'ConcurrentExclusiveSchedulerPair',那么当'Send()'正在等待时,'Open()'将能够运行。但是如果你使用'AsyncReaderWriterLock',它不会。我认为这正是你想要的。 – svick