2017-06-04 127 views

I'm trying to save my streaming data from Spark to Cassandra. Spark receives from Kafka and that part works fine, but saving to Cassandra is driving me crazy. I'm using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.

Tags: spark, cassandra, streaming, python, error, database, kafka

This is how I'm submitting Spark:

spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing 

And this is my code. It is just taken from the Spark examples and it works, but I modified it a little to save this data to Cassandra....

This is what I'm trying to do, but with just the count result: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/

from __future__ import print_function 
import sys 
import os 
import time 
import pyspark_cassandra 
import pyspark_cassandra.streaming 
from pyspark_cassandra import CassandraSparkContext 
import urllib 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark.sql import SQLContext 
from pyspark.sql import Row 
from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max 
from pyspark.sql.types import FloatType 
from pyspark.sql.functions import explode 
from pyspark.sql.functions import split 
if __name__ == "__main__": 
    if len(sys.argv) != 3: 
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) 
        exit(-1) 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 1) 
    sqlContext = SQLContext(sc) 
    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    lines = kvs.map(lambda x: x[1]) 
    counts = lines.count() 
    counts.saveToCassandra("spark", "count") 
    counts.pprint() 
    ssc.start() 
    ssc.awaitTermination() 

I get this error:

Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88, in <module>
    counts.saveToCassandra("spark", "count")

Answer


PySpark Cassandra stopped being updated a while ago; the latest version only supports up to Spark 1.6: https://github.com/TargetHolding/pyspark-cassandra

Also:

counts=lines.count() // Returns data to the driver (not an RDD) 

counts is now an integer. That means the function saveToCassandra does not apply, since that is a function of RDDs.
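For reference, in Spark Streaming `lines.count()` actually returns a new DStream of per-batch counts rather than a plain value. A minimal sketch of one way to write those counts out with pyspark-cassandra (which, as noted above, requires Spark ≤ 1.6; the table layout and column names below are assumptions, not the asker's actual schema):

```python
# Sketch only: assumes a Cassandra table spark.count(topic text, word_count int)
# -- the column names here are hypothetical, adjust them to your schema.

def to_cassandra_row(count, topic="testing"):
    """Shape a per-batch count into a dict keyed by the Cassandra column names."""
    return {"topic": topic, "word_count": int(count)}

# In the streaming job (after `import pyspark_cassandra.streaming`, which adds
# saveToCassandra to DStreams):
#
#   counts = lines.count()                 # DStream of per-batch counts
#   rows = counts.map(to_cassandra_row)    # DStream of dicts
#   rows.saveToCassandra("spark", "count")
#   rows.pprint()
```

The point is that `saveToCassandra` needs rows whose fields match the table's columns, so the scalar count has to be wrapped into a row-like structure first.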


How do I pass the integer to saveToCassandra? I know pyspark-cassandra is outdated, but I am using Spark 1.6 – logyport