2014-06-05 33 views
2

我遇到了一个问题,我需要限制调用另一个Web服务器的次数。它会有所不同,因为服务器是共享的,也许它可能有更多或更少的容量。带动态信号量限制maxCount

我正在考虑使用SemaphoreSlim类,但没有公共属性来更改最大数量。

我是否应该将SemaphoreSlim类包装到另一个可处理最大数量的类中?有没有更好的方法?

编辑:

这里的我想要什么:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Semaphore 
{ 
class Program 
{ 
    static SemaphoreSlim _sem = new SemaphoreSlim(10,10000); 

    static void Main(string[] args) 
    { 
     int max = 15; 

     for (int i = 1; i <= 50; i++) 
     { 
      new Thread(Enter).Start(new int[] { i, max}); 
     } 

     Console.ReadLine(); 

     max = 11; 

     for (int i = 1; i <= 50; i++) 
     { 
      new Thread(Enter).Start(new int[] { i, max }); 
     } 
    } 

    static void Enter(object param) 
    { 
     int[] arr = (int[])param; 
     int id = arr[0]; 
     int max = arr[1]; 

     try 
     { 
      Console.WriteLine(_sem.CurrentCount); 

      if (_sem.CurrentCount <= max) 
       _sem.Release(1); 
      else 
      { 
       _sem.Wait(1000); 

       Console.WriteLine(id + " wants to enter"); 

       Thread.Sleep((1000 * id)/2); // can be here at 

       Console.WriteLine(id + " is in!"); // Only three threads 

      } 
     } 
     catch(Exception ex) 
     { 
      Console.WriteLine("opps ", id); 
      Console.WriteLine(ex.Message); 
     } 
     finally    
     { 
      _sem.Release(); 
     } 
    } 
} 
} 

问题:

1 _sem.Wait(1000)应取消线程的执行将超过执行1000毫秒,不是吗?

2 - 我有使用Release/Wait的想法吗?

回答

2
  1. 得到一个信号量。
  2. 将容量设置得比您需要的要高很多。
  3. 将初始容量设置为您想要的最大容量为实际
  4. 把信号给他人使用。

在这一点上,你可以等待信号量,无论你想要多少(没有相应的释放调用)来降低容量。您可以多次释放信号量(没有相应的等待呼叫)以增加有效容量。

如果这是你做得够多的事情,你可以创建自己的信号量类,它构成了一个SemaphoreSlim并封装了这个逻辑。如果你的代码已经在没有先等待的情况下释放一个信号量,那么这个组合也是很重要的。与你自己的班级,你可以确保这样的发布是没有操作。 (这就是说,你应该避免把自己在那个位置上开始的,真的。)

+0

或者只是调用构造函数:http://msdn.microsoft.com/en-us/library/dd270891(v=vs.110).aspx –

+0

即使我自己的类,它封装SemaphoreSlim,我需要灵活地将最大并发呼叫切换为上或下。即从1000开始,改为600,并在一段时间后改为1700. –

+0

@JimMischel当然,尽管如果你想真的能够改变适当的最大值,你真的需要用另一种类型来组合它,这样你才能确保当它已经达到最大值时释放它,而不增加最大值就成为一个noop(或一个例外)。 – Servy

5

你不能改变的最大计数,但你可以创建一个SemaphoreSlim具有非常高的最大计数,并保留一些他们。见this constructor

所以我们可以说的是,绝对最大数量的并发呼叫100,但一开始你希望它是25.你初始化你的信号:

SemaphoreSlim sem = new SemaphoreSlim(25, 100); 

所以25是请求的数量,可以同时服务。您已预订其他75.

如果您想要增加允许的数量,请致电Release(num)。如果您拨打Release(10),那么电话号码将转为35.

现在,如果您想减少可用请求的数量,则必须多次拨打WaitOne。例如,如果你想从可用计数删除10:

for (var i = 0; i < 10; ++i) 
{ 
    sem.WaitOne(); 
} 

这有阻塞,直到其他客户端释放信号量的潜力。也就是说,如果允许35个并发请求,并且希望将其减少到25个,但已有35个客户端进行活动请求,那么WaitOne将阻塞,直到客户端调用Release,并且直到10个客户端发布时,循环才会终止。

+0

