2014-07-10 41 views
4

我们开发了一个Camel包(在Karaf中部署),预计每隔24小时从MySQL中获取数据并将其推送到S3。但是,如果MySQL在内部关闭了连接,如果它闲置了8个小时,那么在下一次计划执行时,它会开始抛出一个错误。请参阅下面的代码片段。骆驼jdbc:如果mysql连接关闭,我该如何重置数据源

属性:

MySqlDriver=com.mysql.jdbc.Driver 
MySqlDatabaseURL=jdbc:mysql://x.x.x.x/dbname?autoReconnect=true 
MySqlUsername=sm***** 
MySqlPassword=******* 

激活:

public class Activator implements BundleActivator { 

    public CamelContext context = null; 

    public void start(BundleContext bundleContext) throws Exception { 
     DataSource dataSource = UDMSUtils.createDataSource(UDMSUtils.getProperty(UDMSConstants.MYSQL_DATABASE_URL)); 

     SimpleRegistry simpleRegistry = new SimpleRegistry(); 
     simpleRegistry.put(UDMSConstants.UDMS_DATA_SOURCE, dataSource); 

     context = new OsgiDefaultCamelContext(bundleContext, simpleRegistry); 
     context.addRoutes(new CreativeRoutes()); 
     context.start(); 
    } 

} 

大厦数据来源:

public static DataSource createDataSource(String connectURI) { 
    BasicDataSource ds = new BasicDataSource(); 
    ds.setDriverClassName(getProperty(UDMSConstants.MYSQL_DRIVER)); 
    ds.setUsername(getProperty(UDMSConstants.MYSQL_USERNAME)); 
    ds.setPassword(getProperty(UDMSConstants.MYSQL_PASSWORD)); 
    ds.setUrl(connectURI); 
    ds.setMaxWait(-1); // Waits indefinately 
    return ds; 
} 

路线:

from("timer://Timer?repeatCount=1").to("direct:record_count").end(); 

from("direct:record_count") 
    .process(new Processor() { 
     @Override 
     public void process(Exchange exchange) throws Exception { 
      exchange.getIn().setBody(query); 
     } 
    })  
    .routeId("record_count") 
    .to("jdbc:" + UDMSConstants.UDMS_DATA_SOURCE) 
    .process(new Processor() { 
     @Override 
     public void process(Exchange exchange) throws Exception { 
      // ... 
     } 
    ); 

任何人都可以请建议,在上面的代码中需要做什么修改,以便只要我们需要连接就保持活动状态。

请注意:我们无权更改mysql.properties,因此我们需要在我们的代码中处理此操作。

+0

为什么你不使用连接池和借用自动测试(或任何其称为)设置为true。 – vikingsteve

+0

感谢Steve(@vikingsteve),当我创建一个dataSource对象时,我添加了这一行:'ds.setTestOnBorrow(true);',现在正在测试它,将在完成测试后将我的发现放入。 –

回答

4

前段时间我有类似的问题。 VikingSteve也在他建议你要做的事上发现。由于我使用的是OSGI Blueprint,因此我在XML中完成了所有配置,因此我按照以下方式解决了这个问题。

1)添加的Apache下议院DBCP依赖于你的POM:

<dependency> 
    <groupId>commons-dbcp</groupId> 
    <artifactId>commons-dbcp</artifactId> 
    <version>1.4</version> 
</dependency> 

2)声明连接池在你的骆驼路线/蓝图文件,如下所示:

<bean id="MydataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" scope="singleton" > 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://DB-001:3306/Customer"/> 
    <property name="username" value="sys_ETL"/> 
    <property name="password" value="Blah"/> 
    <property name="initialSize" value="4"/> 
    <property name="maxActive" value="32"/> 
    <property name="maxIdle" value="16"/> 
    <property name="minIdle" value="8"/> 
    <property name="timeBetweenEvictionRunsMillis" value="1800"/> 
    <property name="minEvictableIdleTimeMillis" value="1800"/> 
    <property name="testOnBorrow" value="true"/> 
    <property name="testWhileIdle" value="true"/> 
    <property name="testOnReturn" value="true"/> 
    <property name="validationQuery" value="SELECT 1"/> 
    <property name="maxWait" value="1000"/> 
    <property name="removeAbandoned" value="true"/> 
    <property name="logAbandoned" value="true"/> 
    <property name="removeAbandonedTimeout" value="30000"/> 
