2017-01-10 26 views
2

我已经实现了从Oracle AQ中消除邮件并将其作为IObservable暴露给系统。工作流程如下: -Oracle AQ使用反应扩展和事务处理回调ODP.NET

  1. 应用程序收到Oracle有关新消息的回调事件。
  2. 应用程序将消息出队并将其添加到IObservable(消息出队作为消息出队后立即提交的事务的一部分)。

我意识到一个潜在的问题,那就是当消息出队时,事务立即提交,而不是等待它被应用程序成功使用。下面是我正在使用的代码,但需要一个建议,其中/如何在应用程序成功使用后提交事务。目前它启动并提交/回滚专用Dequeue方法中的事务。

public sealed class Queue<T> : IQueue<T> where T : IQueueDataType 
{ 
    private readonly OracleConnection _connection; 

    private readonly string _consumerName; 

    private readonly IQueueSetting _queueSetting; 

    private readonly IDbConnectionFactory _dbConnectionFactory; 

    private OracleAQQueue _queue; 

    private IObservable<T> _messages; 

    private bool _isDisposed; 

    public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting) 
    { 
     _dbConnectionFactory = dbConnectionFactory; 
     _connection = dbConnectionFactory.Create() as OracleConnection; 
     _consumerName = dalSettings.Consumer; 
     _queueSetting = queueSetting; 

    } 

    public void Connect() 
    {    
     _connection.Open(); 
     _queue = new OracleAQQueue(_queueSetting.QueueName, _connection) 
     { 
      DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove}, 
      UdtTypeName = _queueSetting.QueueDataTypeName, 
      MessageType = OracleAQMessageType.Udt 
     }; 

     _queue.NotificationConsumers = new[] { _consumerName }; 

     _messages = Observable 
      .FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
       h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h) 
      .Where(x => x.EventArgs.AvailableMessages > 0) 
      .Select(x => 
      { 
       try 
       { 
       Log.Info("Msg received", "Queue", _queueSetting.QueueName); 

        OracleAQMessage msg = Dequeue(); 

        Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName); 
        return (T)msg.Payload; 
       } 
       catch (Exception e) 
       { 

       } 
      }).Publish().RefCount(); 

    } 

    private OracleAQMessage Dequeue() 
    { 
     using (var connection = _dbConnectionFactory.Create() as OracleConnection) 
     { 
      try 
      { 
       connection.Open(); 
       using (OracleTransaction transaction = connection.BeginTransaction()) 
       { 
        try 
        { 
         OracleAQMessage msg = _queue.Dequeue(); 
         **transaction.Commit();** 
         return msg; 
        } 
        catch (Exception e) 
        {       
         **transaction.Rollback();** 
         throw; 
        } 
       } 
      } 
      catch (Exception e) 
      { 
       Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e), 
        "Dequeue", GetType().FullName); 
       throw; 
      } 
      finally 
      { 
       connection.Close(); 
      } 
     } 
    } 

    public IObservable<T> GetMessages() 
    { 
     return _messages; 
    } 

    public void Dispose() 
    {   
     if (!_isDisposed) 
     { 
      if (_queue != null) 
      { 
       _queue.Dispose();  
      } 

      _connection.Dispose(); 
      _isDisposed = true; 
     } 
    } 
} 

如果不是使用IObservable,我只是公开的事件提交和回滚事务时会很容易,但我想我可以用IObservable即做,我可以运行一个Linq,但不知道如何提交交易。

回答

1

我不认为这里有一个简单的解决方案。如果我的理解是正确的:

  1. 事件是由甲骨文推,
  2. 你想通过IObservable流暴露的情况下,
  3. 应用“处理”了,
  4. 要提交如果处理成功,否则回滚。

问题是IObservable是一种单向机制。一旦你发布了一条消息(在我们的例子中你从这个Oracle队列中得到了一些东西),其目的不是要跟踪它,而是稍后决定是否提交/回滚。所以,你的选择是相当多的东西你的应用程序逻辑的某种形式的处理程序:

Func<OracleMessage, bool> isMessageCommitable; //...application handling logic here 

var appHandledMessages = oracleSourceMessages 
    .Select(m => Tuple.Create(m, isMessageCommitable(m))) 
    .Publish() 
    .RefCount(); 

appHandledMessages 
    .Where(t => t.Item2) 
    .Subscribe(t => Commit(t.Item1)); 

appHandledMessages 
    .Where(t => !t.Item2) 
    .Subscribe(t => Rollback(t.Item1)); 

...或设立IObservable指点下这会从应用程序推回到队列中的其他方式,也应该提交/回滚消息。您可能需要两个,一个用于提交,一个用于回滚,并且可能需要传入构造函数Queue<T>

祝你好运。

+0

谢谢。你能否提供第二种情况的例子“IObservables指向另一种方式...........” – tangokhi