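Spark Streaming - Java - Insert JSON from Kafka into Cassandra
I'm writing a simple data pipeline in Spark Streaming, using Java, that pulls JSON data from Kafka, parses the JSON into a custom class (Transaction), and then inserts the data into a Cassandra table, but I can't get the mapToRow() function to work.
I've seen tons of examples that say all you have to do is something along these lines: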
JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    streamingContext,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);
JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);
javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();
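However, when I do this, I get the error:
The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions
I think all I'm missing is some sort of decoration on my class, but I haven't been able to figure out which one. I've tried going bare bones, essentially making the class a property bag: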
public class Transaction implements java.io.Serializable{
    public int TransactionId;
    ...
    public Transaction(){}
}
I've tried all of the DataStax mapping annotations:
@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{
    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...
    public Transaction(){}
}
I've also tried adding public get/set methods for each property and making the properties private:
public class Transaction implements java.io.Serializable{
    private int transactionId;
    ...
    public Transaction(){}
    public int getTransactionId() {
        return transactionId;
    }
    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}
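To compare the public-field variant with the getter/setter variant, here is a minimal sketch that uses only the JDK's java.beans.Introspector to list the read/write properties each class shape exposes. The class names BeanPropertyCheck, FieldOnlyTransaction and BeanTransaction are hypothetical stand-ins for the variants above. Whether the connector's row mapper discovers columns through JavaBean accessors is an assumption here, not something the error message confirms. Note also that an "undefined method" error is reported at compile time, so this check only speaks to how a bean-style mapper would see the class afterwards.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical stand-in for the "property bag" variant: public field, no accessors.
    public static class FieldOnlyTransaction implements java.io.Serializable {
        public int TransactionId;
        public FieldOnlyTransaction() {}
    }

    // Hypothetical stand-in for the variant with a private field and get/set methods.
    public static class BeanTransaction implements java.io.Serializable {
        private int transactionId;
        public BeanTransaction() {}
        public int getTransactionId() { return transactionId; }
        public void setTransactionId(int transactionId) { this.transactionId = transactionId; }
    }

    // Returns the names of properties that have both a getter and a setter.
    public static List<String> readWriteProperties(Class<?> type) {
        try {
            // Stop at Object so the synthetic "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readWriteProperties(FieldOnlyTransaction.class)); // prints []
        System.out.println(readWriteProperties(BeanTransaction.class));      // prints [transactionId]
    }
}
```

The public-field class exposes no bean properties at all, while the getter/setter class exposes transactionId, so only the latter would be visible to any mapper that relies on JavaBean conventions.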
I have been able to parse the DStream into RDDs of Transaction objects using the following class:
public class Transaction implements java.io.Serializable{
    ...
    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
            ObjectMapper mapper = new ObjectMapper();
            while (lines.hasNext()) {
                String line = lines.next();
                try {
                    transactions.add(mapper.readValue(line, Transaction.class));
                } catch (Exception e) {
                    System.out.println("Skipped:" + e);
                }
            }
            return transactions;
        }
    }
}
in combination with the following code, which acts on the lines object from above:
JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());
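The skip-on-failure loop in ParseJSON can be exercised outside Spark. The sketch below is a plain-Java version of the same pattern. PartitionParser and parsePartition are hypothetical names, and the parser argument stands in for the mapper.readValue(line, Transaction.class) call; Integer::parseInt is used only so the example runs without Jackson.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PartitionParser {

    // Parses every line of one partition and drops the lines the parser rejects.
    // The parser is a hypothetical stand-in for mapper.readValue(line, Transaction.class).
    public static <T> List<T> parsePartition(Iterator<String> lines, Function<String, T> parser) {
        List<T> parsed = new ArrayList<>();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                parsed.add(parser.apply(line));
            } catch (RuntimeException e) {
                // Same behavior as the original loop: report the bad line and continue.
                System.err.println("Skipped: " + e);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1", "oops", "3");
        List<Integer> ids = PartitionParser.<Integer>parsePartition(input.iterator(), Integer::parseInt);
        System.out.println(ids); // prints [1, 3]
    }
}
```

Testing the parsing step on its own like this helps separate JSON problems from problems in the Cassandra write step.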
However, once I have the events stream in that form, it still doesn't work with the writerBuilder().saveToCassandra() chain.
Any help here is greatly appreciated.