</bean> 

此步骤将创建一个数据库连接池作为一个bean,然后我可以在我的路由中使用。这个bean的名字是Mydatasource我稍后会用到这个信息。另请注意我在配置中为连接池设置的属性。这些属性允许我的连接池增长和缩小,并且它确保即使闲置后连接也不会失效。

3)创建一个POJO使用此连接池:

public class AccountInformationToDatabase { 


private BasicDataSource dataSource; 
public BasicDataSource getDataSource() { 
    return dataSource; 
} 
public void setDataSource(BasicDataSource dataSource) { 
    this.dataSource = dataSource; 
} 
@Handler 
public void PersistRecord 
(
     @Body AccountRecordBindy msgBody 
     , @Headers Map hdr 
     , Exchange exch 
) throws Exception 
{ 

    Connection conn = null; 
    PreparedStatement stmt=null; 



    try 
    { 


     conn= dataSource.getConnection(); 
     stmt =conn.prepareStatement("SOME INSERT STATEMENT"); 

     stmt.setString(1,msgBody.getAccountNumber().trim()); 
     stmt.setString(2,msgBody.getRecordType().trim()); 
     stmt.setString(3,msgBody.getSequenceNumber().trim()); 
     stmt.setString(4,msgBody.getTitle().trim()); 
     stmt.setString(5,msgBody.getCustomerType().trim()); 
     stmt.setString(6,msgBody.getName().trim()); 
     stmt.setString(7,msgBody.getAccountAddress1().trim()); 


     stmt.executeUpdate();   






    } 
    catch (Exception e) 
    { 

     throw new Exception(e.getMessage()); 

    } 

    finally 
    { 
     try 
     { 
       if (stmt!=null) 
       { 
        stmt.close(); 
        stmt= null; 
       } 
       if (conn!=null) 
       { 
        conn.close(); 
        conn= null; 
       } 
     } 
     catch(SQLException e) 
     { 

      throw new Exception(e.getMessage()); 

     } 

    } 


} 

}

该POJO有一个属性叫做数据源是org.apache.commons.dbcp.BasicDataSource类型。我现在可以将Mydatasource bean注入到这个POJO中,以便我的类可以访问连接池。

4)开启POJO成豆和注射连接池:

<bean id="AccountPersist" class="AccountInformationToDatabase"> 
    <property name="dataSource" ref="MydataSource"/> 
</bean> 

这项技术是一个必须有,如果你正在做的文本文件处理,并想用并行插入等

+0

谢谢@Nphphibian,只通过添加借用测试,给我类似的问题。请参阅堆栈跟踪:'com.mysql.jdbc.CommunicationsException:通信链路故障是由于底层异常: ** BEGIN嵌套异常** java.net.SocketException异常 消息:断的管 ',我现在将设置在你的XML中提到的所有属性,并重新做我的测试。谢谢 –

+0

在添加了给定的属性之后,我在12到13小时的时间间隔后执行了两次捆绑,现在捆绑正在从数据源连接池获取有效连接。 –

3

使用其他更高级的JDBC连接池,如HikariCP。设置jdbc4ConnectionTestconnectionTestQuery属性以测试连接是否仍处于活动状态。从文档connectionTestQuery

这是针对不支持JDBC4 Connection.isValid()API的“传统”数据库。这是在连接从池中获得连接以确认与数据库的连接仍然存在之前将要执行的查询。它依赖于数据库,应该是一个只需要很少处理数据库的查询(例如,“VALUES 1”)

相关问题