2016-01-28 70 views
0

我的拓扑可以运行,除了 Redis bolt 之外,其他每个 bolt 都能正常工作。我正在尝试把信息写入 Redis 数据库,并且在网上找到了一个示例。拓扑本身能正常工作,但是当写数据库的 bolt 执行时,会出现下面这个错误(在 Storm 拓扑上,Jedis 报“Could not get a resource from the pool”,即无法从连接池获取资源):

3594 [Thread-18-print] INFO b.s.d.executor - Processing received message FOR 22 TUPLE: source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)] 
source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)] 
13595 [Thread-18-print] INFO b.s.d.executor - BOLT ack TASK: 22 TIME: TUPLE: source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)] 
13595 [Thread-18-print] INFO b.s.d.executor - Execute done TUPLE source: meal:20, stream: default, id: {}, [2009 +1.2815365e-01 :-) :-)] TASK: 22 DELTA: 
13595 [Thread-38-bd] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[Thread-38-bd,5,main] died 
java.lang.RuntimeException: org.apache.storm.shade.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/test-1-1454011533/bd-last-error 
    at backtype.storm.util$wrap_in_runtime.invoke(util.clj:49) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:92) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.cluster$mk_distributed_cluster_state$reify__4580.set_data(cluster.clj:106) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.cluster$mk_storm_cluster_state$reify__5120.report_error(cluster.clj:465) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5469.invoke(executor.clj:193) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$mk_executor_data$fn__5523$fn__5524.invoke(executor.clj:256) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:489) ~[storm-core-0.10.0.jar:0.10.0] 
    at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91] 
Caused by: org.apache.storm.shade.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /errors/test-1-1454011533/bd-last-error 
    at org.apache.storm.shade.org.apache.zookeeper.KeeperException.create(KeeperException.java:119) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:676) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$11.call(CreateBuilderImpl.java:660) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:656) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:441) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:431) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:239) ~[storm-core-0.10.0.jar:0.10.0] 
    at org.apache.storm.shade.org.apache.curator.framework.imps.CreateBuilderImpl$3.forPath(CreateBuilderImpl.java:193) ~[storm-core-0.10.0.jar:0.10.0] 
    at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) ~[?:?] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_91] 
    at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_91] 
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.6.0.jar:?] 
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.6.0.jar:?] 
    at backtype.storm.zookeeper$create_node.invoke(zookeeper.clj:91) ~[storm-core-0.10.0.jar:0.10.0] 
    ... 7 more 

或者是下面这个错误(“S’ha refusat la connexió”意为“连接被拒绝”):

13375 [Thread-48-bd2] ERROR b.s.util - Async loop died! 
java.lang.RuntimeException: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0] 
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?] 
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_91] 
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool 
    at redis.clients.util.Pool.getResource(Pool.java:50) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?] 
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0] 
    ... 6 more 
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: S’ha refusat la connexió 
    at redis.clients.jedis.Connection.connect(Connection.java:154) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:83) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1643) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:85) ~[jedis-2.7.0.jar:?] 
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) ~[commons-pool2-2.3.jar:2.3] 
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) ~[commons-pool2-2.3.jar:2.3] 
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) ~[commons-pool2-2.3.jar:2.3] 
    at redis.clients.util.Pool.getResource(Pool.java:48) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?] 
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0] 
    ... 6 more 
Caused by: java.net.ConnectException: S’ha refusat la connexió 
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.7.0_91] 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) ~[?:1.7.0_91] 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) ~[?:1.7.0_91] 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) ~[?:1.7.0_91] 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.7.0_91] 
    at java.net.Socket.connect(Socket.java:579) ~[?:1.7.0_91] 
    at redis.clients.jedis.Connection.connect(Connection.java:148) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:83) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1643) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:85) ~[jedis-2.7.0.jar:?] 
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) ~[commons-pool2-2.3.jar:2.3] 
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) ~[commons-pool2-2.3.jar:2.3] 
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) ~[commons-pool2-2.3.jar:2.3] 
    at redis.clients.util.Pool.getResource(Pool.java:48) ~[jedis-2.7.0.jar:?] 
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86) ~[jedis-2.7.0.jar:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.publish(ProvaTopology.java:175) ~[classes/:?] 
    at Storm.practice.Storm.Prova.ProvaTopology$RedisBolt.execute(ProvaTopology.java:157) ~[classes/:?] 
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) ~[storm-core-0.10.0.jar:0.10.0] 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) ~[storm-core-0.10.0.jar:0.10.0] 
    ... 6 more 

