2014-03-01 85 views
5

我一直在为RX Java使用scala绑定一段时间,并且正在考虑将它与Akka Actor结合起来。 我想知道在Akka Actor s之间是否安全/可能通过RX Observable s。例如,程序打印偶数的平方高达20(每秒):将RX Observable传递给演员(scala)是否安全?

/* producer creates an observable and sends it to the worker */ 
object Producer extends Actor { 
    val toTwenty : Observable[Int] = Observable.interval(1 second).take(20) 

    def receive = { 
    case o : Observable[Int] => 
     o.subscribe(onNext => println) 
    } 

    worker ! toTwenty 
} 


/* worker which returns squares of even numbers */ 
object Worker extends Actor { 
    def receive = { 
    case o : Observable[Int] => 
     sender ! o filter { _ % 2 == 0 } map { _^2 } 
    } 
} 

(请把它当作伪代码;它不会编译)。注意我从一个演员到另一个演员的观察对象。我想了解:

  • 是否Akka和RX会自动同步对Observable的访问?
  • Observable不能通过分布式系统发送 - 它是对本地内存中对象的引用。但是,它会在本地工作吗?
  • 假设在这个微不足道的例子中,工作将在Producersubscribe调用中安排。我是否可以拆分作品,以便分别对每个演员完成作品?

题外话:我已经看到了一些项目,看起来到RX相结合,演员:

http://jmhofer.johoop.de/?p=507https://github.com/jmhofer/rxjava-akka

但这些不同在于它们不是简单地通过Observable作为演员之间的消息。他们首先拨打subscribe()以获取值,然后将这些值发送到演员信箱,并从中创建一个新的Observable。还是我误会了?

+0

评论删除 – mavilein

回答

9

你的方法不是一个好主意。 Akka背后的主要思想是将消息发送到演员的邮箱,演员按顺序处理它们(在一个主题上)。通过这种方式,2个线程不可能访问参与者的状态,也不会产生并发问题。

在你的情况下,你在Observable上使用订阅。您的onNext回调可能会在另一个线程上执行。因此,2个线程突然可以访问您的演员的状态。所以你必须非常谨慎,你在回调中做了什么。这是你最后一次观察其他实现的原因。那些实现似乎抓取onNext内的值并将此值作为消息发送。 您不得在这种回调中更改演员的内部状态。相反,将消息发送给同一个演员。这样一个线程上的顺序处理再次得到保证。

+0

感谢您的回复。你介意看看我发布的第二个链接(github),并告诉我自述文件中第一个代码示例正在做什么?它以某种方式将observable与actor同步,所以你可以在'receive'中使用它。这是否允许您在actor中使用Observable(即使您不能将它作为消息传递)? – Luciano

+0

嘿,你有没有关于你想用Observable做什么的例子?第二个github链接显示了一个集成,但这个例子太简单了,因为它只是记录并且没有副作用。 – mavilein

4

我花了一些时间做实验,发现可以在Akka中使用Observable。实际上,因为Observable可以被认为是Future的多变量扩展,所以您可以按照与演员和期货合并相同的指导原则。在Akka中使用Future实际上在官方文件和教科书(例如Akka Concurrency,Wyatt 2013)中都受到支持/鼓励,尽管存在很多警告。

