I'm trying to save my streaming data from Spark to Cassandra. Spark connects to Kafka and that part works fine, but saving to Cassandra is driving me crazy. I'm using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.
Tags: spark, cassandra, streaming, python, error, database, kafka
This is how I'm submitting the Spark job:
spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing
And this is my code. It's just taken from the Spark examples and it works, but I modified it a little to save that data to Cassandra...
This is what I'm trying to do, but only for the count result: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/
from __future__ import print_function
import sys
import os
import time
import pyspark_cassandra
import pyspark_cassandra.streaming
from pyspark_cassandra import CassandraSparkContext
import urllib
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max
from pyspark.sql.types import FloatType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    sqlContext = SQLContext(sc)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.count()
    counts.saveToCassandra("spark", "count")
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
I get this error:
Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88, in <module>
    counts.saveToCassandra("spark", "count")
How can I pass the aggregator to saveToCassandra? I know pyspark-cassandra is outdated, but I'm using Spark 1.6 – logyport
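For context on the question: `lines.count()` yields a stream of bare numbers, while pyspark-cassandra's `saveToCassandra` writes rows whose fields must line up with the table's columns (e.g. dicts keyed by column name). A minimal sketch of that wrapping step, assuming a hypothetical table `spark.count` with columns `word text PRIMARY KEY, count int` — the table layout and the `to_row` helper are my assumptions, not from the question:

```python
def to_row(count, key="total"):
    # Hypothetical helper: wrap the scalar produced by lines.count()
    # into a dict whose keys match the Cassandra table's columns.
    return {"word": key, "count": int(count)}
```

In the streaming job this would be applied to the DStream before saving, along the lines of `lines.count().map(to_row).saveToCassandra("spark", "count")`.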