2014-11-15 64 views
1

以下代码将结果延迟2秒。我想要的是立即返回结果,但每2秒发射一个新的观察值。我错过了什么?Observable.Generate延迟结果

输出:

**The current output is:** 
05: 1. Run 
07 Result: 1 

07: 2. Run 
09 Result: 2 

09: 3. Run 
11 Result: 3 


**Desired output is:** 
05: 1. Run 
05 Result: 1 

07: 2. Run 
07 Result: 2 

09: 3. Run 
09 Result: 3 

CODE:

var sources = Enumerable.Range(1, 8).Select(i => 
               { 
                Console.WriteLine("{0}: {1}. Run", DateTimeOffset.Now.ToString("ss"), i); 

                return Observable.Return(i, CurrentThreadScheduler.Instance); 
               }); 

    Observable.Generate(sources.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(2000), ThreadPoolScheduler.Instance) 
       .Merge() 
       .Timestamp() 
       .Do(r => 
        { 
         Console.WriteLine("{0} Result: {1}{2}", r.Timestamp.ToString("ss"), r.Value, Environment.NewLine); 
        }, 
        ex => 
        { 
         Console.WriteLine(ex.ToString()); 
        }, 
       () => 
        { 
         Console.WriteLine("Completed"); 
        }) 
       .Subscribe(); 
+0

这真的不是完全清楚你在这里以后有什么 - 我相信保罗的回答您的评论表示您希望可变数据驱动的区间,但除此之外,还有(给我训练有素的Rx眼睛)代码中有很多“奇怪的东西”。也许用非rx的术语解释你试图达到的目标是有用的。例如,它并不完全清楚为什么你要为'sources'创建'IEnumerable ',因为它看起来像'IEnumerable <>'会做的。目前还不清楚'sources'是否应该包含指示期望间隔的数据。 –

回答

0

如何:

Observable.Interval(TimeSpan.Zero, TimeSpan.FromSeconds(2.0)) 
    .SelectMany(_ => GenerateAnObservable()) 
    .Subscribe(/* ... */); 
+0

我在每个循环中为可变周期选择器使用generate。 –

0

如何替换当前Observable.Generate本:

Observable 
    .Generate(
     0, 
     i => true, 
     i => i + 1, 
     i => i, 
     i => TimeSpan.FromMilliseconds(i == 0 ? 0 : 2000), 
     ThreadPoolScheduler.Instance) 
    .Zip(sources, (g, s) => s) 

我得到了这样的结果:

41: 1. Run 
41 Result: 1 

43: 2. Run 
43 Result: 2 

45: 3. Run 
45 Result: 3 

47: 4. Run 
47 Result: 4 

49: 5. Run 
49 Result: 5 

51: 6. Run 
51 Result: 6 

53: 7. Run 
53 Result: 7 

55: 8. Run 
55 Result: 8 

Completed