2017-08-25 40 views
0

你好,我正在编写一个小型Cassandra触发器,它在数据插入某个表之后向Kafka发送消息。但在创建触发器时出现了java.lang.NoClassDefFoundError。下面是我的触发器代码:

public class InsertDataTrigger implements ITrigger { 

    public Collection<Mutation> augment(Partition update) { 

     //checking if trigger works and some debug info; 
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
     System.out.println("Hello " + dateFormat.format(new Date())); 
     System.out.println("This Insert Data Trigger"); 
     System.out.println("default charset " + Charset.defaultCharset());  //IMPORTANT check if it's important 

     //here we're gonna build the message to kafka based on inserted data 
     try { 
      UnfilteredRowIterator it = update.unfilteredIterator(); 
      CFMetaData cfMetaData = update.metadata(); 

      System.out.println("PartitionKey " + new String(update.partitionKey().getKey().array())); 
      System.out.println("update.metadata().clusteringColumns().toString() " + update.metadata().clusteringColumns().toString()); 

      while (it.hasNext()) { 
       JSONObject message = new JSONObject(); 

       Unfiltered un = it.next(); 
       Clustering clt = (Clustering) un.clustering(); 

       message.put("partitionkey", new String(update.partitionKey().getKey().array())); 

       System.out.println("clt.toString(cfMetaData) " + clt.toString(cfMetaData)); 
       System.out.println("clt.getRawValues() " + new String(clt.getRawValues()[0].array())); 
       System.out.println("partition.columns().toString() " + update.columns().toString()); 

       message.put("datetime", new String(clt.getRawValues()[0].array())); 

       Iterator<Cell> cells = update.getRow(clt).cells().iterator(); 

       while (cells.hasNext()) { 
        Cell cell = cells.next(); 
        System.out.println("cell.column().name.toString() " + cell.column().name.toString()); 
        System.out.println("cell.toString()" + cell.toString()); 
        Double x = cell.value().getDouble(); 
        System.out.println("cell.value().getDouble() " + x); 
        //if(cell.column().name.toString() == "value") 
        System.out.println(x); 
        message.put(cell.column().name.toString(), x); 
        //else 
        // message.put(cell.column().name.toString(),cell.value().toString()); 
       } 
       System.out.println("un.toString()" + un.toString(cfMetaData)); 

       if (!message.isEmpty()) { 
        System.out.println(message.toString()); 

        //Sending data to kafka 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("acks", "all"); 
        props.put("retries", 0); 
        props.put("batch.size", 16384); 
        props.put("linger.ms", 1); 
        props.put("buffer.memory", 33554432); 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

        Producer<String, String> producer = new KafkaProducer<>(props); 
        producer.send(new ProducerRecord<>("test", message.toString()));//move topic name to some properties 
        producer.close(); 
       } 


      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Collections.emptyList(); 
    } } 

这里是我的POM文件:

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

    <!-- Build configuration: only the compiler plugin is set up (Java 8 source/target).
         No shade or assembly plugin is configured, so the jar produced by this POM
         contains this project's classes only, not the classes of its dependencies. -->
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

    <modelVersion>4.0.0</modelVersion> 

    <!-- Maven coordinates of the trigger artifact. -->
    <groupId>io.github.carldata</groupId> 
    <artifactId>InsertDataTrigger</artifactId> 
    <version>1.0</version> 

    <dependencies> 
     <!-- Cassandra server classes (ITrigger, Partition, Mutation, ...) the trigger compiles against. -->
     <!-- https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all --> 
     <dependency> 
      <groupId>org.apache.cassandra</groupId> 
      <artifactId>cassandra-all</artifactId> 
      <version>3.11.0</version> 
     </dependency> 

     <!-- Kafka producer API used by the trigger. NOTE(review): this jar must also be on the
          classpath of the process that loads the trigger, since it is not bundled here. -->
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.11.0.0</version> 
     </dependency> 
    </dependencies> 

</project> 

该项目可以正常构建,并生成了一个jar文件,但是当我尝试在Cassandra中创建触发器时,却因上述异常而失败。

回答

2

最有可能的原因是kafka-clients的jar不在Cassandra的lib目录中,除非你的项目在打包时把依赖一并包含进去(例如构建一个fat jar/uber jar)。

另外,kafka-clients jar与Cassandra自身的依赖之间可能存在依赖冲突,特别是org.xerial.snappy的snappy-java,两者使用的版本不同。这也许不会造成问题,但值得留意。如果确实出了问题,你可以自行构建一个对其依赖做了shade(包名重定位)处理的kafka-clients jar,这样它们就不会冲突。

+0

非常感谢,我构建了一个uber jar,问题解决了。 – CodeDog