我不明白为什么它无法获取资源,也不确定这个 bolt 是否会对元组进行确认(ack)。希望有人知道如何解决这个问题。(代码中有很多未使用的 import,这是因为我一直在尝试让它写入 Redis 数据库,但还不知道该如何完成。这只是一个测试;我正在做一个大数据项目,需要弄清楚这种数据库连接的用法。)代码如下:

package Storm.practice.Storm.Prova; 
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.testing.TestWordSpout; 
import backtype.storm.topology.BasicOutputCollector; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.topology.base.BaseBasicBolt; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.ITuple; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 
import backtype.storm.utils.Utils; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.topology.base.BaseRichSpout; 

import java.util.List; 
import java.util.Map; 
import java.util.Random; 
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.Serializable; 
import java.util.concurrent.atomic.AtomicLong; 
import java.util.logging.Logger; 

import org.apache.storm.redis.bolt.AbstractRedisBolt; 
import org.apache.storm.redis.bolt.RedisStoreBolt; 
import org.apache.storm.redis.common.config.JedisClusterConfig; 
import org.apache.storm.redis.common.config.JedisPoolConfig; 
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; 
import org.apache.storm.redis.common.mapper.RedisLookupMapper; 
import org.apache.storm.redis.common.mapper.RedisStoreMapper; 
import org.apache.storm.redis.trident.state.RedisState; 
import org.apache.storm.redis.trident.state.RedisStateQuerier; 
import org.apache.storm.redis.trident.state.RedisStateUpdater; 
import org.apache.storm.shade.com.google.common.collect.Lists; 
import org.slf4j.LoggerFactory; 
import redis.clients.jedis.JedisCommands; 

import redis.clients.jedis.Jedis; 
import redis.clients.jedis.JedisPool; 
//import redis.clients.jedis.JedisPoolConfig; 
import redis.clients.jedis.JedisPubSub; 
import storm.trident.Stream; 
import storm.trident.TridentState; 
import storm.trident.TridentTopology; 

/** 
* This is a basic example of a Storm topology. 
*/ 
public class ProvaTopology implements Serializable { 

    /**
     * Bolt that appends the suffix {@code " :-)"} to the first field of each
     * incoming tuple and re-emits the result on the single output field
     * {@code "Morts"}.
     */
    public static class ProvaBolt extends BaseRichBolt {
        OutputCollector _collector;

        /** Keeps the collector supplied by the framework for later emits and acks. */
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this._collector = collector;
        }

        /**
         * Emits the decorated value anchored to the input tuple, then acks
         * the input tuple.
         */
        public void execute(Tuple tuple) {
            final String decorated = tuple.getString(0) + " :-)";
            final Values outgoing = new Values(decorated);
            this._collector.emit(tuple, outgoing);
            this._collector.ack(tuple);
        }

        /** Declares the one-field output schema of this bolt. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            final Fields schema = new Fields("Morts");
            declarer.declare(schema);
        }
    }
    /**
     * Spout that reads the file {@code prova.tsv} line by line and emits each
     * line as a one-field tuple on the output field {@code "Morts"}.
     *
     * <p>Tuples are emitted without a message id, which is why {@link #ack}
     * and {@link #fail} are intentionally empty.
     */
    public class ProvaSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        private String fileName;
        private BufferedReader reader;
        private AtomicLong linesRead;