这可能有帮助,但我需要灵活的东西。可以说,最多1000个并发,但几个小时后,最大值应该是600或1200.我相信SemaphoreSlim不会给我这种灵活性。 =( –

+3

@ThiagoCustodio:你甚至读过答案吗?将第二个参数设置为你将允许的*最大值*。然后你可以使用'Release'和'WaitOne'来调整可用的数字 –

+0

I我会尝试一下,非常感谢 –

1

好吧,我可以解决我的问题看单声道项目。

// SemaphoreSlim.cs 
// 
// Copyright (c) 2008 Jérémie "Garuma" Laval 
// 
// Permission is hereby granted, free of charge, to any person obtaining a copy 
// of this software and associated documentation files (the "Software"), to deal 
// in the Software without restriction, including without limitation the rights 
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 
// copies of the Software, and to permit persons to whom the Software is 
// furnished to do so, subject to the following conditions: 
// 
// The above copyright notice and this permission notice shall be included in 
// all copies or substantial portions of the Software. 
// 
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
// THE SOFTWARE. 
// 
// 

using System; 
using System.Diagnostics; 
using System.Threading.Tasks; 

namespace System.Threading 
{ 
    public class SemaphoreSlimCustom : IDisposable 
    { 
     const int spinCount = 10; 
     const int deepSleepTime = 20; 
     private object _sync = new object(); 


     int maxCount; 
     int currCount; 
     bool isDisposed; 

     public int MaxCount 
     { 
      get { lock (_sync) { return maxCount; } } 
      set 
      { 
       lock (_sync) 
       { 
        maxCount = value; 
       } 
      } 
     } 

     EventWaitHandle handle; 

     public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue) 
     { 
     } 

     public SemaphoreSlimCustom (int initialCount, int maxCount) 
     { 
      if (initialCount < 0 || initialCount > maxCount || maxCount < 0) 
       throw new ArgumentOutOfRangeException ("The initialCount argument is negative, initialCount is greater than maxCount, or maxCount is not positive."); 

      this.maxCount = maxCount; 
      this.currCount = initialCount; 
      this.handle = new ManualResetEvent (initialCount > 0); 
     } 

     public void Dispose() 
     { 
      Dispose(true); 
     } 

     protected virtual void Dispose (bool disposing) 
     { 
      isDisposed = true; 
     } 

     void CheckState() 
     { 
      if (isDisposed) 
       throw new ObjectDisposedException ("The SemaphoreSlim has been disposed."); 
     } 

     public int CurrentCount { 
      get { 
       return currCount; 
      } 
     } 

     public int Release() 
     { 
      return Release(1); 
     } 

     public int Release (int releaseCount) 
     { 
      CheckState(); 
      if (releaseCount < 1) 
       throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1"); 

      // As we have to take care of the max limit we resort to CAS 
      int oldValue, newValue; 
      do { 
       oldValue = currCount; 
       newValue = (currCount + releaseCount); 
       newValue = newValue > maxCount ? maxCount : newValue; 
      } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue); 

      handle.Set(); 

      return oldValue; 
     } 

     public void Wait() 
     { 
      Wait (CancellationToken.None); 
     } 

     public bool Wait (TimeSpan timeout) 
     { 
      return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None); 
     } 

     public bool Wait (int millisecondsTimeout) 
     { 
      return Wait (millisecondsTimeout, CancellationToken.None); 
     } 

     public void Wait (CancellationToken cancellationToken) 
     { 
      Wait (-1, cancellationToken); 
     } 

     public bool Wait (TimeSpan timeout, CancellationToken cancellationToken) 
     { 
      CheckState(); 
      return Wait ((int)timeout.TotalMilliseconds, cancellationToken); 
     } 

     public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken) 
     { 
      CheckState(); 
      if (millisecondsTimeout < -1) 
       throw new ArgumentOutOfRangeException ("millisecondsTimeout", 
                 "millisecondsTimeout is a negative number other than -1"); 

      Stopwatch sw = Stopwatch.StartNew(); 

      Func<bool> stopCondition =() => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout; 

      do { 
       bool shouldWait; 
       int result; 

       do { 
        cancellationToken.ThrowIfCancellationRequested(); 
        if (stopCondition()) 
         return false; 

        shouldWait = true; 
        result = currCount; 

        if (result > 0) 
         shouldWait = false; 
        else 
         break; 
       } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result); 

       if (!shouldWait) { 
        if (result == 1) 
         handle.Reset(); 
        break; 
       } 

       SpinWait wait = new SpinWait(); 

       while (Thread.VolatileRead (ref currCount) <= 0) { 
        cancellationToken.ThrowIfCancellationRequested(); 
        if (stopCondition()) 
         return false; 

        if (wait.Count > spinCount) { 
         int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds; 

         int timeout = millisecondsTimeout < 0 ? deepSleepTime : 


          Math.Min (Math.Max (diff, 1), deepSleepTime); 
         handle.WaitOne (timeout); 
        } else 
         wait.SpinOnce(); 
       } 
      } while (true); 

      return true; 
     } 

     public WaitHandle AvailableWaitHandle { 
      get { 
       return handle; 
      } 
     } 

     public Task WaitAsync() 
     { 
      return Task.Factory.StartNew (() => Wait()); 
     } 

     public Task WaitAsync (CancellationToken cancellationToken) 
     { 
      return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken); 
     } 

     public Task<bool> WaitAsync (int millisecondsTimeout) 
     { 
      return Task.Factory.StartNew (() => Wait (millisecondsTimeout)); 
     } 

     public Task<bool> WaitAsync (TimeSpan timeout) 
     { 
      return Task.Factory.StartNew (() => Wait (timeout)); 
     } 

     public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken) 
     { 
      return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken); 
     } 

     public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken) 
     { 
      return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken); 
     } 
    } 
}