2016-02-04 74 views
2

我有一个用Scala编写的spark工作,我只想写一行用逗号分隔的行,来自Kafka生产者到Cassandra数据库。 但我无法调用saveToCassandra。 我看到几个wordcount的例子,他们正在写地图结构到两列的卡桑德拉表,它似乎工作正常。但是我有很多列,我发现数据结构需要并行化。 这里是我的代码示例:使用spark将数据写入cassandra

object TestPushToCassandra extends SparkStreamingJob { 
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid 

def runJob(ssc: StreamingContext, config: Config): Any = { 

val bp_conf=BpHooksUtils.getSparkConf() 
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default") 


val input_topics = config.getString("topics.in").split(",").toSet 


val output_topic = config.getString("topic.out") 


val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics) 


val lines = messages.map(_._2) 
val words = lines.flatMap(_.split(",")) 

val li = words.par 

li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3")) 
li.print() 



words.foreachRDD(rdd => 
    rdd.foreachPartition(partition => 
    partition.foreach{ 
     case x:String=>{ 

     val props = new HashMap[String, Object]() 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer") 

     val outMsg=x+" from spark" 
     val producer = new KafkaProducer[String,String](props) 
     val message=new ProducerRecord[String, String](output_topic,null,outMsg) 
     producer.send(message) 
     } 
    } 


) 
) 


ssc.start() 
ssc.awaitTermination() 
} 
} 

我认为这是斯卡拉的是,我没有得到正确的语法。 在此先感谢。

+1

words.par的调用几乎肯定不是正确的做法。 Dstream的“词汇”已经是一个DStream,它的本质已经被分布和并行化了。没有这个,你有什么问题? – RussS

+0

它工作没有“.par”,但现在我想知道如何分割值提取col1,col2,col3的值?例如,如果在kafka生产者中写入“val1,val2,val3”,那么我如何提取这些值并分别存储在col1,col2和col3中? – user3925365

+0

你是说你不能.split(“,”)字符串? – RussS

回答

1

您需要将单词DStream更改为连接器可以处理的内容。

如元组

val words = lines 
    .map(_.split(",")) 
    .map(wordArr => (wordArr(0), wordArr(1), wordArr(2)) 

或案例类

case class YourRow(col1: String, col2: String, col3: String) 
val words = lines 
    .map(_.split(",")) 
    .map(wordArr => YourRow(wordArr(0), wordArr(1), wordArr(2))) 

或CassandraRow

这是因为如果你把一个阵中还有所有的本身就可以在一个阵列C *你试图插入而不是3列。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

+0

感谢您的回答。当我尝试解决方案时,它在数据库中的存储位有所不同。也许我错过了一些小的语法。如果我通过abc,def,ghi,并且这里是我的代码,则从Kafka生产商处获得:val lines = messages.map(_._ 2) val words = lines.flatMap(_。split(“”)) val innerWords = words.flatMap (_.split(“,”)) val wordCounts = innerWords.map(wordArr =(wordArr(0),wordArr(1),wordArr(2))) wordCounts.saveToCassandra(“keyspace01”,“table1” ,SomeColumns(“col1”,“col2”,“col3”)) 这段代码在数据库中产生三个条目,即1st:a,b,c 2nd:d,e,f 3rd:g,h,i – user3925365

+0

woops我不应该复制你的线条,这应该是地图,而不是flatMap地图 – RussS

+0

如果我使用地图与词首先分裂,然后它给了我编译错误words.foreachRDD函数在行“case x :串”。它说监察人员与模式类型不相容; found:字符串 必需:数组[String] – user3925365

相关问题