我已经实现了从Oracle AQ中消除邮件并将其作为IObservable暴露给系统。工作流程如下: -Oracle AQ使用反应扩展和事务处理回调ODP.NET
- 应用程序收到Oracle有关新消息的回调事件。
- 应用程序将消息出队并将其添加到IObservable(消息出队作为消息出队后立即提交的事务的一部分)。
我意识到一个潜在的问题,那就是当消息出队时,事务立即提交,而不是等待它被应用程序成功使用。下面是我正在使用的代码,但需要一个建议,其中/如何在应用程序成功使用后提交事务。目前它启动并提交/回滚专用Dequeue方法中的事务。
public sealed class Queue<T> : IQueue<T> where T : IQueueDataType
{
private readonly OracleConnection _connection;
private readonly string _consumerName;
private readonly IQueueSetting _queueSetting;
private readonly IDbConnectionFactory _dbConnectionFactory;
private OracleAQQueue _queue;
private IObservable<T> _messages;
private bool _isDisposed;
public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting)
{
_dbConnectionFactory = dbConnectionFactory;
_connection = dbConnectionFactory.Create() as OracleConnection;
_consumerName = dalSettings.Consumer;
_queueSetting = queueSetting;
}
public void Connect()
{
_connection.Open();
_queue = new OracleAQQueue(_queueSetting.QueueName, _connection)
{
DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove},
UdtTypeName = _queueSetting.QueueDataTypeName,
MessageType = OracleAQMessageType.Udt
};
_queue.NotificationConsumers = new[] { _consumerName };
_messages = Observable
.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
try
{
Log.Info("Msg received", "Queue", _queueSetting.QueueName);
OracleAQMessage msg = Dequeue();
Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName);
return (T)msg.Payload;
}
catch (Exception e)
{
}
}).Publish().RefCount();
}
private OracleAQMessage Dequeue()
{
using (var connection = _dbConnectionFactory.Create() as OracleConnection)
{
try
{
connection.Open();
using (OracleTransaction transaction = connection.BeginTransaction())
{
try
{
OracleAQMessage msg = _queue.Dequeue();
**transaction.Commit();**
return msg;
}
catch (Exception e)
{
**transaction.Rollback();**
throw;
}
}
}
catch (Exception e)
{
Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e),
"Dequeue", GetType().FullName);
throw;
}
finally
{
connection.Close();
}
}
}
public IObservable<T> GetMessages()
{
return _messages;
}
public void Dispose()
{
if (!_isDisposed)
{
if (_queue != null)
{
_queue.Dispose();
}
_connection.Dispose();
_isDisposed = true;
}
}
}
如果不是使用IObservable
,我只是公开的事件提交和回滚事务时会很容易,但我想我可以用IObservable
即做,我可以运行一个Linq
,但不知道如何提交交易。
谢谢。你能否提供第二种情况的例子“IObservables指向另一种方式...........” – tangokhi