让我先解释一下我想达到的目标。以Rx为基础的数据缓冲
可以说我有以下数据传入形式的事件流
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
当我订阅的数据源,我想获得以下结果
"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"
基本上,我想得到以开头的数据为OK或错误和数据开始和结束。
到目前为止,我已经试过这个..
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
var dataStream = Observable.Generate(
data.GetEnumerator(),
e => e.MoveNext(),
e => e,
e => e.Current.ToString(),
e => TimeSpan.FromSeconds(0.1));
var onelineStream = from d in dataStream
where d.StartsWith("ok") || d.StartsWith("error")
select d;
// ???
// may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events"
// but it is not working...
var multiLineStream = from list in dataStream.Buffer<string, string, string>(
bufferOpenings: dataStream.Where(d => d.StartsWith("begin")),
bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end")))
select String.Join(" ", list);
// merge two stream????
// but I have no clue how to merge these twos :(
mergeStream .Subscribe(d =>
{
Console.WriteLine(d);
Console.WriteLine();
});
因为我很新的反应式编程,我不能让自己在被动的方式思考。 :(
在此先感谢。
我希望我可以upvote 10倍。 :) –
为其他访问者的信息,如果'dataStream'是从事件创建的(在我的情况下,'Observable.FromEvent'工厂方法),'发布'方法不是必需的。 –
这真的是一个很好的答案,以非常具有教育意义的方式全面覆盖了原始代码中的所有问题。充分的信贷。 – yamen