2016-12-16 29 views
0

我试图使用火花流库,org.apache.spark.streaming.kinesis.KinesisUtils消耗Kinesis流。我可以使用python脚本验证Stream中是否有数据。但是,在尝试用scala编写消费者时,我一直在收集空的数据。这里是我的代码:Spark Streaming Kinesis消费者返回空数据

def getKinesisData = { 

    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" 
    val streamName = "myAwesomeStream" 
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() 
    require(credentials != null, "No AWS credentials found.") 

    val kinesisClient = new AmazonKinesisClient(credentials) 
    kinesisClient.setEndpoint(endpointUrl) 

    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size  
    val numStreams = numShards 
    val batchInterval = Milliseconds(2000) 
    val kinesisCheckpointInterval = batchInterval 

    val sparkConfig = new SparkConf().setAppName("myAwesomeApp").setMaster("local") 
    val ssc = new StreamingContext(sparkConfig, batchInterval) 

    val kinesisStreams = (0 until numStreams).map { i => 
     println(i) 
     KinesisUtils.createStream(ssc, "myAwesomeApp", streamName, endpointUrl, regionName, 
     InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2 
    ) 
    } 

    val unionStreams = ssc.union(kinesisStreams) 

    // Convert each line of Array[Byte] to String, and split into words 
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) 

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) 
    wordCounts.print() 
    } 

我得到这个代码从GitHub一个例子,我真的不关心所有的工会,并flatmapping并已在代码的最新部分已经完成wordcounts。我只需要知道如何从流中获取实际数据。

UPDATE: 它在控制台上打印下面,而我运行它

16/12/16 14:57:01 INFO SparkContext: Running Spark version 2.0.0 
16/12/16 14:57:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/12/16 14:57:02 INFO SecurityManager: Changing view acls to: 
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls to: 
16/12/16 14:57:02 INFO SecurityManager: Changing view acls groups to: 
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls groups to: 
16/12/16 14:57:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); groups with view permissions: Set(); users with modify permissions: Set(username); groups with modify permissions: Set() 
16/12/16 14:57:02 INFO Utils: Successfully started service 'sparkDriver' on port 54774. 
16/12/16 14:57:02 INFO SparkEnv: Registering MapOutputTracker 
16/12/16 14:57:02 INFO SparkEnv: Registering BlockManagerMaster 
16/12/16 14:57:02 INFO DiskBlockManager: Created local directory at 
16/12/16 14:57:02 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB 
16/12/16 14:57:02 INFO SparkEnv: Registering OutputCommitCoordinator 
16/12/16 14:57:02 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/12/16 14:57:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://<I masked this IP address and port> 
16/12/16 14:57:03 INFO Executor: Starting executor ID driver on host localhost 
16/12/16 14:57:03 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54775. 
16/12/16 14:57:03 INFO NettyBlockTransferService: Server created on <I masked this IP address and port> 
16/12/16 14:57:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, <I masked this IP address and port>) 
16/12/16 14:57:03 INFO BlockManagerMasterEndpoint: Registering block manager <I masked this IP address and port> with 2004.6 MB RAM, BlockManagerId(driver, <I masked this IP address and port>) 
16/12/16 14:57:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, <I masked this IP address and port>) 
16/12/16 14:57:03 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data. 

0 <-- printing shard 
1 <-- printing shard 
#### PRINTING kinesisStreams ###### 
Vector([email protected], [email protected]) 
#### PRINTING unionStreams ###### 
() 
#### words###### 
[email protected] 
#### PRINTING wordCounts###### 
[email protected] 

16/12/16 14:57:03 INFO SparkContext: Invoking stop() from shutdown hook 
16/12/16 14:57:03 INFO SparkUI: Stopped Spark web UI at http://<I masked this IP address and port> 
16/12/16 14:57:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/12/16 14:57:03 INFO MemoryStore: MemoryStore cleared 
16/12/16 14:57:03 INFO BlockManager: BlockManager stopped 
16/12/16 14:57:03 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/12/16 14:57:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/12/16 14:57:03 INFO SparkContext: Successfully stopped SparkContext 
16/12/16 14:57:03 INFO ShutdownHookManager: Shutdown hook called 
16/12/16 14:57:03 INFO ShutdownHookManager: Deleting directory 
+0

我想你想'createStream'中'InitialPositionInStream.TRIM_HORIZON'从流的开头读取。 –

+0

@YuvalItzchakov我也试过,输出是这样的'Kinesis Streams包含Vector([email protected],[email protected]) ( )< - 返回空当我打印unionStreams变量 这些是单词[email protected] 这是字数org.apache.spark.streaming.dstream.ShuffledDStream @ 790a251b' – summerNight

+0

运行它时是否将任何内容打印到控制台? –

回答

0

的问题是与1.5.2版本星火图书馆不与室壁运动很好地工作。

相关问题