2012-09-27 41 views
3

我正在使用连接到COM端口的Reactive Extensions Observable数据流,并且显示在一定时间间隔内从该数据流获取的缓冲区。如何在满足特定值或条件时触发缓冲Observable数据流?

这是我的基本Rx代码,其中字节数据以25毫秒的块返回。我想在第一次触及特定阈值时触发缓冲区的生成,然后只在收集完上一个缓冲区后才再次执行。

var o = serialData.Buffer(TimeSpan.FromMilliseconds(25)) 
        .ObserveOn(SynchronizationContext.Current); 
var mySerialObserver = o.Subscribe<IList<byte>>(SubscribeAction()); 

serialData对象是来自USB COM端口的连续字节值流的IObservable。该代码被改编自巴特迪斯梅特交:

How to implement SerialPort parser with Rx

使用Rx缓冲区(时间跨度)方法我可以品尝serialData和在图上显示的缓冲器值(我的SubscribeAction方法内使用DynamicDataDisplay )。

我想将功能扩展为像示波器触发器一样,这可能涉及在serialData值超过给定阈值但不收集重叠缓冲区时调用Rx缓冲区方法(这可能类似于示波器时基在特定输入电压下触发,但在扫描完成之前不会再触发)

请有人给我一些关于如何实现的想法吗?

+0

我想你需要解释一些关于你的问题/要求。 “阈”? “集”? etc .. – Enigmativity

+0

我有一个IObservable流从一个COM端口 - 我从巴特代斯梅特代码示例获得的serialData –

+0

对不起,Enigmativity,我已经把更多的细节放在主帖子 –

回答

3

缓冲区只会释放所有的值直到缓冲区关闭,这对于实时图不是很有用。 您必须将值分成不重叠的窗口 - 从给定的触发器开始,并在扫描条件完成时关闭 - 一个完整扫描周期的窗口。 不幸的是,窗口在启动时仍会给我们值,所以我们将不得不跳过触发器触发前所有的值。

static IObservable<IObservable<T>> TriggeredSweep<T>(
     this IObservable<T> source, 
     Func<T, bool> triggerCondition, 
     Func<T, bool> sweepEnd 
     ) 
    { 
     source = source.Publish().RefCount(); 
     return source.Window(() => source.Where(triggerCondition).Sample(source.Where(sweepEnd))) 
        .Select(s => s.SkipWhile(v => !triggerCondition(v))); 

    } 

测试了这一点,最好的办法是在其上这个的前提是非常示波器型号:

 double period = 1000/0.5; //0.5 Hz 
     int cycles = 4;    //cycles to display 
     int quantization = 100;  //cycles to display    
     int amplitude = 10;   //signal peak    

     int range = quantization * cycles; //full range 

     //Sine wave generator for n cycles 
     //makes tuple of (t, sin(t)) 
     var source = Observable.Interval(TimeSpan.FromMilliseconds(period/range)) 
           .Select(s => s % (range + 1)) 
           .Select(s => Tuple.Create(s, amplitude * Math.Sin((double)s/((double)range/(double)cycles) * 2 * Math.PI))); 


     source.TriggeredSweep(
      value => value.Item2 > 5, //Trigger when Signal value > 5 
      value => value.Item1/quantization >= cycles //end sweep when all cycles are done 
      ) 
       .Subscribe(window => 
       { 
        Console.Clear(); //Clear CRO Monitor 

        window.Subscribe(value => 
        { 
         //Set (x, y) 
         Console.CursorLeft = (int)((double)value.Item1/range * (Console.WindowWidth - 1)); 
         Console.CursorTop = (int)((amplitude - value.Item2)/(2 * amplitude) * (Console.WindowHeight - 1)); 

         //draw 
         Console.Write("x"); 
        }); 
       }); 

     //prevent close 
     Console.ReadLine(); 

输出:

xxx     xxxx     xxx     xxxx 
    xx x     x x     xx x     x x 
    x x     xx x     x x     xx x 
    xx x    x x    xx x    x x 
    x  x    xx  x    x  x    xx  x 
    x  x    x  x    x  x    x  x 
    x  x    x  x    xx  x    x  x 
     x    x  x    x  x    x  x 
     x    x  x    x  x    x  x 
      x   x  x    x  x   x  x 
      x   x  xx   x   x   x  xx 
      x   x   x   x   x   x   x 
      x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      xx  x   x   x   xx  x   x   x 
      x  x   x  x    x  x   x  x 
      x  x    x  x    x  x    x  x 
      x  x    x  x    x  x    x  x 
      x  x    x  xx    x  x    x  xx 
      x  x    x  x    x  x    x  x 
      x  xx    x  x    x  xx    x  x 
       x x    x xx    x x    x xx 
       x xx     x x     x xx     x x 
       x x     x xx     x x     x xx 
       xxxx     xxx     xxxx     xxx 
       x      x      x      x 

我希望这些代码可能是有用的使用Rx测试简单的信号处理功能。 :)

+0

感谢您的解决方案,它是带我一段时间让我的头在Reactive Extensions附近,你的例子确实有帮助。我正在做一个简单的假设,即在一个缓冲区中收集的所有数据在时间间隔内均匀分布。 –

+0

@CliveG没问题。顺便说一句,如果你没有问题,你可以把它标记为答案。 – Asti

相关问题