0

我有一个风暴拓扑结构,可以将一些来自Kafka队列的数据写入到Cassandra DB中。该程序是一个多线程的程序。为了方便卡桑德拉分贝插入,我有这个作为我DBUtils:在Storm Topology的所有线程中同步一个变量

public DBUtils() { 
    if(session == null) { 
     session = CassandraUtil.getInstance().getSession(); 
     LOG.info("Started a new session for dbUtils-Monitoring....."); 
    } 
    synchronized(session) { 
     testMapper = new MappingManager(session).mapper(TestVO.class); 
    } 
} 

所以,我已经使用同步到在所有正在运行的线程创建一个单一的dbUtils实例。但是当我检查日志时,似乎会话正在多次初始化。风暴拓扑中的dbUtils仅在prepare方法中初始化,并且已在prepare/execute/clean up方法中使用。因此,如果我想在所有线程中使用的变量在多个地方使用,则不确定如何使用同步块。 我的问题是如何在所有线程中只对session/dbUtils变量进行一次初始化。

+0

所以实际上你想要一个线程安全[Singleton](http://stackoverflow.com/documentation/java/130/singletons#t=201608121142368276512)? – Fildor

+0

是的!到目前为止,我使用了两种解决方案:一种是java.util.concurrent,另一种是同步。他们都用普通的java代码工作得很好,但在Storm的情况下他们不工作。 Storm是否遵循一些不同的多线程机制? –

+1

其实我不知道风暴。所以我不能说这个。但你的片段是**不是**线程安全的单身人士。因此,即使风暴对同步没有任何“幻想”,您的代码在并发环境中使用时也会产生问题。 – Fildor

回答

0

As Storm is distributed system,you can not have a single shared variable over all parallel parallel bolts。您只能在单个工作JVM中的executors上共享变量。

为此,您需要创建一个static变量并使用共享/静态对象实例来同步其初始化。

相关问题