2017-02-19 127 views
0

我试图使用Apache Flink获取Twitter流API的一些消息。Apache Flink - 无法从Twitter获取数据

但是,我的代码没有在输出文件中写入任何内容。我正在计算特定单词的输入数据。

普莱舍检查我的例子:

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.connectors.twitter._ 
import org.apache.flink.api.java.utils.ParameterTool 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint} 
import org.apache.flink.streaming.api.windowing.time.Time 

import scala.collection.JavaConverters._ 


////////////////////////////////////////////////////// 
// Create an Endpoint to Track our terms 
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable { 
    @Override 
    def createEndpoint(): StreamingEndpoint = { 
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0)) 
    val endpoint = new StatusesFilterEndpoint() 
    //endpoint.locations(List(chicago).asJava) 
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava) 
    endpoint 
    } 
} 

object Connection { 
    def main(args: Array[String]): Unit = { 

    val props = new Properties() 

    val params: ParameterTool = ParameterTool.fromArgs(args) 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    env.getConfig.setGlobalJobParameters(params) 
    env.setParallelism(params.getInt("parallelism", 1)) 

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key")) 
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key")) 
    props.setProperty(TwitterSource.TOKEN, params.get("token")) 
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret")) 

    val source = new TwitterSource(props) 
    val epInit = new myFilterEndpoint() 

    source.setCustomEndpointInitializer(epInit) 

    val streamSource = env.addSource(source) 

    streamSource.map(s => (0, 1)) 
     .keyBy(0) 
     .timeWindow(Time.minutes(2), Time.seconds(30)) 
     .sum(1) 
     .map(t => t._2) 
     .writeAsText(params.get("output")) 

    env.execute("Twitter Count") 
    } 
} 

的一点是,我没有错误消息,我可以在我的仪表盘看到的。我的源是发送数据到我的TriggerWindow。但它没有收到任何数据: enter image description here

我有两个问题在一次。

第一:为什么我的源发送字节到我的TriggerWindow如果没有收到任何东西?

Seccond:我的代码有些问题,我无法从twitter获取数据?

+0

第一次结果应该在2分钟后写出(即窗口的长度)。你等了那么久吗? TriggerWindow已经收到了数据,但是在43s之后,肯定不会有任何东西写入文件。你的代码看起来不错。 –

+0

嗨@DawidWysakowicz,是的,我等了那么久。随便我运行这个代码2个小时。我为这个问题拍了照片。但是Flink没有输出:( –

回答

1

您的应用程序源没有发送实际的记录,你可以通过查看记录看到窗口发送列。发送的字节属于Flink不时在任务之间发送的控制消息。更具体地说,它是用于测量Flink作业的端到端延迟的LatencyMarker消息。

该代码看起来不错。我甚至试过你的代码并为我工作。因此,我得出结论:Twitter连接证书必须有问题。请重新检查您是否输入了正确的凭证。