2011-11-14 31 views
1

我的服务方法需要侦听器并用数据流调用它。我正尝试将此流转换为IObservable<T>。到目前为止,这是我所做的:将流的侦听器转换为IObservable

public class MessageListener : IMessageListener 
{ 

    private readonly Subject<string> stream = new Subject<string>(); 

    public IObservable<string> MessageStream 
    { 
     get 
     { 
      return this.stream; 
     } 
    } 

    public void OnMessageAdded(string message) 
    { 
     this.stream.OnNext(message); 
    } 

} 

//Calling code  
public IObservable<string> GetMessage() 
{ 
    var listener = new MessageListener(); 
    service.Subscribe(listener); 
    listener.MessageStream.SubscribeOn(Scheduler.NewThread); 
} 

我不确定这是否足够好。我相信SubscribeOn只会在新线程上运行订阅代码。我将如何确保OnMessageAdded被新线程接收?

回答

1

正如你已经发现自己,SubscribeOn只会控制订阅将发生的地方。但是,在您的代码中,SubscribeOn完全没有效果,因为它之后没有订阅。请记住,SubscribeOn返回一个可用于订阅的Observable。这并不是说你可以调用“SubscribeOn”来在源代码中设置一些全局标志或类似的东西。

你实际上想要做的是在订阅发生之前在服务权限内调用“ObserveOn”。 ObserveOn定义了处理传入消息的线程。另一种选择是将ObserveOn直接写入消息监听,这样它看起来像这样:

public IObservable<string> MessageStream 
{ 
    get 
    { 
     return this.stream.ObserveOn(Scheduler.ThreadPool).AsObservable(); 
    } 
} 

另一件事要注意,在你MessageStream物业最好打电话this.stream.AsObservable(),而不是直接返回的主题。

+0

我打电话给ViewModel级别,我调用GetMessage方法,我也做ObserveOnDispatcher()。 – anivas

+0

也许你应该提供更多关于服务实际执行情况的信息。我理解为服务订阅了MessageStream,并且您希望消息由另一个线程处理。这就是为什么你需要在你的服务中调用ObserveOn(Scheduler.NewThread)。或者,也许我让你完全错了? – Christoph

+0

它是一个wpf应用程序,其中消息流显示在列表框中。 GetMessage()方法是一个ServiceProxy类,它调用通过调用service.Subscribe(listener)来传输数据的实际java服务。我想断开监听这个流的serviceproxy的UI线程。否则,每当有一个数据流进入时UI会冻结。 – anivas