2012-05-11 32 views
1

让我先解释一下我想达到的目标。以Rx为基础的数据缓冲

可以说我有以下数据传入形式的事件流

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 

当我订阅的数据源,我想获得以下结果

"ok:michael" 
"ok" 
"begin:events 1:232 2:343 end:events" 
"error:dfljsdf" 
"error:fjkdjslf" 
"ok" 

基本上,我想得到以开头的数据为OK或错误和数据开始和结束

到目前为止,我已经试过这个..

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 



      var dataStream = Observable.Generate(
           data.GetEnumerator(), 
           e => e.MoveNext(), 
           e => e, 
           e => e.Current.ToString(), 
           e => TimeSpan.FromSeconds(0.1));   

      var onelineStream = from d in dataStream 
           where d.StartsWith("ok") || d.StartsWith("error") 
           select d; 

      // ??? 
      // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events" 
      // but it is not working... 
      var multiLineStream = from list in dataStream.Buffer<string, string, string>(
           bufferOpenings: dataStream.Where(d => d.StartsWith("begin")), 
           bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end"))) 
           select String.Join(" ", list); 

      // merge two stream???? 
      // but I have no clue how to merge these twos :(

      mergeStream .Subscribe(d => 
      { 
       Console.WriteLine(d); 
       Console.WriteLine(); 
      }); 

因为我很新的反应式编程,我不能让自己在被动的方式思考。 :(

在此先感谢。

回答

6

你是如此,所以非常接近正确答案!

基本上你有onelineStream & multiLineStream查询恰到好处。

一起合并它们的非常容易,只要做到这一点:

onelineStream.Merge(multiLineStream) 

但是,如果您的疑问下降shor t在Observable.Generate中,您用于介绍值之间的延迟。这创造了一种可观察的情况,即如果你有多个用户,那就是“粉碎”这些值。

考虑您的数据和对dataStream外观定义这个代码的行为:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine); 
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine); 

你得到这些值:

!hello 
@Using 
!ok:michael 
@ok 
@1:232 
!begin:events 
@2:343 
!end:events 
!fdl 
@error:dfljsdf 
!error:fjkdjslf 
@ok 

注意,有些得到了由一个订阅处理和其他人得到处理由另一个。这意味着即使您的查询恰好正确,它们也只会看到一些数据,因此不像您期望的那样运行。

您还可以获取可跳过和重复值的竞争条件。所以最好避免这种可观察的情况。

一种更好的方法来介绍值之间的延迟要做到这一点:

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100)); 

现在,这创造了一个“冷”观察到,这意味着每一个新的用户将获得的观察到这样从开始一个新的订阅第一个值。

您的multiLineStream查询将无法正常工作在冷观察。

为了使数据流成为一个“热”可观察的(其在用户之间共享值),我们使用Publish运算符。

所以,multiLineStream现在看起来是这样的:

var multiLineStream = 
    dataStream.Publish(ds => 
     from list in ds.Buffer(
      ds.Where(d => d.StartsWith("begin")), 
      b => ds.Where(d => d.StartsWith("end"))) 
     select String.Join(" ", list)); 

那么你可以得到你的结果,像这样:

onelineStream.Merge(multiLineStream).Subscribe(d => 
{ 
    Console.WriteLine(d); 
    Console.WriteLine(); 
}); 

这是我得到:

ok:michael 
ok 
begin:events 1:232 2:343 end:events 
error:dfljsdf 
error:fjkdjslf 
ok 

让我知道这是否适合你。

+0

我希望我可以upvote 10倍。 :) –

+1

为其他访问者的信息,如果'dataStream'是从事件创建的(在我的情况下,'Observable.FromEvent'工厂方法),'发布'方法不是必需的。 –

+1

这真的是一个很好的答案,以非常具有教育意义的方式全面覆盖了原始代码中的所有问题。充分的信贷。 – yamen