3
我的要求:如何创建调用方法的Observable Timer并在方法正在运行时阻止取消,直到完成?
- 运行方法DoWork在指定的时间间隔。
- 如果在DoWork的呼叫之间呼叫停止,则只需停止计时器。
- 如果在DoWork正在运行时调用stop,则阻塞直到DoWork完成。
- 如果DoWork在调用停止后超时完成,超时。
我有一个解决方案,似乎工作到目前为止,但我不是超级满意,并认为我可能会失去一些东西。下面是从我的测试应用程序的无效的主要:
var source = new CancellationTokenSource();
// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
source.Token.Register(() =>
{
Console.WriteLine("Start on canceled handler.");
o.OnNext(1);
Console.WriteLine("End on canceled handler.");
});
return Disposable.Empty;
});
var observable =
// Create observable timer.
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
// Merge with the cancel observable so we have a composite that
// generates an event every 10 seconds AND immediately when a cancel is requested.
.Merge(cancelObservable)
// This is what I ended up doing instead of disposing the timer so that I could wait
// for the sequence to finish, including DoWork.
.TakeWhile(i => !source.IsCancellationRequested)
// I could put this in an observer, but this way exceptions could be caught and handled
// or the results of the work could be fed to a subscriber.
.Do(l =>
{
Console.WriteLine("Start DoWork.");
Thread.Sleep(TimeSpan.FromSeconds(5));
Console.WriteLine("Finish DoWork.");
});
var published = observable.Publish();
var disposable = published.Connect();
// Press key between Start DoWork and Finish DoWork to test the cancellation while
// running DoWork.
// Press key between Finish DoWork and Start DoWork to test cancellation between
// events.
Console.ReadKey();
// I doubt this is good practice, but I was finding that o.OnNext was blocking
// inside of register, and the timeout wouldn't work if I blocked here before
// I set it up.
Task.Factory.StartNew(source.Cancel);
// Is there a preferred way to block until a sequence is finished? My experience
// is there's a timing issue if Cancel finishes fast enough the sequence may already
// be finished by the time I get here and .Wait() complains that the sequence contains
// no elements.
published.Timeout(TimeSpan.FromSeconds(1))
.ForEach(i => { });
disposable.Dispose();
Console.WriteLine("All finished! Press any key to continue.");
Console.ReadKey();
感谢布兰登!我不知道为什么使用TakeUntil打断计时器的概念没有出现在我身上,这完全有道理。使用Amb听两个可观察对象也很酷。您的示例与AsyncSubject一起工作良好,但是当我将Thread.Sleep添加到DoWork以模拟工作时,CancellationToken示例在超时时未按预期方式运行,则直到DoWork完成后才会调用取消阻止。当我将TakeUntil(cancelSignal)更改为TakeUntil(cancelSignal.ObserveOn(Scheduler.Default))时,一切都很顺利。我将相应地编辑您的示例,如果错误请撤消。 – MichaC
而不是在你的'TakeUntil'调用中嵌入'ObserveOn',通过修改'ToObservable'帮助器方法来接受一个'IScheduler'来保持关注点分离和更多的'Rx-y'令牌被取消。我已经修改了答案。 – Brandon