2017-03-28 56 views
2

我目前正在做一些实验akka及其持久性堆栈,用akka-http堆栈包装。akka-http和JsonEntityStreamingSupport

注意:为了持久性,我使用非官方插件将Akka FSM保留为mongodb。

但我的问题是使用JsonEntityStreamingSupportrecommended by akka to serve Source as json

在我的情况,我有这段代码

implicit val jsonEntityStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() 

val readJournal = PersistenceQuery(system).readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier) 

val route = 
    path("workflows") { 
    get { 
     complete(readJournal.currentPersistenceIds()) 
    } 
} 

Http().bindAndHandle(route, "localhost", 8081) 

但不幸的是,我想出了这个错误:

$ curl localhost:8081/workflows 
curl: (56) Recv failure: Connection reset by peer 

我没有看到任何错误或日志,这可能会导致信息关于服务器关闭连接的原因。

有没有人已经做过这种实验?

我与阿卡2.4.16和测试它阿卡-HTTP 10.0.5

回答

2

好吧,我想通了。

readJournal.currentPersistenceIds()给我一个Source[String, NotUsed]

但是,因为它是在akka-http specs规定,

This is wrong since we try to render JSON, but String is not a valid top level element we need to provide an explicit Marshaller[String, ByteString] if we really want to render a list of strings.

所以,我必须为它提供一个Marshaller。例如,通过那些相同的测试给出:

implicit val stringFormat = Marshaller[String, ByteString] { ec ⇒ s ⇒ 
    Future.successful { 
    List(Marshalling.WithFixedContentType(ContentTypes.`application/json`,() ⇒ 
     ByteString("\"" + s + "\"")) // "raw string" to be rendered as json element in our stream must be enclosed by "" 
    ) 
    } 
}