2014-07-16 68 views
3

有人可以帮我做一个IObserver的同步订阅,这样调用方法会阻塞,直到订阅完成。 如:无效扩展同步订阅

出版商

public static class Publisher { 
public static IObservable<string> NonBlocking() 
    { 
     return Observable.Create<string>(
      observable => 
      { 
       Task.Run(() => 
       { 
        observable.OnNext("a"); 
        Thread.Sleep(1000); 
        observable.OnNext("b"); 
        Thread.Sleep(1000); 
        observable.OnCompleted(); 
        Thread.Sleep(1000); 
       }); 

       return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed")); 
      }); 
    } 

}

用户

public static class Subscriber{ 
public static bool Subscribe() 
    { 
     Publisher.NonBlocking().Subscribe((s) => 
     { 
      Debug.WriteLine(s); 
     },() => 
     { 
      Debug.WriteLine("Complete"); 
     }); 
     // This will currently return true before the subscription is complete 
     // I want to block and not Return until the Subscriber is Complete 
     return true; 
    } 

}

回答

5

你需要使用System.Reactive.Threading.Task此:

把你观察到成一个任务...

var source = Publisher.NonBlocking() 
    .Do(
     (s) => Debug.WriteLines(x), 
     () => Debug.WriteLine("Completed") 
    ) 
    .LastOrDefault() 
    .ToTask(); 

Do(...).Subscribe()就像Subscribe(...)。所以Do只是增加了一些副作用。

LastOrDefault就在那里,因为ToTask创建Task只能等待来自源Observable的第一个项目,如果没有项目取得了它会失败(抛出)。因此,LastOrDefault有效地导致Task等待源完成,无论它产生什么。

所以之后我们还有一个任务,就等着它:

task.Wait(); // blocking 

或者使用异步/ AWAIT:

await task; // non-blocking 

编辑:

科里斯尼尔森取得了一个很好的点:

在C#和Visual Studio的最新版本中,您实际上可以使用await一个IObservable<T>。这是一个很酷的功能,但它的工作方式与等待Task的方式稍有不同。

当您等待任务时,它会导致任务运行。如果多次等待任务的单个实例,则该任务将只执行一次。可观测量略有不同。你可以将一个observable想象成一个具有多个返回值的异步函数......每当你订阅observable时,observable/function都会执行。因此,这两个代码段有不同的含义:

等待可观察到的:

// Console.WriteLine will be invoked twice. 
var source = Observable.Return(0).Do(Console.WriteLine); 
await source; // Subscribe 
await source; // Subscribe 

通过任务等待可观察到的:

// Console.WriteLine will be invoked once. 
var source = Observable.Return(0).Do(Console.WriteLine); 
var task = source.ToTask(); 
await task; // Subscribe 
await task; // Just yield the task's result. 

因此,在本质上,等待可观察到的是这样的:

// Console.WriteLine will be invoked twice. 
var source = Observable.Return(0).Do(Console.WriteLine); 
await source.ToTask(); // Subscribe 
await source.ToTask(); // Subscribe 

但是,await observable语法不会w Xamerin Studio中的ork(截至撰写本文时)。如果您使用的是Xamerin Studio,我强烈建议您在最后时刻使用ToTask来模拟Visual Studio的await observable语法的行为。

+0

奇妙的是,不知道'.Do(..)'看起来像一个LastOrDefaultAsync()方法,所以我可以做一个.Wait()从那个 – Lukie

+2

你可以直接'await'一个'的IObservable <>'。它会返回序列中的最后一个项目。 –

+0

你是对的。我会更新答案。 –