1

我打算在某个时间点在我的项目中使用Rx,并且一直在研究我可以用Rx做什么。无功扩展和并行处理

我的项目使用TPL并行处理状态机转换(利用所有可用的处理器核心)。但是,为了提高性能,我想用Rx推送机制替换当前的IList拉动机制。

由于我对这项技术不太了解,我想确定Rx是否与将令牌推入并行状态转换兼容。我将要求所有并行状态转换订阅Rx主题并检索下一个标记。每个状态转换都需要来自主题的相同标记。从我自己的研究中得出的结论是,一旦令牌被推送给观察者(在这种情况下是状态转换),该令牌就会消失。如果是这种情况,其他转换(对于相同的状态)将永远不会收到该令牌并最终处于错误状态。

难道有人会告诉我我的担忧是否正确?我没有代码可以显示,因为我只是在探索我的选择。

谢谢。

回答

0

您可能正在寻找

IConnectableObservable<T> IObservable<T>.Publish() 

可以连接的可观察到的是原始观察到的多播变量,可以订阅多次。要开始您将所有观察者订阅到connectableobservable,然后调用connectableObservable上的connect。

1

它可能有助于将Rx Subject视为与.NET事件类似,即注册事件处理程序的线程安全列表。在主体的情况下,这些可能被认为等同于单独的Observer OnNext代表。事件触发后,每个处理程序都会使用相同的事件参数进行调用。类似地,订阅主题的每个观察者都会使用相同的参数对象调用OnNext。该参数可能包含状态转换所需的令牌的值类型或引用类型。据推测,Observer的OnNext处理程序会将此标记(比如并发队列)存储为相应的并行任务消耗。

因此,除非您简单地忽略OnNext调用或丢弃它们传递给您的对象,否则没有任何好处。

如果您的任务之一更改了令牌,则可能共享状态损坏问题。或者你可能会遇到同步问题,如果一个任务处理了几个状态转换令牌,而另一个任务没有足够的时间将第一个出队。但是这不是特定于使用Rx Subject,而是使用其他技术来推送令牌。

-1

@Tony很好地指出Observable Sequence(IObservable)就像一个事件,因为它可以有一个线程安全的注册订户/事件处理程序列表。就像事件一样,许多注册订户可以接收事件有效载荷(以串行方式,首先呼叫第一个订户)。

但是,没有解决的问题是Rx可能会帮助您解决这些事件不会为自己提供帮助的时间。例如,您可能在所有并行状态转换处理程序设置完成之前发布/推送了一个标记。如果您使用的是事件或标准的可观察序列,则有效载荷将丢失。您可以通过使用Rx中的缓存功能(如重播)来弥补这一点。这可以让你重播最后的值给晚期用户。

如果这是有趣的,那里有大量的信息。您可以在我的书中查看更多关于它的信息:http://introtorx.com/Content/v1.0.10621.0/02_KeyTypes.html