2

我有一系列使用RX发布/订阅模型的模块。无效扩展 - 引发异步事件并在特定线程上订阅

这里是事件注册代码(每个订阅模块重复):

_publisher.GetEvent<DataEvent>()     
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool) 
    .Subscribe(module.OnDataEvent); 

出版商很简单,这要归功于José Romaniello's code

public class EventPublisher : IEventPublisher 
{ 
    private readonly ConcurrentDictionary<Type, object> _subjects = 
     new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>() 
    { 
     var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
     return subject.AsObservable(); 
    } 
    public void Publish<TEvent>(TEvent sampleEvent) 
    { 
     object subject; 
     if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
     { 
      ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
     } 
    } 
} 

现在我的问题:正如你可以在上面看到我使用.ObserveOn(Scheduler.TaskPool)方法从每个模块的池中为每个事件分配一个新线程。这是因为我有很多事件和模块。当然,问题在于事件按时间顺序混合在一起,因为有些事件彼此接近,然后以错误的顺序调用OnDataEvent回调(每个OnDataEvent都带有一个时间戳)。

有没有简单的方法来使用RX来确保正确的事件顺序?或者我可以编写自己的调度程序,以确保每个模块按顺序获取事件?

当然,事件按照正确的顺序发布。

在此先感谢。

回答

0

尝试同步方法,如:

_publisher.GetEvent<DataEvent>()     
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize() 
    .Subscribe(module.OnDataEvent); 

虽然我想你的情况有相同的代码,发现数据的顺序到达,并且不重叠。可能这是你的应用程序特有的东西。

+0

'Synchronize'方法除了确保底层observable遵守可观察合约 - 即'OnNext *(OnError | OnCompleted)'外不做任何其他事情。 – Enigmativity

1

尝试使用该实施EventPublisher的:

public class EventPublisher : IEventPublisher 
{ 
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>(); 

    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
     return _subject 
      .Where(o => o is TEvent) 
      .Select(o => (TEvent)o) 
      .ObserveOn(_scheduler); 
    } 

    public void Publish<TEvent>(TEvent sampleEvent) 
    { 
     _subject.OnNext(sampleEvent); 
    } 
} 

它使用一个EventLoopScheduler,以确保所有的事件发生顺序和在同一后台线程。

从订阅中删除ObserveOn,因为如果您在另一个线索上观察,则可能会再次发生错误顺序的事件。

这是否解决您的问题?

+0

为了与原始示例保持一致,您可以通过执行观察位并在最初的订阅模块中声明调度器来为每个观察者创建一个线程,只需使用每个模块的事件循环调度程序而不是任务池。 – ForbesLindesay

+0

谢谢大家,我会尝试这个并报告回来! – Gizz