2015-09-12 43 views
2

我需要一个机制来异步调用一些回调的未来一个回调的列表...所以我已经实现了以下类:如何调用返回后另一个

class AsyncCallbacks[T] { 

    private val callbacks = new ListBuffer[T => Future[Unit]]() 

    def +=(f: T => Future[Unit]) = callbacks += f 
    def -=(f: T => Future[Unit]) = callbacks -= f 

    def invoke(data: T) = Future.sequence(callbacks.map(_(data))) 
} 

... 

def f1(i: Int) = Future { println(i) } 
def f2(i: Int) = Future { println(i) } 

val callbacks = new AsyncCallbacks[Int] 
callbacks += f1 
callbacks += f2 
callbacks.invoke(5) 

callbacks.invoke产生scala.concurrent.Future[scala.collection.mutable.ListBuffer[Unit]] ...我想知道是否有更好更有效的方法来调用所有已注册的回调,而不会生成无用的列表Unit s。

实现以上也有另外一个问题...让我们假设有下面的方法...

def l1 = Future { List.fill(5)("1") } 
def l2 = Future { List.fill(5)("2") } 

...然后我调用它们像这样:

for { 
    a <- l1 
    b <- l2 
    c <- callbacks.invoke(5) 
} yield b 

callbacks.invoke作品...但它看起来像它永远不会返回...

编辑

OK,我试图重新实现我的使用scalaz AsyncCallbacks类的建议通过I.K.

import scala.collection.mutable.ListBuffer 
import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 
import scalaz.concurrent.Task 

class AsyncCallbacks[T] { 

    private val tasks = new ListBuffer[Task[T => Future[Unit]]]() 

    /** Gets the number of callbacks registered. */ 
    def count = tasks.length 

    /** Clears all the registered callbacks. */ 
    def clear = tasks.clear 

    /* Adds the specified function to the list of callbacks to be invoked. */ 
    def +=(f: T => Future[Unit]) = tasks += Task(f) 

    /** Invokes all the registered callbacks. */ 
    def invoke(data: T) = Future { Task.gatherUnordered(tasks).map(_.map(_(data))).run.length } 
} 

这里是它的用法:

def f1(i: Int) = Future { println(i) } 
def f2(i: Int) = Future { println(i) } 

val callbacks = new AsyncCallbacks[Int]() 
callbacks += f1 
callbacks += f2 
callbacks.invoke(4) // prints 4 two times (f1 + f2) 

现在距离执行上面的代码REPL ...,然后尝试多次调用`callbacks.invoke(4),你会发现你不能再退出REPL(它仍然被阻塞,你必须用CTRL-C退出)。我认为这可能是一个真正的应用程序的问题。

+0

您打开使用'Scalaz'? –

+0

哇...是的,但我不知道它... – j3d

+0

如果你不关心回调函数成功芬兰(*没有产生一个无用单元列表*),你可以执行'callbacks.foreach (_(data))'这会给你'Unit'作为结果类型。 –

回答

1

从您的帖子看来,无论您想要放入Future正文中的数据类型如何,您都希望完成它并将其通知给您。

在斯卡拉,它将被建模为Task,其实质上是一个Future下面,但附带功能。

一些例子,

scala> import scalaz.concurrent.Task 
import scalaz.concurrent.Task 

scala> val tasks = (1 |-> 5).map(n => Task { Thread.sleep(100); n }) 
tasks: List[scalaz.concurrent.Task[Int]] = List([email protected], [email protected], [email protected], [email protected], [email protected]) 

scala> Task.gatherUnordered(tasks).run 
res10: List[Int] = List(4, 1, 2, 3, 5) 

scala> Task.gatherUnordered(tasks).run 
res11: List[Int] = List(3, 1, 2, 4, 5) 

scala> Task.gatherUnordered(tasks).run 
res12: List[Int] = List(2, 1, 3, 4, 5) 

正如你所看到的,这些任务运行完成的时候,输出会有所不同。 Task的实现是不确定的。

以你的榜样,

scala> val tasks = List(Task{1},Task{2}) 
tasks: List[scalaz.concurrent.Task[Int]] = List([email protected], [email protected]) 

scala> Task.gatherUnordered(tasks).run 
res13: List[Int] = List(1, 2) 

scala> val tasks = List(Task{List.fill(5)("1")}, Task{List.fill(5)("2")}) 
tasks: List[scalaz.concurrent.Task[List[String]]] = List([email protected], [email protected]) 

scala> Task.gatherUnordered(tasks).run 
res17: List[List[String]] = List(List(1, 1, 1, 1, 1), List(2, 2, 2, 2, 2)) 
+0

非常感谢:-)刚刚尝试......但看到我更新后的帖子。我已经使用scalaz重新实现了我的'AsyncCallback'类,但是与旧的实现一样,REPL在几次尝试后仍然被阻塞。 – j3d

+0

为什么你仍然需要你的'AsyncCallback'?在Scalaz中也有'runAsync'方法。 –

+0

在我的真实应用程序中,我有一组函数'T => Future [Unit]'在这样的理解中被调用:'用于事务< - Future.sequence(orders。地图{为了=> 为{ 交易< - createTransaction(order.id) _ < - myCallbacks(事务) }收率交易 }) }' – j3d

相关问题