2012-07-10 94 views
1

我已经使用多个同步线程来处理多个事务,并通过使用hibernate框架和使用锁定概念从数据库中处理这些事务。 问题是,每个线程进程第一次正确,但下次抛出SQL 01002和SQL 72000时出现异常。请看代码 -使用休眠和线程锁定

long _StartTimeForMe = System.currentTimeMillis(); 

    boolean _contextPushed; 
    txContext = 
     GlobalFunctions.generateId(
      true, 
      DataBrokerConstants.CONTEXT_PREFIX, 
      null, 
      null); 
    txContext = this.getName() + txContext; 
    logger_U.pushContext(txContext); 
    _contextPushed = true; 

    try 
    { 
     coreSession = openTxCoreSession(); 
    } 
    catch (Exception e1) 
    { 
     logger_U.info("exception while creating Tx_Core session"); 
     e1.printStackTrace(); 
     return; 
    } 

    try 
    { 
     stagSession = openStagSession(); 
    } 
    catch (RuntimeException e2) 
    { 
     logger_U.info("exception while creating stag session"); 
     e2.printStackTrace(); 
     return; 
    } 

    String _txContextBackup = txContext; 

    int i = 1; 
    long _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe; 

    try 
    { 
     while (doIhaveMoreTime(_timeElapsedByMe)) 
     { 
      txContext = _txContextBackup + "-" + (i); 
      logger_U.removeContext(); 
      logger_U.pushContext(txContext); 

      coreTransaction = null; 
      stagTransaction = null; 

      InnerClassUpdate _rec = null; 
      UpdateRecord _r = null; 
      if (null != coreSession) 
      { 
       coreTransaction = coreSession.beginTransaction(); 
       _rec = getURForUpdates(coreSession); 
      } 
      else 
      { 
       logger_U.error("coreSession is not created."); 
       break; 
      } 

      if (null != _rec) 
      { 
       _r = _rec.record; 
       logger_U.info(
        "record found. Going to process Record: ID = " 
         + _r.getId() 
         + " TX_IDENTIFIER = " 
         + _r.getTxIdentifier() 
         + "UPDATE_TYPE = " 
         + _r.getUpdateType()); 

       //If a record of same transaction is processing or has been processed before in same run, it will not run again in this run 
       if (isTxProcessing(_r.getTxIdentifier())) 
       { 
        if (coreTransaction != null 
         && !coreTransaction.wasCommitted() 
         && !coreTransaction.wasRolledBack()) 
         coreTransaction.rollback(); 

        if (stagTransaction != null 
         && !stagTransaction.wasCommitted() 
         && !stagTransaction.wasRolledBack()) 
         stagTransaction.rollback(); 

        logger_U.debug(
         "A record of Tx_Identifier : " 
          + _r.getTxIdentifier() 
          + " is either processing or has been processed in this run."); 
       } 
       else 
       { 
        try 
        { 
         stagTransaction = stagSession.beginTransaction(); 
         if (processUpdates(_rec)) 
         { 
          try 
          { 
           if (stagTransaction != null 
            && !stagTransaction.wasCommitted() 
            && !stagTransaction.wasRolledBack()) 
           { 
            stagTransaction.commit(); 

            logger_U.debug("stag commit success"); 
           } 
           if (coreTransaction != null 
            && !coreTransaction.wasCommitted() 
            && !coreTransaction.wasRolledBack()) 
           { 
            coreTransaction.commit(); 

            logger_U.debug("core commit success"); 
           } 

          } 
          catch (HibernateException e) 
          { 
           logger_U.debug("error while commit"); 
           e.printStackTrace(); 
          } 

         } 
         else 
         { 
          if (stagTransaction != null 
           && !stagTransaction.wasCommitted() 
           && !stagTransaction.wasRolledBack()) 
           stagTransaction.rollback(); 

          logger_U.debug("stagTransaction rollback..."); 
          if (coreTransaction != null 
           && !coreTransaction.wasCommitted() 
           && !coreTransaction.wasRolledBack()) 
           coreTransaction.rollback(); 

          logger_U.debug("coreTransaction rolback..."); 
         } 
        } 
        catch (Exception e1) 
        { 
         if (stagTransaction != null 
          && !stagTransaction.wasCommitted() 
          && !stagTransaction.wasRolledBack()) 
          stagTransaction.rollback(); 

         if (coreTransaction != null 
          && !coreTransaction.wasCommitted() 
          && !coreTransaction.wasRolledBack()) 
          coreTransaction.rollback(); 

         logger_U.error(
          "Exception while processing UpdateRecord."); 
         e1.printStackTrace(); 
        } 
       } 
      } 
      else 
      { 
       if (stagTransaction != null 
        && !stagTransaction.wasCommitted() 
        && !stagTransaction.wasRolledBack()) 
        stagTransaction.rollback(); 

       if (coreTransaction != null 
        && !coreTransaction.wasCommitted() 
        && !coreTransaction.wasRolledBack()) 
        coreTransaction.rollback(); 

       logger_U.info(
        "No record found. wait for " 
         + convertMilliesToSec(
          DataBrokerConstants.SMALL_SLEEP) 
         + "Sec."); 
       Thread.sleep(DataBrokerConstants.SMALL_SLEEP); 
       i++; 
       _timeElapsedByMe = 
        System.currentTimeMillis() - _StartTimeForMe; 

       continue; 
      } 

      //No matter record is processed successfully or not, it will not be processed again in this run 
      if (_rec != null && vectForUpdate.contains(_rec)) 
      { 
       logger_U.debug(
        "UpdateRecord of ID " 
         + _r.getId() 
         + " and TX_IDENTIFIER " 
         + _r.getTxIdentifier() 
         + " is removing from vectForUpdate."); 
       removeObjectFromVect(_rec); 
      } 
      Thread.sleep(1000); 
      _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe; 
      i++; 
     } //END while loop 
    } 
    catch (Exception e) 
    { 
     try 
     { 
      if (stagTransaction != null 
       && !stagTransaction.wasCommitted() 
       && !stagTransaction.wasRolledBack()) 
       stagTransaction.rollback(); 

      if (coreTransaction != null 
       && !coreTransaction.wasCommitted() 
       && !coreTransaction.wasRolledBack()) 
       coreTransaction.rollback(); 

     } 
     catch (HibernateException e3) 
     { 
      // TODO Auto-generated catch block 
      e3.printStackTrace(); 
     } 
     logger_U.error("Unknown Exception occured while processing."); 
     e.printStackTrace(); 
    } 
    finally 
    { 
     txContext = _txContextBackup; 
     logger_U.removeContext(); 
     logger_U.pushContext(txContext); 
     closeSession(coreSession); 
     closeSession(stagSession); 
    } 
+0

听起来像你真的想交易和隔离给我。 – duffymo 2012-07-10 11:45:11

+0

“二手锁定概念”是什么意思? Hibernate和您的数据库引擎在设计时考虑了事务和并发,它们已经按照定义是线程安全的。会话不是线程安全的,但你不应该在线程之间共享同一个会话。 – 2012-07-10 11:50:59

回答

2

当使用Hibernate时,您应该在多线程环境中非常小心。

  • 您应该验证的第一件事是事务被正确提交,并执行SQL。如果你不能通过阅读你的代码来验证它,你可以增加Hibernate的日志级别来验证它。

  • Hibernate Session不是线程安全的,你只能在线程间共享SessionFactory。您还必须验证每次打开会话时都注意关闭它,因为这不会自动发生,并且会阻止新线程打开新会话。

+0

感谢您的宝贵答复。我正在使用多个会话。线程来创建会话和循环内的开始处理。每次创建新事务时在循环内部。请看下面的示例代码 - – user1514616 2012-07-10 11:56:23

+0

你在哪里关闭会话? – Edmondo1984 2012-07-10 12:12:46

+0

在finally块 – user1514616 2012-07-10 12:16:02