2011-11-08 16 views
3

我有TCP服务器,没有休息接收大数据。 而我需要将此流广播给许多客户端。如何将流复制到许多流异步C#.NET

更新: 我需要广播视频流。也许有现成的解决方案?

+0

请提供更多关于您尝试过的代码以及哪些未运行的详细信息。 –

+0

是否需要使用TCP进行广播?你的需求通常是通过UDP实现的...... – Yahia

+0

我尝试广播的数据 - 它是流。 –

回答

2

如果您希望异步做到这一点,那么你可以采取System.Threading.Tasks namespace的优势。

首先,你需要地图的Stream实例的给Task可以完成被伺候:

IDictionary<Stream, Task> streamToTaskMap = outputStreams. 
    ToDictionary(s => s, Task.Factory.StartNew(() => { }); 

有在上面轻微的开销,因为有一个浪费Task实例什么都不做,但是由于您需要执行的Task实例和延续的数量,该价格很小。

从那里,你会从流读取内容,然后异步地写出来给每个Stream实例:

byte[] buffer = new byte[<buffer size>]; 
int read = 0; 

while ((read = inputStream.Read(buffer, 0, buffer.Length)) > 0) 
{ 
    // The buffer to copy into. 
    byte[] copy = new byte[read]; 

    // Perform the copy. 
    Array.Copy(buffer, copy, read); 

    // Cycle through the map, and replace the task with a continuation 
    // on the task. 
    foreach (Stream stream in streamToTaskMap.Keys) 
    { 
     // Continue. 
     streaToTaskMap[stream] = streaToTaskMap[stream].ContinueWith(t => { 
      // Write the bytes from the copy. 
      stream.Write(copy, 0, copy.Length); 
     }); 
    } 
} 

而在最后,你可以等待所有写入流致电:

Task.WaitAll(streamToTaskMap.Values.ToArray()); 

有几件事要注意。

首先,需要buffer的副本,因为lambda传递给ContinueWith; lambda是一个将封装buffer的闭包,并且由于它是异步处理的,所以内容可能会改变。每个连续性都需要自己的缓冲区副本来读取。

这也是为什么拨打Stream.Write使用Array.Length属性;否则,read变量将不得不通过循环的每次迭代进行复制。

此外,能够利用Stream类中的BeginWrite/EndWrite方法更为理想;因为没有ContinueWithAsync方法,它将采用Task并继续使用异步方法,调用read的异步版本没有任何好处。

这是其中一种情况,它可能更好地调用BeginWrite/EndWrite自己(以及BeginRead/EndRead)以充分利用异步操作;当然,这会稍微复杂一点,因为你不会封装Task提供的操作结果,如果你使用匿名方法/闭包,你将不得不采取与buffer相同的预防措施。

1

菌种的料流传递给它,以及该流的线程写入

byte[] buffer = new byte[BUFFER_SIZE]; 
int btsRead = 0; 

while ((btsRead = inputStream.Read(buffer, 0, BUFFER_SIZE)) > 0) 
{ 
    foreach (Stream oStream in outputStreams) 
     oStream.Write(buffer, 0, btsRead); 
} 

编辑:为了并行写操作:

Parallel.ForEach(outputStreams, oStream => 
{ 
    oStream.Write(buffer, 0, btsRead); 
}); 

与取代的foreach块

+0

“foreach”?这是好的解决方案吗?而如果我将拥有大约1000个客户呢? –

+0

foreach与for循环相同。我刚才介绍的代码应该在服务器收到上传流时由后台线程运行。您可以尝试其他技术,例如对客户端流执行异步写入以进一步分摊成本,但我所介绍的是一个相对简单的解决方案,应该可以工作。 – Mranz

+0

好的。谢谢。我会尝试。 –