首先是积极的:

  • Observable S,像Future s为不可变的,所以他们应该在理论上是安全的邮件中来回传递。
  • Observable允许您指定执行上下文,非常类似于Future。这是使用Observable.observeOn(scheduler)完成的。您可以通过将Akka调度程序(例如system.dispatchercontext.dispatcher)传递给rx.lang.scala.ExecutorScheduler构造函数,从Akka的exec环境创建调度程序。这应该确保它们是同步的。
  • 与上述相关的是,作品中的rx-scala有一个增强(https://github.com/Netflix/RxJava/issues/815#issuecomment-38793433),可以隐式指定observable的调度程序。
  • 期货很好地融入了Akka的ask模式。 Observables可以使用类似的模式(参见本文的底部)。这也解决了向远程观察者发送消息的问题。

现在的告诫:

  • 它们共享相同的问题作为未来。例如,参见页面底部:http://doc.akka.io/docs/akka/2.3.2/general/jmm.html。另外还有关于Wyatt 2013期货的一章。
  • 正如在@ mavilein的回答中,这意味着Observable.subscribe()不应该使用Actor的封闭范围来访问它的内部状态。例如,您不应在订阅中拨打sender。相反,将其存储到val中,然后访问此val,如下例所示。
  • Akka使用的调度程序的分辨率与Rx不同。它的默认分辨率是100 ms(Wyatt 2013)。如果有人遇到了可能导致的问题,请在下面评论!

最后,我已经实现了相当于Observable的ask模式。它使用toObservable??异步返回一个Observable,由一个临时actor和一个PublishSubject作为幕后支持。请注意,由源发送的消息类型为rx.lang.scala.Notification使用materialize(),因此它们满足完整错误状态在可观察合约中。否则,我们没有办法将这些状态传达给接收器。然而,没有任何东西阻止你发送仲裁类型的消息;这些将简单地称为onNext()。如果在特定时间间隔内没有收到消息,observable会有超时异常并停止。

它用于像这样:

import akka.pattern.RX 
implicit val timeout = akka.util.Timeout(10 seconds) 
case object Req 

val system = ActorSystem("test") 
val source = system.actorOf(Props[Source],"thesource") 

class Source() extends Actor { 
    def receive : Receive = { 
    case Req => 
     val s = sender() 
     Observable.interval(1 second).take(5).materialize.subscribe{s ! _} 
    } 
} 

val obs = source ?? Req 
obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe((l : Any) => println ("onnext : " + l.toString), 
       (error : Throwable) => { error.printStackTrace ; system.shutdown() }, 
      () => { println("completed, shutting system down"); system.shutdown() }) 

,并产生这样的输出:

onnext : 0 
onnext : 1 
onnext : 2 
onnext : 3 
onnext : 4 
completed, shutting system down 

源如下。它是AskSupport.scala的修改版本。

package akka.pattern 

/* 
* File : RxSupport.scala 
* This package is a modified version of 'AskSupport' to provide methods to 
* support RX Observables. 
*/ 

import rx.lang.scala.{Observable,Subject,Notification} 
import java.util.concurrent.TimeoutException 
import akka.util.Timeout 
import akka.actor._ 
import scala.concurrent.ExecutionContext 
import akka.util.Unsafe 
import scala.annotation.tailrec 
import akka.dispatch.sysmsg._ 

class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) { 
    def this(message: String) = this(message, null: Throwable) 
    override def getCause(): Throwable = cause 
} 

trait RxSupport { 
    implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef) 
    def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message 
    implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection) 
    def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message 
} 

final class RxActorRef(val actorRef : ActorRef) extends AnyVal { 
    def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match { 
    case ref : InternalActorRef if ref.isTerminated => 
     actorRef ! message 
     Observable.error(new RxTimeoutException(s"Recepient[$actorRef] has alrady been terminated.")) 
    case ref : InternalActorRef => 
     if (timeout.duration.length <= 0) 
     Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]")) 
     else { 
     val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString) 
     actorRef.tell(message, a) 
     a.result.doOnCompleted{a.stop}.timeout(timeout.duration) 
     } 
    } 
    def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout) 
} 

final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal { 
    def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match { 
    case ref : InternalActorRef => 
     if (timeout.duration.length <= 0) 
     Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]")) 
     else { 
     val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString) 
     actorSel.tell(message, a) 
     a.result.doOnCompleted{a.stop}.timeout(timeout.duration) 
     } 
    case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]")) 
    } 
    def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout) 
} 


