2017-09-12 54 views
0

我写了使用Alpakka卡桑德拉图书馆为什么Akka Streams应用程序不能正常终止?

package com.abhi 

import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ClosedShape} 
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource 
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink} 
import com.datastax.driver.core.{Cluster, Row, SimpleStatement} 
import scala.concurrent.Await 
import scala.concurrent.duration._ 

object MyApp extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val actorMaterializer = ActorMaterializer() 
    implicit val session = Cluster 
     .builder 
     .addContactPoints(List("localhost") :_*) 
     .withPort(9042) 
     .withCredentials("foo", "bar") 
     .build 
     .connect("foobar") 
    val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20) 
    val source = CassandraSource(stmt) 
    val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1))) 
    val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2)) 
    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b => 
     s => 
     import GraphDSL.Implicits._ 
     source.take(10) ~> toFoo ~> s 
     ClosedShape 
    }) 
    // let us run the graph 
    val future = graph.run() 
    import actorSystem.dispatcher 
    future.onComplete{_ => 
     session.close() 
     Await.result(actorSystem.terminate(), Duration.Inf) 
    } 
    Await.result(future, Duration.Inf) 
    System.exit(0) 
} 

case class Foo(col1: Long, col2: Long) 

这个简单的应用程序本应用程序运行完全按照预期它打印屏幕上的10行。

但发布它挂起。当执行System.exit(0)调用它抛出一个异常

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0" 

但还是应用程序不停止运行。它只是挂起。

我不明白为什么没有这个应用程序正常结束(其实它不应该甚至需要system.exit(0)调用。

退出该应用程序的唯一方法是通过控制ç 。

回答

2

因为SBT在其自己的JVM实例中运行的代码可能发生这种情况,你System.exit会再退出SBT的JVM给予上述结果

你尝试设置:?fork in run := true某处您的SBT构建

我也不确定使用actorSystem.dispatcher执行onComplete回调(因为您使用它来等待actor系统本身的终止)是一个好主意。

有些事情,你可以尝试,而不是:

import actorSystem.dispatcher 
future.onComplete{ _ => 
    session.close() 
    actorSystem.terminate() 
} 
Await.result(actorSystem.whenTerminated, Duration.Inf) 

注意,JVM会退出而不需要你打电话System.exit时留下的唯一线程是守护线程(参见例如What is Daemon thread in Java?)。