2013-12-21 21 views
3

我的要求:如何创建调用方法的Observable Timer并在方法正在运行时阻止取消,直到完成?

  1. 运行方法DoWork在指定的时间间隔。
  2. 如果在DoWork的呼叫之间呼叫停止,则只需停止计时器。
  3. 如果在DoWork正在运行时调用stop,则阻塞直到DoWork完成。
  4. 如果DoWork在调用停止后超时完成,超时。

我有一个解决方案,似乎工作到目前为止,但我不是超级满意,并认为我可能会失去一些东西。下面是从我的测试应用程序的无效的主要:

var source = new CancellationTokenSource(); 

// Create an observable sequence for the Cancel event. 
var cancelObservable = Observable.Create<Int64>(o => 
{ 
    source.Token.Register(() => 
    { 
     Console.WriteLine("Start on canceled handler."); 
     o.OnNext(1); 
     Console.WriteLine("End on canceled handler."); 
    }); 

    return Disposable.Empty; 
}); 

var observable = 
    // Create observable timer. 
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default) 
     // Merge with the cancel observable so we have a composite that 
     // generates an event every 10 seconds AND immediately when a cancel is requested. 
     .Merge(cancelObservable) 
     // This is what I ended up doing instead of disposing the timer so that I could wait 
     // for the sequence to finish, including DoWork. 
     .TakeWhile(i => !source.IsCancellationRequested) 
     // I could put this in an observer, but this way exceptions could be caught and handled 
     // or the results of the work could be fed to a subscriber. 
     .Do(l => 
     { 
      Console.WriteLine("Start DoWork."); 
      Thread.Sleep(TimeSpan.FromSeconds(5)); 
      Console.WriteLine("Finish DoWork."); 
     }); 

var published = observable.Publish(); 

var disposable = published.Connect(); 

// Press key between Start DoWork and Finish DoWork to test the cancellation while 
// running DoWork. 
// Press key between Finish DoWork and Start DoWork to test cancellation between 
// events. 
Console.ReadKey(); 

// I doubt this is good practice, but I was finding that o.OnNext was blocking 
// inside of register, and the timeout wouldn't work if I blocked here before 
// I set it up. 
Task.Factory.StartNew(source.Cancel); 

// Is there a preferred way to block until a sequence is finished? My experience 
// is there's a timing issue if Cancel finishes fast enough the sequence may already 
// be finished by the time I get here and .Wait() complains that the sequence contains 
// no elements. 
published.Timeout(TimeSpan.FromSeconds(1)) 
    .ForEach(i => { }); 

disposable.Dispose(); 

Console.WriteLine("All finished! Press any key to continue."); 
Console.ReadKey(); 

回答

4

首先,在你cancelObservable,确保并返回Token.Register结果作为您的可支配而不是返回Disposable.Empty

下面是转向CancellationTokens到可观了良好的扩展方法:

public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler) 
{ 
    return Observable.Create<Unit>(observer => 
    { 
     var d1 = new SingleAssignmentDisposable(); 
     return new CompositeDisposable(d1, token.Register(() => 
      { 
       d1.Disposable = scheduler.Schedule(() => 
       { 
        observer.OnNext(Unit.Default); 
        observer.OnCompleted(); 
       }); 
      })); 
    }); 
} 

现在,您的实际要求:

public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal) 
{ 
    // Performs work on an interval 
    // stops the timer (but finishes any work in progress) when the cancelSignal is received 
    var workTimer = Observable 
     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10)) 
     .TakeUntil(cancelSignal) 
     .Select(_ => 
     { 
      DoWork(); 
      return Unit.Default; 
     }) 
     .IgnoreElements(); 

    // starts a timer after cancellation that will eventually throw a timeout exception. 
    var timeoutAfterCancelSignal = cancelSignal 
     .SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5))); 

    // Use Amb to listen to both the workTimer 
    // and the timeoutAfterCancelSignal 
    // Since neither produce any data we are really just 
    // listening to see which will complete first. 
    // if the workTimer completes before the timeout 
    // then Amb will complete without error. 
    // However if the timeout expires first, then Amb 
    // will produce an error 
    return Observable.Amb(workTimer, timeoutAfterCancelSignal); 
} 

// Usage 
var cts = new CancellationTokenSource(); 
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default)); 

using (var finishedSignal = new ManualResetSlim()) 
{ 
    s.Finally(finishedSignal.Set).Subscribe(
     _ => { /* will never be called */}, 
     error => { /* handle error */ }, 
     () => { /* canceled without error */ }); 

    Console.ReadKey(); 
    cts.Cancel(); 

    finishedSignal.Wait(); 
} 

注意,而不是取消标记,你也可以这样做:

var cancelSignal = new AsyncSubject<Unit>(); 
var s = ScheduleWork(cancelSignal); 

// .. to cancel .. 
Console.ReadKey(); 
cancelSignal.OnNext(Unit.Default); 
cancelSignal.OnCompleted(); 
+0

感谢布兰登!我不知道为什么使用TakeUntil打断计时器的概念没有出现在我身上,这完全有道理。使用Amb听两个可观察对象也很酷。您的示例与AsyncSubject一起工作良好,但是当我将Thread.Sleep添加到DoWork以模拟工作时,CancellationToken示例在超时时未按预期方式运行,则直到DoWork完成后才会调用取消阻止。当我将TakeUntil(cancelSignal)更改为TakeUntil(cancelSignal.ObserveOn(Scheduler.Default))时,一切都很顺利。我将相应地编辑您的示例,如果错误请撤消。 – MichaC

+1

而不是在你的'TakeUntil'调用中嵌入'ObserveOn',通过修改'ToObservable'帮助器方法来接受一个'IScheduler'来保持关注点分离和更多的'Rx-y'令牌被取消。我已经修改了答案。 – Brandon

相关问题