我流的流动,其目标是计算简单“校验”的内容在一组的.zip文件。Observable.Using和异步流越来越损坏的数据
要做到这一点,我已经设置了观察到的是:
- 发生在一个给定的文件夹
- 读取每个文件的内容中的所有文件(阅读为
ZipArchive
) - 每个条目在每个文件中,执行校验和
为了说明它的计算中,我已创建这个前充足:
通知AsyncContext.Run
(https://stackoverflow.com/a/9212343/1025407)的使用,使Main
方法等待GetChecksum
因为它是一个控制台应用程序
namespace DisposePoC
{
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Linq;
using System.Threading.Tasks;
class Program
{
private static void Main()
{
AsyncContext.Run(GetChecksums);
}
private static async Task<IList<byte>> GetChecksums()
{
var bytes = Directory.EnumerateFiles("FolderWithZips")
.ToObservable()
.SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
.SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));
return await bytes.ToList();
}
private static ZipArchive CreateZipArchive(string path)
{
return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
}
private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
{
var bytes = await GetBytesFromStream(stream, entryLength);
return bytes.Aggregate((b1, b2) => (byte) (b1^b2));
}
private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
{
byte[] bytes = new byte[entryLength];
await stream.ReadAsync(bytes, 0, (int)entryLength);
return bytes;
}
}
}
运行应用程序,我得到的各种错误:
'System.IO.InvalidDataException':本地文件头损坏。 'System.NotSupportedException':Stream不支持读取。 'System.ObjectDisposedException':无法访问处置的对象。 'System.IO.InvalidDataException':块长度与其补码不匹配。
我在做什么错?
观察本身是否存在问题,还是因为ZipArchive
不是线程安全的?如果不是,我如何使代码工作?
我会对此做一个评论,因为我目前无法验证代码内容,但我怀疑问题是在第一个SelectMany中创建的ZipArchive正在被Using语句处置然后才能阅读下一行中的入口流 - 实质上,一次性范围确定是错误的。我会将第二个SelectMany的逻辑移到第一个。我还会验证您的测试数据没有被破坏,正如第一个例外所示。 – Andrew
我想我明白你的观点。但是,如果范围是错误的,那么我如何修改代码以避免处理每个ZipArchive,直到处理完所有条目?它甚至有可能吗? – SuperJMN