2012-09-23 130 views
1

我在阅读有关Rx并通过一些示例。我试图执行此操作:Rx多个异步操作

List<A> LIstA {get;set;} 
List<B> LIstB {get;set;} 
List<C> LIstC {get;set;} 

private void GetItems() 
{ 
    ListA = GetItemsA(); 
    ListB = GetItemsB(); 
    ListC = GetItemsC(); 
} 

所有此代码都在主线程上执行。我删除了主线程(UI)使用此列表的代码。我想要的是以异步方式获取这些项目,但我需要以预定义顺序进行的结果。

以异步方式运行它并不麻烦,但我在按预定义顺序获取结果时遇到问题。因此,我运行代码,仅在此预定义序列中显示UI,几秒钟后会显示ListA填充,然后ListB,最后显示ListC

如何实现这一目标?

回答

1

Taskasync可能比Rx​​更适合此模型。

不过,从我可以推断出你的问题,使用CONCAT连接可观察到的前一个完成后:

 Func<Action, IObservable<Unit>> fetch = 
      action => Observable.Defer(() => Observable.Start(action)); 

     fetch(() => A()) 
     .Concat(fetch(() => B())) 
     .Concat(fetch(() => C())) 
     .Subscribe(); 
+0

嗨阿斯蒂,我已经用Task实现了它。在这种情况下,我需要使用协程,并且这种复杂的一些其他的东西(在子类中重写)。如果你知道使用任务的例子,而不使用协程,请让我知道。至于你使用Rx的例子,单元究竟是什么?它是A类,B类和C类的基类吗?什么是行动?从您发布的代码中不太清楚。 – Goran

+0

[单元](http://msdn.microsoft.com/en-us/library/system.reactive.unit(v = vs.103).aspx)在功能性编程中等同于'void'。它可以被认为是执行不返回任何内容的方法的结果。第一行是避免重复的代表。你可以用'()=> ListA = GetItemsA()'替换'()=> A()'。 – Asti

+0

因此,现在我可以编译,数据按预定义的顺序异步检索 - 工作正常。问题:我不能在UI(主线程)上使用LIstA和ListB,因为接收到:调用线程不能访问此对象,因为不同的线程拥有它。“。我曾尝试在.Subscribe()之前添加.ObserveOn(SynchronizationContext.Current) ,但它并没有帮助。如何获得结果列表回到主线程? – Goran

0

做这项工作的吗?

GetItemsA().ToObservable() 
    .Zip(GetItemsB().ToObservable(), (a, b) => new { a, b, }) 
    .Zip(GetItemsC().ToObservable(), (ab, c) => new { ab.a, ab.b, c, }) 
    .Subscribe(abc => 
    { 
     ListA = abc.a; 
     ListB = abc.b; 
     ListC = abc.c; 
    }); 
+0

这确保在调用Subscribe之前执行所有这三件事,但这并不像OP请求 –

+0

@PaulBetts那样按顺序执行它们 - 我认为这个任务必须是顺序的。 – Enigmativity

0

根据你的问题,你有三个不同类型的集合。很显然,人们不能共同分配三种不同类型的流。我觉得@ Enigmativity的是的allmost正确的,但为了使它工作,你需要将其更改为:

var uiScheduler = new SynchronizationContextScheduler(SynchronizationContext.Current); 
ListA = new ListA(); 
ListB = new ListB(); 
ListC = new ListC(); 

GetItemsA().ToObservable() 
.Zip(GetItemsB().ToObservable(), (a, b) => new { a, b, }) 
.Zip(GetItemsC().ToObservable(), (ab, c) => new { ab.a, ab.b, c, }) 
.ObserveOn(uiScheduler) 
.Subscribe(abc => 
{ 
    ListA.Add(abc.a); 
    ListB.Add(abc.b); 
    ListC.Add(abc.c); 
}); 

另一种解决方案可能是:

var a = Observable.Start(() => GetListA()); 
var b = Observable.Start(() => GetListB()); 
var c = Observable.Start(() => GetListC()); 

a.Zip(b, c, (x, y, z) => Tuple.Create(x, y, z)) 
    .ObserveOn(uiScheduler) 
    .Subscribe(result => 
       { 
        ListA = result.Item1; 
        ListB = result.Item2; 
        ListC = result.Item3; 
       }); 

我尽快与您创建它们想要的收藏:

a.ObserveOn(uiScheduler) 
    .Do(l => ListA = l) 
    .Zip(b, (la, lb) => lb) 
    .ObserveOn(uiScheduler) 
    .Do(l => ListB = l) 
    .Zip(c, (lb, lc) => lc) 
    .ObserveOn(uiScheduler) 
    .Subscribe(listc => ListC = listc); 
+0

这与这个问题是a,b和c同时评估。来自OP的问题:“我需要以预定义顺序发布结果。” – Asti

0

你可以使用动态以及...:

void Main() 
{ 
    var q = 
    from @as in Observable.Start(() => GetItemsA()) 
    from bs in Observable.Start(() => GetItemsB()) 
    from cs in Observable.Start(() => GetItemsC()) 
    select @as.Cast<dynamic>().Concat(bs.Cast<dynamic>()).Concat(cs.Cast<dynamic>()); 
    q.Subscribe(t => t.Dump()); 
} 

// Define other methods and classes here 
private IEnumerable<int> GetItemsA() { 
    yield return 1; 
    Thread.Sleep(500); 
    yield return 2; 
    yield return 3; 
} 
private IEnumerable<string> GetItemsB() { 
    yield return "A"; 
    yield return "B"; 
    Thread.Sleep(1000); 
    yield return "C"; 
} 
private List<float> GetItemsC() { 
    return new List<float> { 1.1f, 2.2f, 3.3f }; 
}