
Spark Streaming - Java - Insert JSON from Kafka into Cassandra

I'm writing a simple data pipeline in Spark Streaming, using Java, that pulls JSON data from Kafka, parses the JSON into a custom class (Transaction), and then inserts that data into a Cassandra table, but I cannot get the mapToRow() function to work.

I've seen tons of examples that say all you have to do is something along these lines:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
     streamingContext, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     kafkaParams, 
     topicsSet 
); 

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){ 
     @Override 
     public String call(Tuple2<String,String> tuple2) { 
      return tuple2._2(); 
     } 
    } 
); 

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra(); 

However, when I do this, I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions 

I figure what I'm missing is some kind of decoration on my class, but I haven't managed to work out which one. I've tried going bare bones, essentially making the class a bag of properties:

public class Transaction implements java.io.Serializable{ 

    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

I've tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName", 
     readConsistency = "QUORUM", 
     writeConsistency = "QUORUM", 
     caseSensitiveKeyspace = false, 
     caseSensitiveTable = false) 
public class Transaction implements java.io.Serializable{ 

    @PartitionKey(0) 
    @Column(name="transaction_id") 
    public int TransactionId; 
    ... 

    public Transaction(){} 
} 

I've also tried making every attribute private and giving each one public get/set methods:

public class Transaction implements java.io.Serializable{ 

    private int transactionId; 
    ... 

    public Transaction(){} 

    public int getTransactionId() { 
     return transactionId; 
    } 

    public void setTransactionId(int transactionId) { 
     this.transactionId = transactionId; 
    } 
} 

I have been able to parse the DStream into an RDD of Transaction objects using the following class:

public class Transaction implements java.io.Serializable{ 

    ... 

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> { 
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception { 
            ArrayList<Transaction> transactions = new ArrayList<Transaction>(); 
            ObjectMapper mapper = new ObjectMapper(); 
            while (lines.hasNext()) { 
                String line = lines.next(); 
                try { 
                    transactions.add(mapper.readValue(line, Transaction.class)); 
                } catch (Exception e) { 
                    System.out.println("Skipped:" + e); 
                } 
            } 

            return transactions; 
        } 
    } 
} 

In conjunction with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON()); 

However, once I have it in this form, it still won't work with the writerBuilder().saveToCassandra() chain.
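
For context, the write I'm attempting on the events stream is essentially the same chain as in the example above (the keyspace and table names are placeholders):

// same write chain as above, now on the JavaDStream<Transaction> 
javaFunctions(events) 
    .writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)) 
    .saveToCassandra(); 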

Any help here would be greatly appreciated.

Answer


It turns out the problem was just an import problem. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*, thinking it would give me everything I needed, but I also needed to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
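
Concretely, javaFunctions(...) for a DStream comes from the streaming utility class, while mapToRow(...) lives in the core one, so both need to be in scope; a minimal sketch using static imports (the .* imports from the question work just as well):

// javaFunctions(JavaDStream<T>) for writing a stream to Cassandra 
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions; 
// mapToRow(Class<T>) builds the row mapper passed to writerBuilder(...) 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; 

With both in scope, the writerBuilder(...) call from the question resolves.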

Once I sorted that out, I started getting the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$ 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5) 
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala) 
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38) 
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10) 
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93) 
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204) 
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222) 
    at globalTransactions.Process.main(Process.java:77) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$ 
    at java.net.URLClassLoader.findClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
    at java.lang.ClassLoader.loadClass(Unknown Source) 
    ... 9 more 

This was resolved by pulling in the spark-sql project:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

Hope this helps the next guy/gal.
