2016-11-25 38 views
0

我流的流动,其目标是计算简单“校验”的内容在一组的.zip文件。Observable.Using和异步流越来越损坏的数据

要做到这一点,我已经设置了观察到的是:

  1. 发生在一个给定的文件夹
  2. 读取每个文件的内容中的所有文件(阅读为ZipArchive
  3. 每个条目在每个文件中,执行校验和

为了说明它的计算中,我已创建这个前充足:

通知AsyncContext.Runhttps://stackoverflow.com/a/9212343/1025407)的使用,使Main方法等待GetChecksum因为它是一个控制台应用程序

namespace DisposePoC 
{ 
    using System.Collections.Generic; 
    using System.IO; 
    using System.IO.Compression; 
    using System.Reactive.Linq; 
    using Nito.AsyncEx; 
    using System.Linq; 
    using System.Threading.Tasks; 


    class Program 
    { 
     private static void Main() 
     { 
      AsyncContext.Run(GetChecksums); 
     } 

     private static async Task<IList<byte>> GetChecksums() 
     { 
      var bytes = Directory.EnumerateFiles("FolderWithZips") 
       .ToObservable() 
       .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable())) 
       .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length)))); 

      return await bytes.ToList(); 
     } 

     private static ZipArchive CreateZipArchive(string path) 
     { 
      return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)); 
     } 

     private static async Task<byte> CalculateChecksum(Stream stream, long entryLength) 
     { 
      var bytes = await GetBytesFromStream(stream, entryLength); 
      return bytes.Aggregate((b1, b2) => (byte) (b1^b2)); 
     } 

     private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength) 
     { 
      byte[] bytes = new byte[entryLength]; 
      await stream.ReadAsync(bytes, 0, (int)entryLength); 
      return bytes;    
     } 
    } 
} 

运行应用程序,我得到的各种错误:

'System.IO.InvalidDataException':本地文件头损坏。 'System.NotSupportedException':Stream不支持读取。 'System.ObjectDisposedException':无法访问处置的对象。 'System.IO.InvalidDataException':块长度与其补码不匹配。

我在做什么错?

观察本身是否存在问题,还是因为ZipArchive不是线程安全的?如果不是,我如何使代码工作?

+1

我会对此做一个评论,因为我目前无法验证代码内容,但我怀疑问题是在第一个SelectMany中创建的ZipArchive正在被Using语句处置然后才能阅读下一行中的入口流 - 实质上,一次性范围确定是错误的。我会将第二个SelectMany的逻辑移到第一个。我还会验证您的测试数据没有被破坏,正如第一个例外所示。 – Andrew

+0

我想我明白你的观点。但是,如果范围是错误的,那么我如何修改代码以避免处理每个ZipArchive,直到处理完所有条目?它甚至有可能吗? – SuperJMN

回答

1

似乎没有关于您的问题的“Rx”。

如果国防部整个事情势在必行一套循环的正常工作

private static async Task<IList<byte>> GetChecksums() 
{ 
    var bytes = new List<byte>(); 
    foreach (var path in Directory.EnumerateFiles("FolderWithZips")) 
    { 
     using (var archive = CreateZipArchive(path)) 
     { 
      foreach (var entry in archive.Entries) 
      { 
       using (var stream = entry.Open()) 
       { 
        var checksum = await CalculateChecksum(stream, entry.Length); 
        bytes.Add(checksum); 
       } 
      } 
     } 
    } 

    return bytes; 
} 

所以我会想象你有一组的比赛条件(并行)和/或乱序处理的问题。

+0

我认为Observable.Using会以正确的顺序处理流的处理,所以我不会得到ObjectDisposedExceptions。我是否错误地使用了它,或者它与问题的本质有着内在联系? (同时从ZipArchive中读取) – SuperJMN

+1

Observable.Using将在序列终止时处理由提供的工厂创建的资源(处置/错误/完成)。但这一切都是学术性的,因为你迫使Rx成为一个没有反应能力的问题。这是我看到的主要问题是外围问题是通过不向2(不必要的)ToObservable()调用提供IScheduler而引入线程化问题 –

2

Rx可能不是最适合这个的。说实话,你甚至可以做到没有异步。

Directory.EnumerateFiles("FolderWithZips") 
     .AsParallel() 
     .Select(folder => CalculateChecksum(folder)) 
     .ToList() 
+0

呃,CalculateChecksum只是一个简化问题的例子。在我的现实生活中,这是一个异步方法,我无法修改(第三方)。它如何修改你的方法? (异步) – SuperJMN