2016-04-27 125 views
1

裹异步代码,我在MongoDB中插入一条记录:呼叫阻塞

val observable: Observable[Completed] = collection.insertOne(doc) 

observable.subscribe(new Observer[Completed] { 
    override def onNext(result: Completed): Unit = { println("Inserted"); } 
    override def onError(e: Throwable): Unit = { println(" \n\nFailed " + e + "\n\n"); fail() } 
    override def onComplete(): Unit = { println("Completed"); } 
}) 

即使onError调用回调函数的测试通过。这是因为insertOne是异步方法,并且在调用onError之前完成测试。我想将insertOne方法封装成阻塞调用,因此subscribe不会被调用,直到observable值被设置。

有没有一种惯用的方法来实现这一点是在斯卡拉?

+0

你可以看看[ReactiveMongo(http://reactivemongo.org/) – cchantep

回答

2

同步阻止异步操作的最简单方法是在Future上使用Await.result。由于MongoCollection.insertOne返回Observable[Complete],你可以在上面使用隐ScalaObservable.toFuture

val observable = collection.insertOne(doc) 
Await.result(observable.toFuture, Duration.Inf) 

observable.subscribe(new Observer[Completed] { 
    override def onNext(result: Completed): Unit = { println("Inserted"); } 
    override def onError(e: Throwable): Unit = { println(" \n\nFailed " + e + "\n\n"); fail() } 
    override def onComplete(): Unit = { println("Completed"); } 
}) 
+0

应导致含有可观察到的,我要订阅? –

+0

'result'应该已经是'Seq [T]',意思是'Seq [Completed]'实例。 –

+0

应该不会是类型Observable [Completed],因为这是'collection.insertOne(doc)'返回的? –