2017-05-31 41 views
0

我有一个类似的代码:未来呼唤Scala中的理解 - 我如何顺序处理它们?

for (n <- 1 to 1000) { 
    someFuture map { 
    // some other stuff 
} 

这是一个代码基本部件和工作正常。但是,somefuture会对数据库执行一些查询,并且数据库不能并行接收多个查询,而这正是之前发生的情况(它会产生大量执行somefuture的线程,正如人们所期望的那样)。理想情况下,我希望顺序执行(即当n = 1时调用someFuture,做一些处理,当n = 2时调用someFuture,做一些处理等)。我想过使用一些阻止方法(从Await),但这发生在一个演员内部,所以阻止不是一个好主意。另一个想法是创建一个固定的线程池为这个特定的未来呼叫,但听起来像矫枉过正。我该怎么做呢?

更新:我发现this answer,它建议创建一个固定的线程池,因为我认为。不过,这是做这件事的正确方法吗?

回答

0

一种方法是将消息发送给处理数据的参与者。 由于actor一个接一个地处理消息,因此您会按顺序并行执行查询。

for (n <- 1 to 1000) { 
    someFuture map { 
     x => actor ! x 
    } 
} 
+0

查询发生在'someFuture'中,因此'''''''''''''Future'里面有什么并不重要,对吧? –

+0

是的,如果查询在someFuture中运行,那么它意味着即使在涉及scala的for comprehension块之前,所有的查询执行都已被触发。因此,在你的情况下,你必须在理解块之前将查询发送给你的演员。 –

1

您想要映射或平面化单个未来。

scala> val f = Future(42) 
f: scala.concurrent.Future[Int] = Future(Success(42)) 

scala> (1 to 10).foldLeft(f)((f,x) => f.map(i => i + 1)) 
res1: scala.concurrent.Future[Int] = Future(<not completed>) 

scala> res1 
res2: scala.concurrent.Future[Int] = Future(Success(52)) 

scala> (1 to 10).foldLeft(f)((f,i) => { 
    | println(i) 
    | f.map(x => x+i) }) 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
res4: scala.concurrent.Future[Int] = Future(<not completed>) 

scala> res4 
res5: scala.concurrent.Future[Int] = Future(Success(97)) 
+0

此代码不是堆栈安全的。如果一些期货很大,你会得到'StackOverflowError'。 – simpadjo

+0

@simpadjo你尝试过什么?对我来说,'(1到Int.MaxValue)'耗尽堆,而不是堆栈。 –

+0

这是一篇很好的文章,涵盖期货堆垛安全问题https://alexn.org/blog/2017/01/30/asynchronous-programming-scala.html#h3-3。我也在生产中面对它。 – simpadjo

0

可能理想的长期处理方法是使用连接池的数据库访问层。大多数框架如play或slick都有处理这种情况的首选方式,或者如果您想单独使用,DBCP可能是一个不错的选择。我认为大多数应该有一个“自然”的方法来限制连接数量到一个固定的大小,如果没有连接池可用,这会限制你的并行性。

除了像这样引入其他一些依赖关系之外,使用线程池执行上下文是您提到的方法。这并不过分;这是非常普遍的,并且比其他任何处理方式都要少得多。

+0

我已经完成了多个EC,所以我不想说它不可行,但对于链式执行的简单用例,我没有看到future.map或类似的优势,这意味着在运行时未来完成。根据使用情况,工作分配的方式可能会有很大差异。 –