2011-09-15 102 views
0

正确的解决方案,我实现我自己的发布/订阅模式实现与WCF与WSDualHttpBinding的代码,但我已经超时一个小问题,我在后面解释,现在让我告诉我”在做:WCF的发布/订阅模式实现

public interface IEventSubscriber 
{ 
    [OperationContract] 
    void NotifyEvent(EventNotification notification); 
    [OperationContract] 
    void NotifyServiceDisconnecting(); 
} 

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IEventSubscriber))] 
public interface IEventPublisherService 
{ 
    [OperationContract(IsOneWay = false, IsInitiating = true)] 
    void Subscribe(string culture); 
    [OperationContract(IsOneWay = false, IsInitiating = false, IsTerminating = true)] 
    void Unsubscribe(); 
} 

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 
internal class EventPublisherServiceImpl : IEventPublisherService 
{ 
    ServiceHost host; 
    public bool StartService() 
    { 
     bool ret = false; 
     try 
     { 
      Uri baseAddress = new Uri(ConfigurationManager.AppSettings[GlobalConstants.CfgKeyConfigEventPublishserServiceBaseAddress].ToString()); 
      EventHelper.AddEvent(string.Format("Event Publisher Service on: {0}", baseAddress.ToString())); 

      host = new ServiceHost(this, baseAddress); 

      // duplex session enable http binding 
      WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None); 
      httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(10); 
      httpBinding.ReliableSession = new ReliableSession(); 
      httpBinding.ReliableSession.Ordered = true; 
      httpBinding.ReliableSession.InactivityTimeout = TimeSpan.FromMinutes(10); 
      host.AddServiceEndpoint(typeof(IEventPublisherService), httpBinding, baseAddress); 

      // Enable metadata publishing. 
      ServiceMetadataBehavior smb = new ServiceMetadataBehavior(); 
      smb.HttpGetEnabled = true; 
      smb.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; 
      host.Description.Behaviors.Add(smb); 

      // Open the ServiceHost to start listening for messages. 
      host.Open(); 
      ret = true; 
     } 
     catch (Exception e) 
     { 
      EventHelper.AddException(e.Message); 
     } 

     return ret; 
    } 
... 

} 

现在在我的实现类,我有存储在存储器中,当新通知到来对每个用户进行下面的代码的用户的列表:

... 
    /// <summary> 
    /// List of active subscribers 
    /// </summary> 
    private static Dictionary<IEventSubscriber, string> subscribers = new Dictionary<IEventSubscriber, string>(); 

... 

我像这样使用它:

internal void Subscribe(string culture) 
    { 
     lock (subscribers) 
     { 
      // Get callback contract as specified on the service definition 
      IEventSubscriber callback = OperationContext.Current.GetCallbackChannel<IEventSubscriber>(); 

      // Add subscriber to the list of active subscribers 
      if (!subscribers.ContainsKey(callback)) 
      { 
       subscribers.Add(callback, culture); 
      } 
     } 
    } 

... 

    private void OnNotificationEvent(NormalEvent notification) 
    { 
     lock (subscribers) 
     { 
      List<IEventSubscriber> listToRemove = new List<IEventSubscriber>(); 
      // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body) 
      Parallel.ForEach(subscribers, kvp => 
      { 
       try 
       { 
        kvp.Key.NotifyEvent(new EventNotification(notification, kvp.Value)); 
       } 
       catch (Exception ex) 
       { 
        EventHelper.AddException(string.Format("Error notifying event notification to client: {0} - removing this one", ex.Message)); 
        listToRemove.Add(kvp.Key); 
       } 

      } //close lambda expression 
      ); //close method invocation 

      Parallel.ForEach(listToRemove, subs => 
      { 
       try 
       { 
        subs.NotifyServiceDisconnecting(); 
       } 
       catch (Exception ex) { 
        EventHelper.AddException(string.Format("Failed to notify client that is to be removed: {0}", 
         ex.Message)); 
       } 
       subscribers.Remove(subs); 
      } 
      ); 
     } 
    } 

这有什么问题,如果超时得以实现(请注意,我设置10分钟,ReceiveTimeout和非活动超时)的用户是在列表中去故障状态,以下异常是逮住OnNotificationEvent

* 无法完成操作'NotifyEvent',因为会话通道超时等待接收消息。要增加超时,请在配置文件中的绑定上设置receiveTimeout属性,或者直接在绑定上设置ReceiveTimeout属性。 *

好吧,我可以增加超时值,但如果我这样做会发生一些时间,最终。

我的问题是:我做错了什么努力来实现这个模式的时候?或者以任何其他方式实现这种模式是避免此问题的更好方法?或存在重新连接故障回调通道的任何方式(对于我读什么是不可能的,但由于我不能通知的连接中断,让客户盲目的,不知道的是,通信结束的客户端!?或者是给知识,他失去了通信与发布!?)

当然像ping消息解决方案已经过时了:),但是没关系,如果没有更好的出现看起来像我已经实现类似的东西办法...

感谢

回答

1

对于现在的解决办法是改变超时有无限的价值:

  // duplex session enable http binding 
      WSDualHttpBinding httpBinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None); 
      httpBinding.ReceiveTimeout = TimeSpan.MaxValue; 
      httpBinding.ReliableSession = new ReliableSession(); 
      httpBinding.ReliableSession.Ordered = true; 
      httpBinding.ReliableSession.InactivityTimeout = TimeSpan.MaxValue; 
0

您正在使用Parallel.ForEach,但我不知道这是不够的。 AFAIR Parallel.ForEach不会在单独的线程中执行每个迭代。

我想建议,开始独立的线程在OnNotificationEvent每个用户,并使用锁,以确保的foreach不会被收集来breaked修改例外:

lock (_subscribersSync) 
     foreach (var chatter in subscribers) 
     { 
       Logger.Log.DebugFormat("putting all users to {0}", subscribers.Key.Name); 
       Thread th = new Thread(PublishAllUserMessage); 
       th.Start(new MessageData() { Message = "", Subscriber = chatter.Key}; 
     } 

void PublishAllUserMessage(object messageData) 
{ 
     MessageData md = (MessageData)messageData; 
     try 
     { 
       md.Subscriber.NotifyEvent(...event parameters here ...); 
     } 
     catch (Exception ex) 
     { 
       Logger.Log.Error(string.Format("failed to publish message to '{0}'", md.Subscriber.Name), ex); 
       KickOff(md.Subscriber); 
     } 
} 
object _subscribersSync = new object(); 
void KickOff(IEventSubscriber p) 
{ 
     lock (_subscribersSync) 
     { 
       subscribers.Remove(p); 
       Logger.Log.WarnFormat("'{0}' kicked off", p.Name); 
     } 
} 
public class MessageData 
{ 
     public string Message; 
     public IEventSubscriber Subscriber; 
} 
+0

确定,但paralel.ForEach做同样的,你使用线程,或similiar做,不创建的所有线程做了较为不错的工作与,如果我有2000客户端我不想开始2000线程我是对的还是不是? – Nuno

+0

AFAIR没有保证Paralel.ForEach执行的单独的线程每次迭代。是的,它看起来不那么难看,但我已经展示了工作系统的一部分代码。您可以使用ThreadPool而不是创建Thread对象。只需尝试在单独的线程中调用订阅者NotifyEvent并测试它对2k元素的工作方式。我想超时会消失。另外我想说的是,这是跨进程调用,您应该记住,2k订户的大量evenets会导致您的网络被Paralel.ForEach或Threads重载。 – oleksa

+0

但我指着超时是ReceiveTimeout和InactivityTimeout不执行超时,但确定我会尝试用线程来做到这一点 – Nuno