2013-01-02 77 views
2

我有两个IDisposables,我需要按顺序处置。订购是重要的,因为第一个IDisposable杀死了一个Rx订阅,该订阅依赖于将被第二个IDisposable杀死的服务。这是在一个Windows窗体应用程序中,IObservable的订阅需要在不同的线程上发生,但观察和处理需要在UI线程上发生。 (其实,我不,如果所述设置UI线程上发生的,只要顺序确保在意。)所以,在代码中,我大致有以下的(一次降低):确保顺序处置多个IDisposables

SomeService = new DisposableService(); 
Subscription = Foo(someService).SubscribeOn(NewThreadScheduler.Default).ObserveOn(theForm).Subscribe(...) 

在一些UI事件我需要按顺序处理这两个(订阅,然后SomeService)。要做到这一点我一直在使用的Rx的CompositeDisposable除了ContextDisposable提供相同的线程上串行配置之尝试:

_Disposable = new CompositeDisposable(new[] {      
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, Subscription),      
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, SomeService)}); 

以上不工作,但是。根据我的日志记录_DisposableContextDisposableSomeService在同一个线程上调用,但ContextDisposable仍然发生在与服务正在处理(从而导致竞争条件和NPE)的不同线程上。

我只是编程了几个星期的C#,所以我确信问题在于我对上下文和调度程序的工作方式的误解。这个问题的正确方法是什么?

+0

是否有一个原因,你不能只在同一个线程中处置两个?如果同步dispose方法和所有其他方法,则可以自己控制顺序。如果你必须从一个线程中使用并在另一个线程中处理,那么你需要让所有的使用者线程安全,并在你做任何事情之前检查它是否已被处置。 – devshorts

回答

0

除非我误解了某些东西,否则可以控制哪个线程处理什么。谁在哪个线程上订阅并不重要。看下面这个例子

internal class Program 
{ 
    private static void Main(string[] args) 
    { 
     ReactiveTest rx1 = null; 
     ReactiveTest rx2 = null; 

     var thread1 = new Thread(() => rx1 = new ReactiveTest()); 
     var thread2 = new Thread(() => rx2 = new ReactiveTest()); 

     thread1.Start(); 
     thread2.Start(); 

     Thread.Sleep(TimeSpan.FromSeconds(1)); 

     thread1.Join(); 
     thread2.Join(); 

     rx1.Dispose(); 
     rx2.Dispose(); 
    } 
} 

public class ReactiveTest : IDisposable 
{ 
    private IDisposable _timerObservable; 

    private object _lock = new object(); 

    public ReactiveTest() 
    { 
     _timerObservable = Observable.Interval(TimeSpan.FromMilliseconds(250)).Subscribe(i => 
      Console.WriteLine("[{0}] - {1}", Thread.CurrentThread.ManagedThreadId, i)); 
    } 

    public void Dispose() 
    { 
     lock (_lock) 
     { 
      _timerObservable.Dispose(); 
      Console.WriteLine("[{0}] - DISPOSING", Thread.CurrentThread.ManagedThreadId); 
     } 
    } 
} 

此输出

[14] - 0 
[7] - 0 
[15] - 1 
[7] - 1 
[14] - 2 
[15] - 2 
[10] - DISPOSING 
[10] - DISPOSING 

你可以看到我们的订阅两个不同的线程,则设置在第三。我只锁定了处置,以防您在订阅中需要发生线程安全问题。在这个例子中它确实没有必要。

+0

你在说什么是有道理的,你的代码的行为和我预期的一样。我在应用程序中看到了不同的行为,但关于正在处理哪些线程订阅。在第二次看后,我认为我的问题可能与我使用'Publish'和'RefCount'有关。我的配置只会减少ref的数量,而“真正的”配置会在不同的线程上发生,因为count会达到0.竞争条件的出现是因为RefCount在另一个线程上做了事情,而我的第二个配置在服务上正在射击。 –

+0

那肯定会做到的! – devshorts

+0

原来'RefCount'毕竟不是罪魁祸首。它与我使用'SubscribeOn'有关,但我仍然没有想出如何解决它。我用工作代码发布了一个新问题,显示我面临的问题:http://stackoverflow.com/questions/14131306/controlling-what-thread-an-rx-subscription-is-disposed-on-after-subscribeon-具有 –

0

SubscribeOn安排呼叫SubscribeDispose。因此,在您的订购变量上调用Dispose,无论执行是否当前在UI线程上,都会导致订阅被安排为NewThreadScheduler.Default处置。

这几乎不是一个好主意,使用SubscribeOn;然而,在你的情况下,你声称它解决了50%的问题 - 这比我见过的大多数用途多50% - 所以我必须质疑你是否确实需要在后台线程中执行预订第一个地方。如果所有方法都是开始一些异步工作,比如发送网络请求或读取文件,那么创建一个全新的线程然后调用一个方法相比,直接在UI线程上调用一个方法的代价非常昂贵。也许如果计算要发送的网络消息被证明过于耗时,那么使用SubscribeOn可能是正确的;当然,只有当你想要处理时也是如此。

如果您的observable的订阅必须在后台线程上执行,但处置必须保持自由线程,那么请考虑使用以下运算符(未经测试)。

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> SubscribeOn<TSource>(
    this IObservable<TSource> source, 
    bool doNotScheduleDisposal, 
    IScheduler scheduler) 
    { 
    if (!doNotScheduleDisposal) 
    { 
     return source.SubscribeOn(scheduler); 
    } 

    return Observable.Create<TSource>(observer => 
     { 
     // Implementation is based on that of the native SubscribeOn operator in Rx 
     var s = new SingleAssignmentDisposable(); 
     var d = new SerialDisposable(); 
     d.Disposable = s; 
     s.Disposable = scheduler.Schedule(() => 
     { 
      d.Disposable = source.SubscribeSafe(observer); 
     }); 
     return d; 
     }); 
    } 
}