我有一系列使用RX发布/订阅模型的模块。无效扩展 - 引发异步事件并在特定线程上订阅
这里是事件注册代码(每个订阅模块重复):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
出版商很简单,这要归功于José Romaniello's code:
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
现在我的问题:正如你可以在上面看到我使用.ObserveOn(Scheduler.TaskPool)方法从每个模块的池中为每个事件分配一个新线程。这是因为我有很多事件和模块。当然,问题在于事件按时间顺序混合在一起,因为有些事件彼此接近,然后以错误的顺序调用OnDataEvent回调(每个OnDataEvent都带有一个时间戳)。
有没有简单的方法来使用RX来确保正确的事件顺序?或者我可以编写自己的调度程序,以确保每个模块按顺序获取事件?
当然,事件按照正确的顺序发布。
在此先感谢。
'Synchronize'方法除了确保底层observable遵守可观察合约 - 即'OnNext *(OnError | OnCompleted)'外不做任何其他事情。 – Enigmativity