0

如何从Twitter中读取响应数据Streaming API - POST状态/过滤器? 我建立了连接,并收到200个状态代码,但我不知道如何阅读推文。我只是想在他们来的时候发布推文。Playframework和Twitter Streaming API

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200) 
    println(response.body) 
} 

编辑:我发现这个解决方案

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200){ 
    response.body 
     .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .map(json => Try(parse(json).extract[Tweet])) 
     .runForeach { 
     case Success(tweet) => 
      println("-----") 
      println(tweet.text) 
     case Failure(e) => 
      println("-----") 
      println(e.getStackTrace) 
     } 
    } 
} 

回答

4

流式WS请求响应的主体是一个Akka Streams Source字节。由于Twitter Api响应以新行分隔(通常),因此您可以使用Framing.delimiter将它们拆分为字节块,将块解析为JSON,然后按照您的要求进行操作。像这样的东西应该工作:

import akka.stream.scaladsl.Framing 
import scala.util.{Success, Try} 
import akka.util.ByteString 
import play.api.libs.json.{JsSuccess, Json, Reads} 
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken} 

case class Tweet(id: Long, text: String) 
object Tweet { 
    implicit val reads: Reads[Tweet] = Json.reads[Tweet] 
} 

def twitter = Action.async { implicit request => 
    ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016") 
     .sign(OAuthCalculator(consumerKey, requestToken)) 
     .withMethod("POST") 
     .stream().flatMap { response => 
    response.body 
     // Split up the byte stream into delimited chunks. Note 
     // that the chunks are quite big 
     .via(Framing.delimiter(ByteString.fromString("\n"), 20000)) 
     // Parse the chunks into JSON, and then to a Tweet. 
     // A better parsing strategy would be to account for all 
     // the different possible responses, but here we just 
     // collect those that match a Tweet. 
     .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet])) 
     .collect { 
     case Success(JsSuccess(tweet, _)) => tweet.text 
     } 
     // Print out each chunk 
     .runForeach(println).map { _ => 
     Ok("done") 
    } 
    } 
} 

注:兑现你需要一个隐含的Materializer注入到控制器的流。

+0

感谢您的解释 – mkovacek

+0

以后可以关闭连接吗?我打算有多个跟踪不同单词的请求,我希望在未来某个时间关闭特定连接? – mkovacek

+1

查看Akka文档中的[Dynamic Stream Handling](http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html)。一个想法是:创建一个共享kill开关,然后用'source.via(killSwitch.flow)'将其添加到流中。在killswitch上运行'shutdown()'应该关闭连接。 – Mikesname

3

调用stream()给你回Future[StreamedResponse]。那么您必须使用阿卡语成语来转换ByteString区块。是这样的:

val stream = ws.url(url) 
    .sign(OAuthCalculator(consumerKey, requestToken)) 
    .withMethod("POST") 
    .stream() 

stream flatMap { res => 
    res.body.runWith(Sink.foreach[ByteString] { bytes => 
    println(bytes.utf8String) 
    }) 
} 

注意,我没有测试上面的代码(但它是基于关闭的https://www.playframework.com/documentation/2.5.x/ScalaWS流响应部分加上http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html水槽描述的)

还指出,这将打印每大块在自己的行,并且我不确定如果twitter API给每个块回完整的json blob。如果要在打印块之前累积块,则可能需要使用Sink.fold