2016-11-23 207 views
2

风暴拓扑从卡夫卡读取数据卡桑德拉群连接和写入卡桑德拉表如何通过从一个螺栓到另一个螺栓

在风暴我建立卡桑德拉群连接和会话的准备方法。

cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics() 
      .addContactPoints(nodes) 
      .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
      .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 
        TimeUnit.MINUTES.toMillis(5))) 
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(new RoundRobinPolicy())) 
      .build(); 

session = cassandraCluster.connect(keyspace); 

在执行方法我可以处理的元组,并将其保存在卡桑德拉表

假设,如果我想从一个元组数据写入到多个表 编写单独的螺栓为每个表将是不错的选择。但我必须创建群集连接和会话每个螺栓中的每个表。

但在每簇此链接单一连接将是性能不错的主意 http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra

没有任何的你有创造一个螺栓群连接并使用在其他螺栓任何这方面的想法?

+0

我不知道足够多关于Apache风暴在这里发表评论,但快速浏览的文档似乎表明您携带在“螺栓”中进行谨慎的操作。您可能需要在Cassandra和驱动程序用户的问题中对此进行更多解释。这是正确的,你需要理想地保持一个会话打开。如果您可以在每个botl之间共享对象,那么您是否可以不让会话对象成为跨每个“螺栓”共享的公共对象? – markc

+0

@markc因为螺栓分布在物理上分离的机器上,所以不可能跨螺栓共享对象。这里最好的做法是每个螺栓保持一个集群/会话,但这似乎与链接中描述的最佳实践相矛盾。我对卡桑德拉不太了解,如果没有问题的话。 –

+1

@RyanWalker好的谢谢你的清理。说得通。那么为每个螺栓创建一个会话可能是有意义的。 cassandra集群可以连接多个客户端,但建议保持一个会话打开的原因仅仅是为了避免建立和拆除连接。只要螺栓本身是永久的,那么会话对象可以是我想说的那个孩子。建立连接后,驱动程序将为集群提供连接池。请参阅:https://github.com/datastax/java-driver/tree/3.x/manual/pooling – markc

回答

0

这取决于风暴如何将螺栓和喷口分配给工人。您不能假定您可以共享螺栓之间的连接,因为它们可能在不同的工作人员(读取:JVM)中运行,或完全在不同的节点上运行。

见我的答案在这里:Mongo connection pooling for Storm topology

可能看起来是这样的伪代码:

public class CassandraBolt extends BaseRichBolt { 
    private static final long serialVersionUID = 1L; 
    private static Logger LOG = LoggerFactory.getLogger(CassandraBolt.class); 
    OutputCollector _collector; 

    // whatever your cassandra session is 
    // has to be transient because session is not serializable 
    protected transient CassandraSession _session; 

    @SuppressWarnings("rawtypes") 
    @Override 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     _collector = collector; 

     // maybe get properties from stormConf instead of hard coding them 
     cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics() 
      .addContactPoints(nodes) 
      .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
      .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 
        TimeUnit.MINUTES.toMillis(5))) 
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(new RoundRobinPolicy())) 
      .build(); 

     _session = cassandraCluster.connect(keyspace); 
    } 

    @Override 
    public void execute(Tuple input) { 
     try { 
      // use _session to talk to cassandra 

     } catch (Exception e) { 
      LOG.error("CassandraBolt error", e); 
      _collector.reportError(e); 
     } 
    } 


    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 
    } 
} 
+0

每个螺栓在不同的机器上运行在不同的JVM上。但它具有将数据从一个螺栓发送到另一个螺栓的机制。 我搜索的风暴有任何拓扑级别的方法,我可以直接从任何螺栓访问 我们在cassandra和风暴中的选项是每个螺栓的集群/会话连接。 –

+0

是的每个螺栓可以在不同的JVM中。没有拓扑级别的方法可以共享。您应该将连接属性传递给螺栓,并在螺栓的“准备”方法内创建连接。 –