我假设你想要的以下行为:
- 初始事件发生后,缓冲所有事件在接下来的10秒。
- 一旦这个10秒的窗口关闭,下一个应该在10秒后为所有事件触发一个新的10秒缓冲区。
所以我们假设我们有5个事件在5秒内均匀分布,13秒间隔,然后在5秒内均匀分布另外5个事件。大理石图是这样的:
timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return
使用直线前进Buffer
的是,它看起来像上面谱写的stdbuff
,而分手事件的第二组分为两组,造成两人列出了问题第二组活动:一个有三个活动,一个有两个活动。您需要一个列表(针对第二组),使用类似desired
流的逻辑。开始在0捕获,返回列表在10.开始在17捕获,在27返回列表。
如果我误解了你(再次),请张贴大理石图表,类似于上面,代表你想要如何事情要工作。
假设我理解你正确,下面的代码将工作...
//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));
//Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));
initialSource
.Publish(_source => _source
.Buffer(_source
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
)
)
.Subscribe(list =>
{
Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
Debug.WriteLine($"List Count: {list.Count}");
});
说明:
首先,我们需要确定 '主要事件',那些代表BeginCapture
上面的desired
流描述中的注释。可以发现这样的:
var primaryEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged();
一旦我们有了BeginCapture
事件,它可以表示一个窗口开放,这是很容易找到Return
事件,或闭窗:
var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));
在实践中,因为没有密切和开放的,我们关心之间发生的,我们只需要担心关闭事件,所以我们可以把它收缩到这一点:
var closeEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10));
把它插入到Buffer
与closeEvents
作为bufferBoundaries
:
var bufferredLists = initialSource
.Buffer(initialsource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
);
最后,因为我们有多个订阅initialSource
,我们需要使用Publish
确保并发正常工作,导致最终的答案。
你目前的代码有什么问题?你为什么不使用'list'?或者你想单独处理列表中的每个项目? –
你只想让它触发一次或什么?这听起来像你的解决方案是适当的。 – Shlomo
在旁注中:您应该保留对订阅的引用,并在您不再对这些事件感兴趣时进行处置。如果您不处理订阅并且超出范围,那么它可能仍然对该事件保持强烈的引用(并且可能导致泄漏)。 –