2016-12-21 76 views
2

我尝试在Scala上实现AKKA http和一些演员。 我用akka创建了一个web应用程序。 但我有一个或2个不同的路线/ 16这个错误。 (它显然是随机的):akka http与演员:错误503超时

服务器无法及时响应您的请求。 请稍后再试!

您能向我解释为什么以及如何解决? 我真的是阿卡的新手。

主类:

object WebServer extends App { 

    implicit val system = ActorSystem("app-1") 
    implicit val materializer = ActorMaterializer() 
    // needed for the future flatMap/onComplete in the end 
    implicit val executionContext = system.dispatcher 
    val routes = SessionRoute.route 
    val bindingFuture = Http().bindAndHandle(routes, ipServer, configApplication.getInt("spray.can.client.proxy.http.port")) 
    println("serv http launch") 
    StdIn.readLine 
    bindingFuture 
     .flatMap(_.unbind()) // trigger unbinding from the port 
     .onComplete(_ => { 
     cluster.close() 
     system.terminate() 
    }) 
    bindingFuture.onFailure { 
     case ex: Exception => 
      println(ex, "Failed to bind to {}:{}!", ipServer, configApplication.getInt("spray.can.client.proxy.http.port")) 
    } 
    } 

我的路线是:

object SessionRoute extends TokenValidator { 
implicit val formats = org.json4s.DefaultFormats 

val sessionHandler = WebServer.system.actorOf(SessionHandler.props(), "SessionHandler") 
implicit val timeout = Timeout(60.seconds) 


val route: Route = get { 
    authenticated(doAuthPublisher) { app => 
     getActiveUserPublisher 
    } 
} 
def getActiveUserPublisher = 
    path("session"/JavaUUID/"active_user") { publisher_id => 
     parameters('date_start.as[Long], 'date_end.as[Long]) { 
      (date_start, date_end) => { 
       onSuccess(sessionHandler ? SessionActiveUser(SessionRequest(publisher_id, date_start, date_end, null))) { 
        case response: Answer => 
         complete(StatusCodes.OK, response.result) 
        case _ => 
         complete(StatusCodes.InternalServerError, "Error on the page") 
       } 
      } 
     } 

    } 
} 

我的演员是:使用

object SessionHandler { 
     def props(): Props = { 
      Props(classOf[SessionHandler]) 
     } 
    } 
    class SessionService(implicit actorSystem: ActorSystem) extends toolService { 
    def activeUser(sessionRequest: SessionRequest): Map[String, Any] = { 
    .... 
     } 
    } 

class SessionHandler extends Actor with ActorLogging { 
    implicit val system = ActorSystem("session") 
    implicit val formats = org.json4s.DefaultFormats 

    def receive: Receive = { 
    case request: SessionActiveUser => 
    sender() ! Answer(Serialization.write(new SessionService().activeUser(request.sessionRequest))) 
    }} 

而我的情况下类:

final case class Answer(result: String) 
case class SessionActiveUser(sessionRequest: SessionRequest) 
case class SessionRequest(publisher_id: UUID = null, date_start: Long, date_end: Long, app_id: String = null) 

我configuration.conf:

akka { 
    loglevel = INFO 
    stdout-loglevel = INFO 
    loggers = ["akka.event.slf4j.Slf4jLogger"] 
    default-dispatcher { 
    fork-join-executor { 
     parallelism-min = 8 
    } 
    } 
// event-handlers = ["akka.event.slf4j.Slf4jLogger"] 
} 

回答

1

您所看到的错误是造成你的route不能够产生配置的请求超时时间内响应。如果您没有明确设置它,则默认为20秒。有关请求超时的更多信息,请参见here

关于发生这种情况的原因,您可以详细了解activeUser函数中会发生什么吗?在那里发生任何重大阻塞?如果是这样,所有传入的请求将被顺序化并阻止activeUser,最终导致请求超时来终止您的请求。

可能的解决方案是:

  • 让你的服务异步/无阻塞
  • 按照有关如何处理阻塞阿卡-HTTP路线内来电docs