2016-03-09 64 views
2

Custom UDP listener for a Service Fabric stateless service

We are trying to define a Service Fabric stateless service that listens for UDP data.

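We are working with Microsoft, who have said that this is supported and that I should set the endpoint protocol to TCP. Below is the snippet from the ServiceManifest.xml file: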

<Resources> 
    <Endpoints> 
     <!-- This endpoint is used by the communication listener to obtain the port on which to 
      listen. Please note that if your service is partitioned, this port is shared with 
      replicas of different partitions that are placed in your code. --> 
     <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" /> 
    </Endpoints> 
</Resources> 

The service starts fine, but I cannot get it to receive any UDP data, and if I run netstat -a I see nothing listening on that port for either TCP or UDP.

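I have done a lot of research online and have not found much about creating a custom ICommunicationListener, but I am hoping someone else can verify that this should be possible with SF.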

Here is the ICommunicationListener implementation:

public UdpCommunicationListener(string serviceEndPoint, 
      ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector) 
    { 
     if (serviceInitializationParameters == null) 
     { 
      throw new ArgumentNullException(nameof(serviceInitializationParameters)); 
     } 

     var endPoint = serviceInitializationParameters 
      .CodePackageActivationContext 
      .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint"); 

     _connector = connector; 

     _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN; 
     _port = endPoint.Port; 

     _server = new UdpServer(_ipAddress, _port); 

     _server.Open(); 
    } 

    public Task<string> OpenAsync(CancellationToken cancellationToken) 
    { 
     _listener = _server.Listen(_connector); 

     return Task.FromResult($"udp::{_ipAddress}:{_port}"); 
    } 

    public Task CloseAsync(CancellationToken cancellationToken) 
    { 
     this.Abort(); 

     return Task.FromResult(true); 
    } 

    public void Abort() 
    { 
     _listener.Dispose(); 
     _server?.Close(); 
    } 
} 

public class UdpServer 
{ 
    private readonly UdpClient _udpClient; 
    private IObservable<UdpReceiveResult> _receiveStream; 

    public UdpServer(string ipAddress, int port) 
    { 
     Id = Guid.NewGuid(); 

     _udpClient = new UdpClient(ipAddress, port); 
    } 

    public Guid Id { get; } 

    public void Open() 
    { 
     _receiveStream = _udpClient.ReceiveStream().Publish().RefCount(); 
    } 

    public void Close() 
    { 
     //TODO: Not sure how to stop the process 
    } 

    public IDisposable Listen(Action<UdpReceiveResult> process) 
    { 
     return _receiveStream.Subscribe(async r => 
     { 
       process(r); 
     }); 
    } 
} 
+0

Could you please post the 'ICommunicationListener' code?

Answers

1

There was a defect in the UdpServer component, which I have resolved, and this now works hosted in Service Fabric.

The problem was with this line of code:

_udpClient = new UdpClient(ipAddress, port); 

This is the wrong overload for listening for traffic; it needs to be:

_udpClient = new UdpClient(port); 
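
For anyone comparing the two overloads: `UdpClient(string hostname, int port)` sets a default remote host to send to, whereas `UdpClient(int port)` binds the socket to a local port so it can receive. Below is a minimal sketch, not taken from the original service, that binds to an OS-assigned port and reads one datagram over loopback. The names `UdpBindingSketch` and `LoopbackRoundTrip` are made up for illustration.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class UdpBindingSketch
{
    // Binds a receiver to a local port, sends it one datagram over loopback,
    // and returns the text the receiver read back.
    public static string LoopbackRoundTrip(string text)
    {
        // Port 0 asks the OS for any free port (assumption for this demo);
        // a real service would pass the port from its endpoint resource.
        using (var receiver = new UdpClient(0))
        using (var sender = new UdpClient())
        {
            int port = ((IPEndPoint)receiver.Client.LocalEndPoint).Port;

            byte[] payload = Encoding.UTF8.GetBytes(text);
            sender.Send(payload, payload.Length, new IPEndPoint(IPAddress.Loopback, port));

            // Avoid blocking forever if the datagram never arrives.
            receiver.Client.ReceiveTimeout = 2000;
            IPEndPoint remote = new IPEndPoint(IPAddress.Any, 0);
            byte[] received = receiver.Receive(ref remote);
            return Encoding.UTF8.GetString(received);
        }
    }
}
```

Calling `UdpBindingSketch.LoopbackRoundTrip("ping")` should hand back the same string if the local bind works.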

I had tried:

_udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port) 

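But this does not work, because the line from the communication listener (as it describes itself) that retrieves the host returns the host name rather than an IP address. I think you could alter this behaviour by making some changes to the manifest, but binding to just the port was sufficient for now.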
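
If binding to a specific address is ever needed, one option is to resolve the value first, since `IPAddressOrFQDN` may hold a host name that `IPAddress.Parse` cannot handle. A rough sketch, assuming an IPv4 address is wanted; `LocalAddressSketch` and `ResolveIPv4` are hypothetical names, not part of Service Fabric:

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class LocalAddressSketch
{
    // Accepts either an IP literal or a host name and returns an IPv4 address.
    public static IPAddress ResolveIPv4(string ipOrHostName)
    {
        IPAddress parsed;
        if (IPAddress.TryParse(ipOrHostName, out parsed))
        {
            return parsed; // already an IP literal, no DNS lookup needed
        }

        // Otherwise treat the value as a host name and pick the first IPv4 result.
        foreach (IPAddress candidate in Dns.GetHostAddresses(ipOrHostName))
        {
            if (candidate.AddressFamily == AddressFamily.InterNetwork)
            {
                return candidate;
            }
        }

        throw new InvalidOperationException("No IPv4 address found for " + ipOrHostName);
    }
}
```

The result could then be passed to the `UdpClient(IPEndPoint)` overload, for example `new UdpClient(new IPEndPoint(LocalAddressSketch.ResolveIPv4(_ipAddress), port))`, which binds to that local address and port.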

+0

So the problem was that your UdpServer was not actually binding/listening?

+0

Please post the solution itself, not just the fact that you found one. Otherwise nobody else will be able to use this question/answer.

+1

Fair point. I have added more detail on the actual solution.

-1

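Because only the HTTP/HTTPS and TCP protocols are supported. I guess you cannot do a UDP protocol kind of thing. UDP is not reliable. We were able to use SignalR, but I guess UDP does not work.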

Edit: As you can see in my other post, UDP is working now.

+0

This is incorrect. All protocols are supported. See https://azure.microsoft.com/en-us/documentation/articles/service-fabric-reliable-services-communication/

+1

As I said above, I got it working.

2

I got UDP working as a stateless service. Here is the code:

UdpService.cs

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Fabric; 
using System.Linq; 
using System.Net; 
using System.Threading; 
using System.Threading.Tasks; 
using Microsoft.ServiceFabric.Services.Communication.Runtime; 
using Microsoft.ServiceFabric.Services.Runtime; 

namespace UdpService 
{ 
    /// <summary> 
    /// An instance of this class is created for each service instance by the Service Fabric runtime. 
    /// </summary> 
    internal sealed class UdpService : StatelessService 
    { 
     private UdpCommunicationListener listener; 

     public UdpService(StatelessServiceContext context) 
      : base(context) 
     { } 

     protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners() 
     { 
      yield return new ServiceInstanceListener(initParams => 
      { 
       this.listener = new UdpCommunicationListener(); 
       this.listener.Initialize(initParams.CodePackageActivationContext); 

       return this.listener; 
      }); 
     } 
    } 
} 

UdpCommunicationListener

using System; 
using System.Diagnostics; 
using System.Fabric; 
using System.Fabric.Description; 
using System.Globalization; 
using System.Net; 
using System.Net.Sockets; 
using System.Reflection; 
using System.Runtime.CompilerServices; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

using Microsoft.ServiceFabric.Services.Communication.Runtime; 

namespace UdpService 
{ 
    public class UdpCommunicationListener : ICommunicationListener, IDisposable 
    { 
     private readonly CancellationTokenSource processRequestsCancellation = new CancellationTokenSource(); 

     public int Port { get; set; } 

     private UdpClient server; 

     /// <summary> 
     /// Stops the Server Ungracefully 
     /// </summary> 
     public void Abort() 
     { 
      this.StopWebServer(); 
     } 

     /// <summary> 
     /// Stops the Server Gracefully 
     /// </summary> 
     /// <param name="cancellationToken">Cancellation Token</param> 
     /// <returns>Task for Asynchron usage</returns> 
     public Task CloseAsync(CancellationToken cancellationToken) 
     { 
      this.StopWebServer(); 

      return Task.FromResult(true); 
     } 

     /// <summary> 
     /// Free Resources 
     /// </summary> 
     public void Dispose() 
     { 
      this.Dispose(true); 
      GC.SuppressFinalize(this); 
     } 

     /// <summary> 
     /// Initializes Configuration 
     /// </summary> 
     /// <param name="context">Code Package Activation Context</param> 
     public void Initialize(ICodePackageActivationContext context) 
     { 
      EndpointResourceDescription serviceEndpoint = context.GetEndpoint("ServiceEndpoint"); 
      this.Port = serviceEndpoint.Port; 
     } 

     /// <summary> 
     /// Starts the Server 
     /// </summary> 
     /// <param name="cancellationToken">Cancellation Token</param> 
     /// <returns>Task for Asynchron usage</returns> 
     public Task<string> OpenAsync(CancellationToken cancellationToken) 
     { 
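      try 
      { 
       this.server = new UdpClient(this.Port); 
      } 
      catch (Exception ex) 
      { 
       // Log and rethrow so a failed bind surfaces instead of being silently swallowed. 
       ServiceEventSource.Current.Message(ex.Message); 
       throw; 
      } 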

      ThreadPool.QueueUserWorkItem((state) => 
      { 
       this.MessageHandling(this.processRequestsCancellation.Token); 
      }); 

      return Task.FromResult("udp://" + FabricRuntime.GetNodeContext().IPAddressOrFQDN + ":" + this.Port); 
     } 

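     protected void MessageHandling(CancellationToken cancellationToken) 
     { 
      // Keep a local reference: Dispose sets this.server to null during shutdown. 
      UdpClient client = this.server; 
      try 
      { 
       while (!cancellationToken.IsCancellationRequested) 
       { 
        IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, this.Port); 
        byte[] receivedBytes = client.Receive(ref ipEndPoint); 
        // Echo the datagram back to whoever sent it. 
        client.Send(receivedBytes, receivedBytes.Length, ipEndPoint); 
        Debug.WriteLine("Received bytes: " + receivedBytes.Length.ToString()); 
       } 
      } 
      catch (Exception ex) when (cancellationToken.IsCancellationRequested && (ex is SocketException || ex is ObjectDisposedException)) 
      { 
       // Closing the socket during shutdown interrupts the blocking Receive; nothing more to do. 
      } 
     } 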

     /// <summary> 
     /// Receives the specified endpoint. 
     /// </summary> 
     /// <param name="endpoint">The endpoint.</param> 
     /// <returns></returns> 
     public Task<byte[]> Receive(ref IPEndPoint endpoint) 
     { 
      return Task.FromResult(this.server.Receive(ref endpoint)); 
     } 

     /// <summary> 
     /// Free Resources and Stop Server 
     /// </summary> 
     /// <param name="disposing">Disposing .NET Resources?</param> 
     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (this.server != null) 
       { 
        try 
        { 
         this.server.Close(); 
         this.server = null; 
        } 
        catch (Exception ex) 
        { 
         ServiceEventSource.Current.Message(ex.Message); 
        } 
       } 
      } 
     } 

     /// <summary> 
     /// Stops Server and Free Handles 
     /// </summary> 
     private void StopWebServer() 
     { 
      this.processRequestsCancellation.Cancel(); 
      this.Dispose(); 
     } 
    } 
} 
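
One thing to watch in the listener above: the blocking `Receive` call only returns when a datagram arrives or the socket is closed. A possible alternative, sketched here under the assumption that closing the socket on cancellation is acceptable, is an async echo loop built on `UdpClient.ReceiveAsync`. `UdpEchoLoopSketch` and `RunAsync` are illustrative names, and the code is independent of Service Fabric.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public static class UdpEchoLoopSketch
{
    // Echoes datagrams until the token is cancelled; returns how many were handled.
    public static async Task<int> RunAsync(UdpClient server, CancellationToken token)
    {
        int handled = 0;

        // Closing the socket is what unblocks a pending ReceiveAsync on cancellation.
        using (token.Register(() => server.Close()))
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    UdpReceiveResult result = await server.ReceiveAsync();
                    await server.SendAsync(result.Buffer, result.Buffer.Length, result.RemoteEndPoint);
                    handled++;
                }
            }
            catch (ObjectDisposedException)
            {
                // The socket was closed while a receive was pending: normal shutdown.
            }
            catch (SocketException) when (token.IsCancellationRequested)
            {
                // Some runtimes report the aborted receive as a SocketException instead.
            }
        }

        return handled;
    }
}
```

In a listener, `OpenAsync` could start `RunAsync` with the token from its `CancellationTokenSource`, and `CloseAsync` could cancel that token and await the returned task.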

Last but not least, the ServiceManifest.xml:

<?xml version="1.0" encoding="utf-8"?> 
<ServiceManifest Name="UdpServicePkg" 
       Version="1.0.0" 
       xmlns="http://schemas.microsoft.com/2011/01/fabric" 
       xmlns:xsd="http://www.w3.org/2001/XMLSchema" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
    <ServiceTypes> 
    <!-- This is the name of your ServiceType. 
     This name must match the string used in RegisterServiceType call in Program.cs. --> 
    <StatelessServiceType ServiceTypeName="UdpServiceType" /> 
    </ServiceTypes> 

    <!-- Code package is your service executable. --> 
    <CodePackage Name="Code" Version="1.0.0"> 
    <EntryPoint> 
     <ExeHost> 
     <Program>UdpService.exe</Program> 
     </ExeHost> 
    </EntryPoint> 
    </CodePackage> 

    <!-- Config package is the contents of the Config directory under PackageRoot that contains an 
     independently-updateable and versioned set of custom configuration settings for your service. --> 
    <ConfigPackage Name="Config" Version="1.0.0" /> 

    <Resources> 
    <Endpoints> 
     <!-- This endpoint is used by the communication listener to obtain the port on which to 
      listen. Please note that if your service is partitioned, this port is shared with 
      replicas of different partitions that are placed in your code. --> 
     <Endpoint Name="ServiceEndpoint" Port="5555" /> 
    </Endpoints> 
    </Resources> 
</ServiceManifest>