1

我正在使用非常特定的使用TCP和SSL流的Web API。与API的连接是持久的(不应在每次写入/读取后关闭),因为此连接用于接收来自服务器的事件通知。创建异步方法并从另一种方法设置任务结果

所以,我实现了一个循环的阅读方法:

private void Listen() 
{ 
    int length; 
    do 
    { 
     var buffer = new byte[TcpClient.ReceiveBufferSize]; 
     length = SslStream.Read(buffer, 0, TcpClient.ReceiveBufferSize); 

     if (length > 0) 
     { 
      // Process received data 
      Listen(); 
     } 
    } while (length > 0); 

    //reconnect 
} 

现在我需要调用一些API方法。我希望他们支持TPL(异步和等待)。问题是这些方法的异步部分实际上是在上面的阅读方法(Listen)中实现的。例如:

public async Task<bool> Authenticate(string clientId, string clientSecret) 
{ 
    await SendAuthMessageOverNetwork(clientId, clientSecret); 
    return await ReceiveAuthResponseFromServer(); 
    // The problem is that there is no ReceiveAuthResponseFromServer method - 
    // the response is received in reading thread (Listen). 
} 

我对TPL还不是很熟悉。我使用任务实际上并没有做任何事情,但是从读线程等待一个信号(的AutoResetEvent)解决了这个问题:

private AutoResetEvent _loginEvent; 
private bool _loginResult; 

public async Task<bool> Authenticate(string clientId, string clientSecret) 
{ 
    await SendAuthMessageOverNetwork(clientId, clientSecret); 
    return await Task<bool>.Factory.StartNew(() => { _loginEvent.WaitOne(); return _loginResult; }); 
} 

private void Listen() 
{ 
    ... 
    if (msg.Type == MessageTypes.AuthenticationResponse) 
    { 
     _loginResult = true; 
     _loginEvent.Set(); 
    } 
    ... 
} 

但我不太喜欢这种解决方案。也许有更简单的方法来实现我想要的?这可以通过仅使用任务功能完成,无需AutoResetEvent和中间全局变量?

回答

3

你说得对,使用AutoResetEvent是做错的办法。你真正想要做的是:

  • 假设每SendAuthMessageOverNetwork由ReceiveAuthResponseFromServer匹配,它们组合成一个方法。
  • 在该方法中,发送请求并放置一个新TaskCompletionSource成可以被读取循环
  • 返回任务完成源的任务属性作为你的结果
  • 在读取循环可以看出一个队列,当你阅读消息时,从队列中出队下一完成源和使用taskSource.SetResult(responseFromServer)

因此,像这样:

private readonly Queue<ResponseMessageType> _responseQueue = new Queue<ResponseMessageType>(); 

public async Task<bool> Authenticate(string clientId, string clientSecret) { 
    var response = AsyncRequestAResponse(MakeAuthMessage(clientId, clientSecret)); 
    return (await response).Type == MessageTypes.AuthenticationResponse 
} 

public Task<bool> AsyncRequestAResponse(RequestMessageType request) { 
    var responseSource = new TaskCompletionSource<ResponseMessageType>(); 
    _responseQueue.Enqueue(responseSource); 
    Send(request); 
    return responseSource.Task 
} 

private void Listen() { 
    ... 
    if (_responseQueue.Count == 0) 
     throw new Exception("Erm, why are they responding before we requested anything?"); 
    _responseQueue.Dequeue().SetResult(msg); 
} 

在其它WOR ds,请使用TaskCompletionSource<T>将网络内部读/写的内容翻译成您公开给调用者的异步内容。

它可能不会像上面那样......我假设在示例中存在继承顺序和一对一响应/请求。您可能必须将请求ID与响应ID或类似内容匹配起来,并且超时会将例外而不是结果放入排队的任务等。

此外,如果响应可能与发送请求同时到达,那么在触摸队列的操作周围放置锁非常重要。

+0

谢谢!正是我需要的。 –

+3

@AkkseyShubin应该注意到这里有一个微妙的时刻。如果启动了'await Task'的线程是一个带默认同步上下文的非UI线程,调用'_responseQueue.Dequeue().SetResult(msg)'将会立即调用继续回调(在'await'后面的代码)线程所在的'Listen()'正在运行。因此,'Listen'将被阻塞,直到消费者端的代码流中有另一个'await'。这可能会导致死锁,具体取决于逻辑工作流程。更多信息:http://stackoverflow.com/q/19481964/1768303 – Noseratio

+1

@Noseratio正确。永远不要阻止任务,除非你了解TPL是如何工作的以及如何从上到下完成任务。 –