2016-04-14 54 views
1

功能,我想从Observable就像这样的数组创建一个Observable调用(超载)RxJava从斯卡拉

package rxtest 

import concurrent._ 
import concurrent.ExecutionContext.Implicits.global 

import rx.lang.scala._ 
import rx.lang.scala.JavaConversions._ 
import rx.lang.scala.schedulers._ 

object A extends App { 
    val ps = Array.fill(3)(Promise[Int]()) 
    val os = ps map { 
      p => Observable from p.future observeOn NewThreadScheduler() 
     } 
    val v = rx.Observable.merge(os map toJavaObservable) 
} 

,该程序不能编译,因为Observable有几个重载方法都称merge

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:15: overloaded method value merge with alternatives: 
[error] [T](x$1: Array[rx.Observable[_ <: T]])rx.Observable[T] <and> 
[error] [T](x$1: rx.Observable[_ <: rx.Observable[_ <: T]])rx.Observable[T] <and> 
[error] [T](x$1: Iterable[_ <: rx.Observable[_ <: T]])rx.Observable[T] 
[error] cannot be applied to (Array[rx.Observable[_ <: Int]]) 
[error]  val v = rx.Observable.merge(os map toJavaObservable) 
[error]       ^
[error] one error found 

然后我想与另一个Java类的有助于消除超载:

public class RxUtils { 
    public final static <T> Observable<T> merge(Observable<? extends T>[] os) { 
     return Observable.merge(os); 
    } 
} 

的Scala代码成为(只有相关部分在此列出):

val ps = Array.fill(3)(Promise[Int]()) 
val os = ps map { 
     p => Observable from p.future observeOn NewThreadScheduler() 
    } 
val v = RxUtils.merge(os map toJavaObservable) 

这个程序仍然不能编译:

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: no type parameters for method merge: (os: Array[rx.Observable[_ <: T]])rx.Observable[T] exist so that it can be applied to arguments (Array[rx.Observable[_ <: Int]]) 
[error] --- because --- 
[error] argument expression's type is not compatible with formal parameter type; 
[error] found : Array[rx.Observable[_ <: Int]] 
[error] required: Array[rx.Observable[_ <: ?T]] 
[error]  val v = RxUtils.merge(os map toJavaObservable) 
[error]     ^
[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: type mismatch; 
[error] found : Array[rx.Observable[_ <: Int]] 
[error] required: Array[rx.Observable[_ <: T]] 
[error]  val v = RxUtils.merge(os map toJavaObservable) 
[error]       ^
[error] two errors found 

我有三个问题:

  1. 如何在第一种情况下使用纯斯卡拉调用merge方法?
  2. 为什么第二个程序不能编译?
  3. 如何调用斯卡拉上述RxUtilsmerge方法?
+0

如果您真的对Java和Scala generic的黑魔法感兴趣,请检查RxScala如何实现'flatten':https://github.com/ReactiveX/RxScala/blob/a385e1a474a05af5173d3a6c5f380b0f87b50dff/src/main/scala/rx/lang /scala/Observable.scala#L2669 – zsxwing

回答

2

我与你在这里做什么真的很困惑。为什么要混合rx.Observablerx.lang.scala.Observable。只要选择其中一个:如果你在斯卡拉工作,选择后者;如果您正在编写Java代码,请选择前者!

我也想你指向this page,这两种类型的Observable进行比较。

关于你的第一个程序,如果我是正确的ps类型为Array[Promise[Int]],所以os必须有类型Array[Observable[Int]]。如果您想将它们合并为一个Observable,您可以按照上面的链接在左栏中搜索merge(Array<Observable<? extends T>>)。事实证明,你可以在Scala中编写这个文件,如Observable.from(os).flattenos.toObservable.flatten

关于第二个和第三个问题:我真的没有选中此,但它可能具有与Java和Scala之间的协方差差异有关。可能通过提供一些额外的类型信息来帮助类型系统可以做到这一点。但是我想,如果你只是停留在Scala语言中,并且按照它的意思使用RxScala库,那么你就不必处理这样的问题。