        /**
         * Stores the collector, resets the line counter and opens the input file.
         *
         * @throws RuntimeException wrapping the cause if the file cannot be opened
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            // The counter must exist before nextTuple() runs: it used to stay
            // null, so the end-of-file branch failed with a NullPointerException.
            linesRead = new AtomicLong();
            try {
                fileName = "prova.tsv";
                // No header line is skipped; the first line is emitted like any other.
                reader = new BufferedReader(new FileReader(fileName));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Emits the next line of the file, or backs off for ten seconds once
         * the end of the file has been reached.
         */
        public void nextTuple() {
            Utils.sleep(100);

            try {
                String line = reader.readLine();
                if (line != null) {
                    linesRead.incrementAndGet();
                    System.out.println("Finished reading line, " + line);
                    _collector.emit(new Values(line));
                } else {
                    System.out.println("Finished reading file, " + linesRead.get() + " lines read");
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the thread running this spout can see the interrupt.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Releases the file handle opened in {@link #open}. */
        public void close() {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /** No-op: tuples are emitted without a message id. */
        public void ack(Object id) {
        }

        /** No-op: tuples are emitted without a message id. */
        public void fail(Object id) {
        }

        /** Declares the one-field output schema of this spout. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("Morts"));
        }

    }

    /**
     * Bolt that publishes the first field of every incoming tuple to a Redis
     * pub/sub channel and then re-emits the same value downstream.
     *
     * <p>The connection pool is created in {@link #prepare} rather than in the
     * constructor, so the {@code pool} field is still {@code null} while the
     * bolt instance is being shipped to the worker.
     */
    public class RedisBolt implements IRichBolt {

        /** Redis channel to publish to; also used as the name of the output field. */
        protected String channel = "Somriures";
        protected OutputCollector collector;
        protected JedisPool pool;

        /** Creates a bolt that publishes to the default channel. */
        public RedisBolt() {}

        /**
         * Creates a bolt that publishes to the given channel.
         *
         * @param channel channel name; {@code null} keeps the default channel
         */
        public RedisBolt(String channel) {
            // The argument used to be ignored, so every instance published to
            // the default channel regardless of what the caller asked for.
            if (channel != null) {
                this.channel = channel;
            }
        }

        /** Stores the collector and creates the Redis connection pool. */
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            // Use the IPv4 loopback address explicitly: with "localhost" the
            // connection was refused in the reported setup.
            // A Redis server must be listening on this host for publish() to work.
            pool = new JedisPool("127.0.0.1");
        }

        /**
         * Publishes the tuple's first field to Redis, re-emits it anchored to
         * the input, and acks the input.
         */
        public void execute(Tuple tuple) {
            String current = tuple.getString(0);
            if (current != null) {
                publish(current);
                collector.emit(tuple, new Values(current));
            }
            // Ack unconditionally: a tuple with a null payload used to be left
            // un-acked.
            collector.ack(tuple);
        }

        /** Destroys the connection pool, if one was created. */
        public void cleanup() {
            if (pool != null) {
                pool.destroy();
            }
        }

        /** Declares a single output field named after the channel. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(channel));
        }

        /**
         * Publishes {@code msg} on the configured channel.
         *
         * @param msg message payload
         */
        public void publish(String msg) {
            Jedis jedis = pool.getResource();
            try {
                jedis.publish(channel, msg);
            } finally {
                // Always hand the connection back, even when publish() throws;
                // otherwise every failure leaks one pooled connection.
                jedis.close();
            }
        }

        /** Hook for re-creating non-serializable state; currently does nothing. */
        protected void setupNonSerializableAttributes() {

        }

        /** Returns no component-specific configuration. */
        public Map getComponentConfiguration() {
            return null;
        }
    }



    /** Bolt that prints every tuple it receives to standard output and declares no output fields. */
    public class PrinterBolt extends BaseBasicBolt {

        /** Writes the tuple's string form to standard output. */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            final String rendered = String.valueOf(tuple);
            System.out.println(rendered);
        }

        /** Declares nothing: this bolt does not emit. */
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            // intentionally empty
        }

    }


    /**
     * Builds the test topology and runs it.
     *
     * <p>With at least one argument the topology is submitted to a cluster
     * under the name {@code args[0]}; without arguments it runs in an
     * in-process {@link LocalCluster} for ten seconds and is then killed.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception if submission fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // The nested spout/bolt classes are inner classes, so they need an
        // enclosing instance to be created.
        ProvaTopology topology = new ProvaTopology();

        // NOTE(review): with a parallelism hint of 10, every spout task opens
        // the same input file, so each line is presumably emitted once per
        // task. Confirm this is intended.
        builder.setSpout("Morts", topology.new ProvaSpout(), 10);
        builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts"); // reads from the spout
        builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy"); // reads from "happy"
        builder.setBolt("bd", topology.new RedisBolt(), 2).shuffleGrouping("meal"); // reads from "meal"
        builder.setBolt("print", topology.new PrinterBolt(), 2).shuffleGrouping("meal");
        builder.setBolt("bd2", topology.new RedisBolt(), 2).shuffleGrouping("happy");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(5);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("test", conf, builder.createTopology());
                Utils.sleep(10000);
                cluster.killTopology("test");
            } finally {
                // Shut the in-process cluster down even if submit or kill throws.
                cluster.shutdown();
            }
        }
    }
} 

提前感谢!

回答

0

我昨天找到了答案:我需要把 localhost 改为 127.0.0.1,然后在一个终端中启动 Redis 服务器,在第二个终端中启动 monitor,这样我的 publish 方法就能正常工作了。