2014-04-01 112 views
4

我想在IObservable<T>上订阅并取消订阅(dipose)收到T类型的第一个元素之后的订阅权限,即我只想在获得的第一个元素后调用操作订阅。订阅并立即取消订阅第一个动作

这是我想出了办法:

public static class Extensions 
{ 
    public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action) 
    { 
     IDisposable subscription = null; 
     subscription = observable.Subscribe(t => 
     { 
      action(t); 
      subscription.Dispose(); 
     }); 
    } 
} 

用法示例:

public class Program 
{ 
    public static void Main() 
    { 
     var subject = new Subject<int>(); 

     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.SubscribeOnce(i => Console.WriteLine(i)); 
     subject.OnNext(2); 
     subject.OnNext(3); 

     Console.ReadLine(); 
    } 
} 

它将按预期只打印2这个或其他方面有什么不对吗?有没有一种更清洁的方式来使用RX开箱即用的扩展方法?

回答

13
var source = new Subject(); 

source 
    .Take(1) 
    .Subscribe(Console.WriteLine); 

source.OnNext(5); 
source.OnNext(6); 
source.OnError(new Exception("oops!")); 
source.OnNext(7); 
source.OnNext(8); 

// Output is "5". Subscription is auto-disposed. Error is ignored. 

Take自动部署的nth元件的产率后的订阅。 :)

至于其他要考虑的事情,对于您的自定义observable,您应该注意,您可能还想要将OnError和OnCompleted通知传递给您的观察者,Take也为您处理。

内置的运营商也有其他好处,例如更好的Dispose处理。

+0

哇......不知道为什么我忘记了“Take”! (和'Skip'等等)感谢您的回应! –