2016-01-25 65 views
3

我使用如何将火花流与cassandra连接?

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 

和Cassandra是监听

rpc_address:127.0.1.1 
rpc_port:9160 

例如,连接卡夫卡和火花流,一边听每4秒卡夫卡,我有以下的火花工作

sc = SparkContext(conf=conf) 
stream=StreamingContext(sc,4) 
map1={'topic_name':1} 
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1) 

并且spark-streaming不断收听kafka经纪人,每隔4秒钟输出一次内容。

同样的方式,我想要火花流媒体来收听cassandra并输出指定表格的内容,比方说每4秒

如何转换上面的流代码,使其与cassandra而不是kafka一起使用?


非流媒体解决方案

我可以明显地保持在一个无限循环中运行的查询,但事实并非如此流吧?

火花的工作:

from __future__ import print_function 
import time 
import sys 

from random import random 
from operator import add 
from pyspark.streaming import StreamingContext 
from pyspark import SparkContext,SparkConf 
from pyspark.sql import SQLContext 
from pyspark.streaming import * 

sc = SparkContext(appName="sparkcassandra") 
while(True): 
    time.sleep(5) 
    sqlContext = SQLContext(sc) 
    stream=StreamingContext(sc,4) 
    lines = stream.socketTextStream("127.0.1.1", 9160) 
    sqlContext.read.format("org.apache.spark.sql.cassandra")\ 
       .options(table="users", keyspace="keyspace2")\ 
       .load()\ 
       .show() 

像这样运行

sudo ./bin/spark-submit --packages \ 
datastax:spark-cassandra-connector:1.4.1-s_2.10 \ 
examples/src/main/python/sparkstreaming-cassandra2.py 

,我得到表值这rougly看起来像

lastname|age|city|email|firstname 

那么什么是正确的方法“流媒体”来自cassandra的数据?


回答

0

目前Cassandra是本身不支持作为星火1.6流源,必须实现自定义接收器为你自己的情况下(listen to cassandra and output the contents of the specified table every say 4 seconds.)。

请参阅实施指南:

Spark Streaming Custom Receivers

1

目前“正道”从C *流数据是不是从C *流数据:)相反,它通常使很多更有意义让你的消息队列(如卡夫卡)在C *和Stream之前关闭。 C *不容易支持增量表读取,但是如果集群密钥基于插入时间,则可以完成此操作。

如果您有兴趣使用C *作为流源一定要检查出和 https://issues.apache.org/jira/browse/CASSANDRA-8844 变更数据捕获评论

这是最有可能你在找什么。

如果你实际上只是想定期读取整个表,并做一些你可能是最好的,只是有cron作业推出一批操作你真的没有反正恢复状态的方式。