2016-05-10 22 views
6

我有Observable.Interval(TimeSpan.FromSeconds(1))和一个用户每隔一段时间检查一次数据库中的内容。但有时候,当我从DB执行检查时,我想立即执行另一次检查(再次调用该用户,因为我知道队列中有东西)。无效 - 将可观察间隔与手动触发器组合在一起

我已经成功地实现由订户方法内部间隔与while结合类似的事情:

Observable    
.Interval(TimeSpan.FromSeconds(1)) 
.Sample(TimeSpan.FromSeconds(1)) //to avoid multiple 'stacked' intervals 
.Subscribe(RepeatAction); 


private void RepeatAction(long _) 
{ 
    bool wasSuccess; 
    do 
    { 
     wasSuccess = CheckingInDB(); //Long operation 
    } while (wasSuccess); 
} 

但有可能实现这种行为与反应?

+1

什么是SubscribeInContext方法?那从哪里来的? – Enigmativity

回答

3

是的。有可能的。

首先,你对Rx有误解。

如果你运行这段代码:

void Main() 
{ 
    Observable 
     .Interval(TimeSpan.FromSeconds(1.0)) 
     .Sample(TimeSpan.FromSeconds(1.0)) 
     .Timestamp() 
     .Subscribe(RepeatAction); 
} 

private void RepeatAction(Timestamped<long> _) 
{ 
    Console.WriteLine(_.Timestamp); 
    Thread.Sleep(10000); 
} 

你会得到这样的结果:

2016/05/11 10:37:57 +00:00 
2016/05/11 10:38:07 +00:00 
2016/05/11 10:38:17 +00:00 
2016/05/11 10:38:27 +00:00 

你会看到正在生产的每个值之间的步骤是10秒,而不是1。 Interval运营商只需确保每个值之间的差距至少为的持续时间,但如果观察者需要更长的时间,则持续时间将变为与每个用户一样长。它不排列值。

查看它的另一种方法是.Sample(TimeSpan.FromSeconds(1))什么都不做,因为.Interval(TimeSpan.FromSeconds(1.0))确保值之间的最小间隔已经是1秒。

现在,使用纯粹的Rx操作符来解决问题。试试这个:

var query = 
    Observable 
     .Interval(TimeSpan.FromSeconds(1.0)) 
     .Select(_ => 
      Observable 
       .While(
        () => CheckingInDB(), 
        Observable.Return(Unit.Default))) 
     .Switch(); 

这将尝试每秒检查数据库,但一旦它击中值时,它迅速重复检查,直至没有。然后等待1秒钟并再次尝试。

+0

谢谢你的回答,这听起来很合乎逻辑,但我无法让它工作。它从不触发CheckingInDb()。有任何想法吗? –

+0

@Knopo - 我刚刚通过复制粘贴的方式测试了我的查询到我的开发环境,并且按照预期的那样调用'CheckingInDB'工作正常。试着制作一个非常简单的'CheckingInDB',看看会发生什么?我是'公共布尔CheckingInDB(){Console.WriteLine(“!”);返回true; }'。 – Enigmativity

+1

@Knopo - 你确实订阅了observable,对吧? – Enigmativity