正确的解决方案,我实现我自己的发布/订阅模式实现与WCF与WSDualHttpBinding的代码,但我已经超时一个小问题,我在后面解释,现在让我告诉我”在做:WCF的发布/订阅模式实现
public interface IEventSubscriber
{
[OperationContract]
void NotifyEvent(EventNotification notification);
[OperationContract]
void NotifyServiceDisconnecting();
}
[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IEventSubscriber))]
public interface IEventPublisherService
{
[OperationContract(IsOneWay = false, IsInitiating = true)]
void Subscribe(string culture);
[OperationContract(IsOneWay = false, IsInitiating = false, IsTerminating = true)]
void Unsubscribe();
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
internal class EventPublisherServiceImpl : IEventPublisherService
{
ServiceHost host;
public bool StartService()
{
bool ret = false;
try
{
Uri baseAddress = new Uri(ConfigurationManager.AppSettings[GlobalConstants.CfgKeyConfigEventPublishserServiceBaseAddress].ToString());
EventHelper.AddEvent(string.Format("Event Publisher Service on: {0}", baseAddress.ToString()));
host = new ServiceHost(this, baseAddress);
// duplex session enable http binding
WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None);
httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(10);
httpBinding.ReliableSession = new ReliableSession();
httpBinding.ReliableSession.Ordered = true;
httpBinding.ReliableSession.InactivityTimeout = TimeSpan.FromMinutes(10);
host.AddServiceEndpoint(typeof(IEventPublisherService), httpBinding, baseAddress);
// Enable metadata publishing.
ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
smb.HttpGetEnabled = true;
smb.MetadataExporter.PolicyVersion = PolicyVersion.Policy15;
host.Description.Behaviors.Add(smb);
// Open the ServiceHost to start listening for messages.
host.Open();
ret = true;
}
catch (Exception e)
{
EventHelper.AddException(e.Message);
}
return ret;
}
...
}
现在在我的实现类,我有存储在存储器中,当新通知到来对每个用户进行下面的代码的用户的列表:
...
/// <summary>
/// List of active subscribers
/// </summary>
private static Dictionary<IEventSubscriber, string> subscribers = new Dictionary<IEventSubscriber, string>();
...
我像这样使用它:
internal void Subscribe(string culture)
{
lock (subscribers)
{
// Get callback contract as specified on the service definition
IEventSubscriber callback = OperationContext.Current.GetCallbackChannel<IEventSubscriber>();
// Add subscriber to the list of active subscribers
if (!subscribers.ContainsKey(callback))
{
subscribers.Add(callback, culture);
}
}
}
...
private void OnNotificationEvent(NormalEvent notification)
{
lock (subscribers)
{
List<IEventSubscriber> listToRemove = new List<IEventSubscriber>();
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(subscribers, kvp =>
{
try
{
kvp.Key.NotifyEvent(new EventNotification(notification, kvp.Value));
}
catch (Exception ex)
{
EventHelper.AddException(string.Format("Error notifying event notification to client: {0} - removing this one", ex.Message));
listToRemove.Add(kvp.Key);
}
} //close lambda expression
); //close method invocation
Parallel.ForEach(listToRemove, subs =>
{
try
{
subs.NotifyServiceDisconnecting();
}
catch (Exception ex) {
EventHelper.AddException(string.Format("Failed to notify client that is to be removed: {0}",
ex.Message));
}
subscribers.Remove(subs);
}
);
}
}
这有什么问题,如果超时得以实现(请注意,我设置10分钟,ReceiveTimeout和非活动超时)的用户是在列表中去故障状态,以下异常是逮住OnNotificationEvent
* 无法完成操作'NotifyEvent',因为会话通道超时等待接收消息。要增加超时,请在配置文件中的绑定上设置receiveTimeout属性,或者直接在绑定上设置ReceiveTimeout属性。 *
好吧,我可以增加超时值,但如果我这样做会发生一些时间,最终。
我的问题是:我做错了什么努力来实现这个模式的时候?或者以任何其他方式实现这种模式是避免此问题的更好方法?或存在重新连接故障回调通道的任何方式(对于我读什么是不可能的,但由于我不能通知的连接中断,让客户盲目的,不知道的是,通信结束的客户端!?或者是给知识,他失去了通信与发布!?)
当然像ping消息解决方案已经过时了:),但是没关系,如果没有更好的出现看起来像我已经实现类似的东西办法...
感谢
确定,但paralel.ForEach做同样的,你使用线程,或similiar做,不创建的所有线程做了较为不错的工作与,如果我有2000客户端我不想开始2000线程我是对的还是不是? – Nuno
AFAIR没有保证Paralel.ForEach执行的单独的线程每次迭代。是的,它看起来不那么难看,但我已经展示了工作系统的一部分代码。您可以使用ThreadPool而不是创建Thread对象。只需尝试在单独的线程中调用订阅者NotifyEvent并测试它对2k元素的工作方式。我想超时会消失。另外我想说的是,这是跨进程调用,您应该记住,2k订户的大量evenets会导致您的网络被Paralel.ForEach或Threads重载。 – oleksa
但我指着超时是ReceiveTimeout和InactivityTimeout不执行超时,但确定我会尝试用线程来做到这一点 – Nuno