2013-07-25 56 views
3

我被困在正确处理在应用程序退出时使用RX创建的线程。我在Process Explorer中看到,在应用程序关闭后,线程仍在运行,导致IO异常。在应用程序退出时处理RX线程

class Program 
{ 
    static void Main(string[] args) 
    { 
     CompositeDisposable subsriptions = new CompositeDisposable(); 

     subscriptions.Add(Observable.Interval(TimeSpan.FromSeconds(15)) 
       .Subscribe(_ => 
       { 
        getData(); 

       })); 
     Thread.Sleep(TimeSpan.FromSeconds(20));  
      subscriptions.Dispose(); 
     } 
    } 
} 

如果你看看,如果我取消注释subscription.Dispose(),线程终止没有得到任何数据。任何帮助,将不胜感激。谢谢

+0

什么是'的getData “干嘛? – JerKimball

+0

它只是从.txt读取一些值。 – Jim

回答

1

您需要某种延迟subsriptions.Add(...)subscriptions.Dispose()之间。在这之间没有任何延迟,你的应用程序只需立即订阅并处理它们,没有时间让这些线程完成工作。 (和Thread.Sleep(1000)不起作用,因为它是订阅功能里面,不是主要功能的一部分。)

+0

我看到,我已经移动了Thread.Sleep(20000),并在处理订阅之前增加了mlsec,但没有运气。 – Jim

+1

Thread.Sleep()的使用是邪恶的。看到我的答案。如果您通过ReactiveExtensions和async /使用IObservable一起正确地等待,您可以不使用它。 – bradgonesurfing

1

你正在寻找的模式与此类似

class Program { 

    public string GetData(){ 
     return "Hello"; 
    } 

    public string async GetDataAsync(){ 

     return await Observable 
      .Interval(TimeSpan.FromSeconds(15)) 
      .Take(1) 
      .Select(()=>GetData()); 


    } 

    static void Main(string[]args){ 

     var s = GetDataAsync().Wait(); 
    } 

} 

原因Wait是一个入口点,Main,在这种情况下不能是 ,标记为asyncWait阻止当前线程,直到任务返回 通过GetDataAsync产生一个值。

还要注意,IObservable与async/await兼容,并且会返回序列产生的最后一个 值。这就是为什么我加Take(1),因为它会产生 只有1个勾号。

另一种方法就是打电话给等候直接上的IObservable在

class Program { 

    public string GetData(){ 
     return "Hello"; 
    } 

    public IObservable<string> GetDataObservable(){ 

     return Observable 
      .Interval(TimeSpan.FromSeconds(15)) 
      .Take(1) 
      .Select(()=>GetData()); 


    } 

    static void Main(string[]args){ 

     var s = GetDataObservable().Wait(); 
    } 

} 
+0

感谢您的广泛响应,我可以问您如何处理在应用程序关闭之前或之后处理线程?目前,我只是在应用程序关闭时终止进程,但是我担心在工作时这不是正确的方式吗? – Jim

+2

你不需要杀死任何线程。线程自动返回到线程池,并且进程将退出。 – bradgonesurfing

+0

你很少需要直接接触线程我会使用被动扩展和异步/等待框架。我建议做一些研究。一旦你得到它,这是非常酷的东西。 – bradgonesurfing

0

您可以订阅你的观察到与CancellationToken将取消基础任务执行:

static void Main(string[] args) 
{ 
    var cts = new CancellationTokenSource(); 
    Observable. 
     Interval(TimeSpan.FromSeconds(15)). 
     Subscribe(_ => getData(), cts.Token)); 
    cts.CancelAfter(TimeSpan.FromSeconds(20)); 
}