private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef { 
    import RxSubjectActorRef._ 
    import AbstractRxActorRef.stateOffset 
    import AbstractRxActorRef.watchedByOffset 

    /** 
    * As an optimization for the common (local) case we only register this RxSubjectActorRef 
    * with the provider when the `path` member is actually queried, which happens during 
    * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!). 
    * 
    * Defined states: 
    * null     => started, path not yet created 
    * Registering   => currently creating temp path and registering it 
    * path: ActorPath  => path is available and was registered 
    * StoppedWithPath(path) => stopped, path available 
    * Stopped    => stopped, path not yet created 
    */ 
    @volatile 
    private[this] var _stateDoNotCallMeDirectly: AnyRef = _ 

    @volatile 
    private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet 

    @inline 
    private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]] 

    @inline 
    private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean = 
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy) 

    @tailrec // Returns false if the subject is already completed 
    private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match { 
    case null => false 
    case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher) 
    } 

    @tailrec 
    private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match { 
    case null =>() 
    case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher) 
    } 

    @tailrec 
    private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match { 
    case null => ActorCell.emptyActorRefSet 
    case other => if (!updateWatchedBy(other, null)) clearWatchers() else other 
    } 

    @inline 
    private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) 

    @inline 
    private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean = 
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) 

    @inline 
    private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState) 

    override def getParent: InternalActorRef = provider.tempContainer 

    def internalCallingThreadExecutionContext: ExecutionContext = 
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext 

    /** 
    * Contract of this method: 
    * Must always return the same ActorPath, which must have 
    * been registered if we haven't been stopped yet. 
    */ 
    @tailrec 
    def path: ActorPath = state match { 
    case null => 
     if (updateState(null, Registering)) { 
     var p: ActorPath = null 
     try { 
      p = provider.tempPath() 
      provider.registerTempActor(this, p) 
      p 
     } finally { setState(p) } 
     } else path 
    case p: ActorPath  => p 
    case StoppedWithPath(p) => p 
    case Stopped => 
     // even if we are already stopped we still need to produce a proper path 
     updateState(Stopped, StoppedWithPath(provider.tempPath())) 
     path 
    case Registering => path // spin until registration is completed 
    } 

    override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match { 
    case Stopped | _: StoppedWithPath => provider.deadLetters ! message 
    case _ => 
     if (message == null) throw new InvalidMessageException("Message is null") 
     else 
     message match { 
      case n : Notification[Any] => n.accept(result) 
      case other     => result.onNext(other) 
     } 
    } 

    override def sendSystemMessage(message: SystemMessage): Unit = message match { 
    case _: Terminate      => stop() 
    case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at)) 
    case Watch(watchee, watcher) => 
     if (watchee == this && watcher != this) { 
     if (!addWatcher(watcher)) 
      // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS 
      watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false)) 
     } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) 
    case Unwatch(watchee, watcher) => 
     if (watchee == this && watcher != this) remWatcher(watcher) 
     else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this)) 
    case _ => 
    } 

    @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match { 
    case Stopped | _: StoppedWithPath => true 
    case _       => false 
    } 

    @tailrec 
    override def stop(): Unit = { 
    def ensureCompleted(): Unit = { 
     result.onError(new ActorKilledException("Stopped")) 
     val watchers = clearWatchers() 
     if (!watchers.isEmpty) { 
     watchers foreach { watcher => 
      // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS 
      watcher.asInstanceOf[InternalActorRef] 
      .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false)) 
     } 
     } 
    } 
    state match { 
     case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either 
     if (updateState(null, Stopped)) ensureCompleted() else stop() 
     case p: ActorPath => 
     if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop() 
     case Stopped | _: StoppedWithPath => // already stopped 
     case Registering     => stop() // spin until registration is completed before stopping 
    } 
    } 
} 

private[akka] object RxSubjectActorRef { 
    private case object Registering 
    private case object Stopped 
    private final case class StoppedWithPath(path : ActorPath) 

    def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = { 
    val result = Subject[Any]() 
    new RxSubjectActorRef(provider, result) 
    /*timeout logic moved to RxActorRef/Sel*/ 
    } 
} 
/* 
* This doesn't work, need to create as a Java class for some reason ... 
final object AbstractRxActorRef { 
    final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly")) 
    final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly")) 
}*/ 

package object RX extends RxSupport 

更新2015年9月10日

想我会在这里添加一些简单的代码来实现??操作。这与上面的略有不同,因为a)它不支持网络上的数据,b)它返回Observable[Observable[A]],这使得更容易同步响应。其优点是,它不乱用阿卡内脏:

object TypedAskSupport { 
    import scala.concurrent.Future 
    import akka.actor.{ActorRef,ActorSelection} 
    import scala.reflect.ClassTag 

    implicit class TypedAskableActorRef(actor : ActorRef) { 
    val converted : akka.pattern.AskableActorRef = actor 
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] = 
     converted.ask(topic).mapTo[Observable[R]] 
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] = 
     Observable.from (this.?[R](topic)(timeout)) 
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] = 
     converted.ask(topic).asInstanceOf[Future[R]] 
    def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] = 
     Observable.from { this.?[R](topic)(timeout) } 
    } 

    implicit class TypedAskableActorSelection(actor : ActorSelection) { 
    val converted : akka.pattern.AskableActorSelection = actor 
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] = 
     converted.ask(topic).mapTo[Observable[R]] 
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] = 
     Observable.from (this.?[R](topic)(timeout)) 
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] = 
     converted.ask(topic).asInstanceOf[Future[R]] 
    } 
} 
2

因为我张贴了原来的问题,RX-java和阿卡已经走过了很长的路要走。

目前候选版本可用于​​,我认为它在某种程度上试图提供与rx-java的Observable类似的原语。

另外还有一个Reactive Streams的倡议,它看起来也提供了通过方法toPublishertoSubscriber在不同的基元之间的互操作性; Akka流实现这个API,并且java-rx有一个提供这个接口的​​。在两者之间转换的例子可以在this blog post发现,摘录如下:

// create an observable from a simple list (this is in rxjava style) 
val first = Observable.from(text.split("\\s").toList.asJava); 
// convert the rxJava observable to a publisher 
val publisher = RxReactiveStreams.toPublisher(first); 
// based on the publisher create an akka source 
val source = PublisherSource(publisher); 

然后你就可以大概通过这些安全周围的演员里面。

相关问题