2015-12-19 92 views
  1. I developed a Java class that reads data from a Kafka queue and prints it out, and I built a JAR file from it with Maven.

    ZkHosts zkHosts=new ZkHosts("localhost:2181"); 
    String topic_name="test"; 
    String consumer_group_id="storm"; 
    String zookeeper_root=""; 
    SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, 
         topic_name, zookeeper_root, consumer_group_id); 
    kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme()); 
    /*kafkaConfig.forceFromStart=false; 
    kafkaConfig.startOffsetTime =-2;*/ 
    
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 
    TopologyBuilder builder=new TopologyBuilder(); 
    //builder.setSpout("KafkaSpout", kafkaSpout, 1); 
    builder.setSpout("KafkaSpout", kafkaSpout); 
    builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout"); 
    Map<String, Object> conf = new HashMap<String, Object>(); 
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181); 
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); 
    conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000); 
    conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000); 
    conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3); 
    conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30); 
    LocalCluster cluster=new LocalCluster(); 
    try{ 
        cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology()); 
        Thread.sleep(120000); 
    }catch (Exception e) { 
        //throw new IllegalStateException("Couldn't initialize the topology", e); 
        System.out.println(e.getMessage()); 
    } 
    
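  2. Then I moved the JAR to an Amazon AWS cluster.

  3. Then I ran the command: nohup java -cp uber-***-0.0.1-SNAPSHOT.jar com.***.&&&.kafka.App

But I am getting an error here. Can anyone tell me what I am missing in the deployment? I think I may have to sort out the following:

  • Do I need to deploy this JAR file in the Storm configuration folder? I put the JAR in a separate folder on AWS (not in the Storm folder).
  • How can I see the System.out output?
  • Do I need to include any yml file in my project?

Please find the exception below: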

29537 [Thread-14-KafkaSpout] ERROR backtype.storm.util - Async loop died! 
java.lang.ExceptionInInitializerError: null 
    at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.utils.Logging$class.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.network.BlockingChannel.logger$lzycompute(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.network.BlockingChannel.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.utils.Logging$class.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.network.BlockingChannel.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.network.BlockingChannel.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at clojure.lang.AFn.run(AFn.java:24) [uber-iot-0.0.1-SNAPSHOT.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66] 
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. 
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] 
    ... 22 common frames omitted 
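
The `Caused by` line reports that both log4j-over-slf4j and slf4j-log4j12 are on the classpath, and every frame in the trace is attributed to the uber JAR, which suggests the JAR itself bundles both. One way to confirm this is to look inside the JAR for one class from each artifact. The following is only a minimal sketch: `LoggingBridgeCheck` is a hypothetical helper, not part of Storm, Kafka, or SLF4J, and the two class paths are assumptions based on the trace above and on the SLF4J 1.7.x artifact layout.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Storm, Kafka, or SLF4J): reports whether a
// JAR bundles both halves of the log4j delegation loop named in the exception.
final class LoggingBridgeCheck {

    // Assumed to be shipped by log4j-over-slf4j (this class appears in the trace).
    static final String OVER_SLF4J = "org/apache/log4j/Log4jLoggerFactory.class";
    // Assumed to be shipped by slf4j-log4j12 (SLF4J 1.7.x layout).
    static final String SLF4J_LOG4J12 = "org/slf4j/impl/Log4jLoggerFactory.class";

    // Returns true only if both marker classes are present in the given JAR.
    static boolean hasDelegationLoop(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(OVER_SLF4J) != null
                    && jar.getEntry(SLF4J_LOG4J12) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java LoggingBridgeCheck <path-to-jar>");
            return;
        }
        System.out.println(hasDelegationLoop(args[0])
                ? "Both log4j-over-slf4j and slf4j-log4j12 classes are bundled"
                : "No log4j/SLF4J delegation loop detected");
    }
}
```

If the check reports both, excluding one of the two artifacts from the build (for example slf4j-log4j12, where it arrives as a transitive dependency) is the usual way out, as described on the SLF4J page linked in the exception message.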

A stack trace of the error would be helpful.


@MatthiasJ.Sax, I have now added it above. Please take a look.


@MatthiasJ.Sax, do you have any idea about these lines: kafka.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na] kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]

Answer


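@Matthias J. Sax and everyone, thank you for your help. My mistake was that the deployment process I followed was wrong. To deploy the topology build, I have to follow the process below: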

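  1. The JAR must be pushed to the Storm folder on AWS, and then the following commands must be run so that Storm recognizes it:

    rm -f *.out

    (nohup bin/storm nimbus > nimubus.out)&

    (nohup bin/storm supervisor > supervisor.out)&

    (nohup bin/storm jar topos/IoT.jar com.bridgera.iot.test.App01 > IoT.out)&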

Here I am telling Storm where it can find my JAR, and from the main class it can find the topology builder...

Thank you all...