2017-05-05 36 views
0

我有两种方法,我们称它们为load()init()。每个人都在自己的线程中启动一个计算,并在其自己的执行上下文中返回Future。这两个计算是独立的。用回调编写两个Scala未来,没有第三个ExecutionContext

val loadContext = ExecutionContext.fromExecutor(...) 
def load(): Future[Unit] = { 
    Future 
} 

val initContext = ExecutionContext.fromExecutor(...) 
def init(): Future[Unit] = { 
    Future { ... }(initContext) 
} 

我想这两个来自一些三线程中调用 - 说这是从main() - 并且都完成后执行一些其他的计算。

def onBothComplete(): Unit = ... 

现在:

  1. 我不在乎它完成第一
  2. 我不在乎什么的线程上执行其它计算,除了:
  3. 我不想阻止两个线程等待另一个线程;
  4. 我不想阻塞第三个(调用)线程;和
  5. 我不想为了设置标志而开始第四个线程。

如果我用-内涵,我得到的是这样的:

val loading = load() 
val initialization = initialize() 

for { 
    loaded <- loading 
    initialized <- initialization 
} yield { onBothComplete() } 

,我得到无法找到一个隐含的ExecutionContext。

我认为这意味着斯卡拉希望第四个线程等待两个期货的完成并设置标志,无论是明确的新的ExecutionContext还是ExecutionContext.Implicits.global。所以看起来理解力已经不存在了。

我想我也许可以嵌套回调:

initialization.onComplete { 
    case Success(_) => 
    loading.onComplete { 
     case Success(_) => onBothComplete() 
     case Failure(t) => log.error("Unable to load", t) 
    } 
    case Failure(t) => log.error("Unable to initialize", t) 
} 

不幸的是onComplete也需要一个隐含的ExecutionContext,我也得到了同样的错误。 (另外,这是难看的,并且从loading失去错误消息,如果initialization失败。)

是否有任何方式来组成的Scala期货而不阻塞和不引入另一个ExecutionContext?如果没有,我可能不得不把它们扔给Java 8 CompletableFutures Javaslang Vavr Futures,它们都能够在执行原始工作的线程上运行回调。

更新澄清阻塞两个等待另一个的线程也是不可接受的。

再次更新对于完成后计算没有那么具体。

+0

你应该做'Future.firstCompletedOf(名单(初始化,加载))'如果你想抓住未来的结果首先完成。为了列表理解将等待两个期货在完成收益之前完成。 – pcting

+0

@pcting在执行其他计算之前,我确实希望等待两个期货的完成。我不想阻止当前的线程来做到这一点。 –

+0

如果这是为了将AtomicBoolean设置为true的确切目的,那么可以改为在完成时指示完成。 'promise.completeWith(初始化zip加载)' –

回答

1

为什么不重用自己的执行上下文之一?不知道您对这些要求,但如果你使用一个单独的线程执行,你可以只重用一个作为您的理解与执行上下文,你不会得到任何新创建的线程:

implicit val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

如果你真的无法重用它们,您可以将其视为隐式执行上下文:

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
    (runnable: Runnable) => { 
    runnable.run() 
    }) 

哪个将在当前线程上运行期货。但是,Scala文档明确建议不要这样做,因为它引入了非确定性,其中线程运行Future(但正如您所说的,您并不在乎它运行哪个线程,所以这可能无关紧要)。

请参阅Synchronous Execution Context为什么这是不可取的。

与该上下文的一个例子:

val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

def load(): Future[Unit] = { 
    Future(println("loading thread " + Thread.currentThread().getName))(loadContext) 
} 

val initContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

def init(): Future[Unit] = { 
    Future(println("init thread " + Thread.currentThread().getName))(initContext) 
} 

val doneFlag = new AtomicBoolean(false) 

val loading = load() 
val initialization = init() 

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
    (runnable: Runnable) => { 
    runnable.run() 
    }) 

for { 
    loaded <- loading 
    initialized <- initialization 
} yield { 
    println("yield thread " + Thread.currentThread().getName) 
    doneFlag.set(true) 
} 

打印:

loading thread pool-1-thread-1 
init thread pool-2-thread-1 
yield thread main 

虽然yield线可以打印任何pool-1-thread-1pool-2-thread-1根据运行。

+0

正如我所说的,我不想阻塞当前线程,所以SynchronousExecutionContext已经结束。如果我明确选择了现有的ExecutionContext中的一个,它是否意味着(假设它们是单线程的)该线程被阻塞,等待另一个线程?我也不想那样。 (我会更新这个问题。) –

1

在Scala中,Future表示要执行的异步工作(即与其他工作单元同时执行)。 ExecutionContext表示用于执行Future的线程池。换句话说,ExecutionContext是执行实际工作的工作人员的团队。

为了提高效率和可扩展性,最好是有大团队(S)(例如,单ExecutionContext有10个线程执行10 Future的),而不是小团队(例如5 ExecutionContext 2个线程各执行10 Future的)。

在你的情况,如果你想线程数限制为2,您可以:

def load()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = { 
    Future { ... } /* will use the teamOfWorkers implicitly */ 
} 

def init()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = { 
    Future { ... } /* will use the teamOfWorkers implicitly */ 
} 

implicit val bigTeamOfWorkers = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)) 
/* All async works in the following will use 
    the same bigTeamOfWorkers implicitly and works will be shared by 
    the 2 workers (i.e. thread) in the team */ 
for { 
    loaded <- loading 
    initialized <- initialization 
} yield doneFlag.set(true) 

无法找到一个隐含的ExecutionContext错误并不意味着斯卡拉想要额外的线程。这只意味着Scala想要ExecutionContext来完成这项工作。而额外的ExecutionContext并不一定意味着额外的“线程”,例如以下ExecutionContext,而不是创建新的线程,将在当前线程执行的工作:

val currThreadExecutor = ExecutionContext.fromExecutor(new Executor { 
    override def execute(command: Runnable): Unit = command.run() 
}) 
相关问题