Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
和Cassandra是监听
rpc_address:127.0.1.1
rpc_port:9160
例如,连接卡夫卡和火花流,一边听每4秒卡夫卡,我有以下的火花工作
sc = SparkContext(conf=conf)
stream=StreamingContext(sc,4)
map1={'topic_name':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
并且spark-streaming不断收听kafka经纪人,每隔4秒钟输出一次内容。
同样的方式,我想要火花流媒体来收听cassandra并输出指定表格的内容,比方说每4秒。
如何转换上面的流代码,使其与cassandra而不是kafka一起使用?
非流媒体解决方案
我可以明显地保持在一个无限循环中运行的查询,但事实并非如此流吧?
火花的工作:
from __future__ import print_function
import time
import sys
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import *
sc = SparkContext(appName="sparkcassandra")
while(True):
time.sleep(5)
sqlContext = SQLContext(sc)
stream=StreamingContext(sc,4)
lines = stream.socketTextStream("127.0.1.1", 9160)
sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table="users", keyspace="keyspace2")\
.load()\
.show()
像这样运行
sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py
,我得到表值这rougly看起来像
lastname|age|city|email|firstname
那么什么是正确的方法“流媒体”来自cassandra的数据?