2014-12-20 31 views
0

我正在使用.Net构建股票报价更新程序。假设在上市时间内有X个股票代码需要更新。为了保持更新速度不超过数据提供者的限制(例如雅虎财务),我将尝试通过使用类似于线程池的机制来限制请求数/秒。假设我想只允许5个请求/秒,对应于5个线程池。使用任务并行库处理频繁的URL请求

我听说过TPL并希望使用它,虽然我没有经验。我如何在任务中指定隐式使用的池中的线程数?这里是一个循环来安排请求,其中requestFunc(url)是更新引号的函数。我喜欢从专家那里得到一些意见或建议,正确地做到这一点:

// X is a number much bigger than 5 
List<Task> tasks = new List<Task>(); 
for (int i=0; i<X; i++) 
{ 
    Task t = Task.Factory.StartNew(() => { requestFunc(url); }, TaskCreationOptions.None); 
    t.Wait(100); //slow down 100 ms. I am not sure if this is the right thing to do 
    tasks.Add(t); 
} 

Task.WaitAll(tasks); 

好的,我添加了一个外部循环,使其连续运行。当我对@ steve16351的代码做一些修改时,它只会循环一次。为什么????

static void Main(string[] args) 
    { 
     LimitedExecutionRateTaskScheduler scheduler = new LimitedExecutionRateTaskScheduler(5); 
     TaskFactory factory = new TaskFactory(scheduler); 
     List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT", "AGIO", "MNK", "SPY", "EBAY", "INTC" }; 


     while (true) 
     { 
      List<Task> tasks = new List<Task>(); 
      Console.WriteLine("Starting..."); 

      foreach (string symbol in symbolsToCheck) 
      { 
       Task t = factory.StartNew(() => { write(symbol); }, 
                    CancellationToken.None, TaskCreationOptions.None, scheduler); 
       tasks.Add(t); 
      } 
      //Task.WhenAll(tasks); 

      Console.WriteLine("Ending..."); 
      Console.Read(); 
     } 

     //Console.Read(); 
    } 

    public static void write (string symbol) 
    { 
     DateTime dateValue = DateTime.Now; 
     //Console.WriteLine("[{0:HH:mm:ss}] Doing {1}..", DateTime.Now, symbol); 
     Console.WriteLine("Date and Time with Milliseconds: {0} doing {1}..", 
       dateValue.ToString("MM/dd/yyyy hh:mm:ss.fff tt"), symbol); 
    } 
+0

是否要限制为5个/秒或5个并发操作? – i3arnon

+0

是的,我主要关心的是请求的数量,因为我的IP可能会被阻止,如果我超过数据提供商的限制太长时间/频繁。只要系统允许,线程是次要的。 –

+0

是的,这是处理外部服务时常见的限制。 TPL Dataflow非常适合这一点。 – i3arnon

回答

0

您可以使用自定义任务计划程序,该计划程序限制任务可以开始的速率。

在下面的任务排队,并出现一个定时器设置为您的最大允许速率的频率。所以如果每秒请求5次,计时器设置为200ms。在tick上,一个任务会从正在等待的任务中出列并执行。

编辑:除了请求率,您还可以扩展到控制执行线程的最大数量。

static void Main(string[] args) 
{ 
    TaskFactory factory = new TaskFactory(new LimitedExecutionRateTaskScheduler(5, 5)); // 5 per second, 5 max executing 
    List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT" }; 

    for (int i = 0; i < 5; i++) 
     symbolsToCheck.AddRange(symbolsToCheck); 

    foreach (string symbol in symbolsToCheck) 
    { 
     factory.StartNew(() => 
     { 
      Console.WriteLine("[{0:HH:mm:ss}] [{1}] Doing {2}..", DateTime.Now, Thread.CurrentThread.ManagedThreadId, symbol); 
      Thread.Sleep(5000); 
      Console.WriteLine("[{0:HH:mm:ss}] [{1}] {2} is done", DateTime.Now, Thread.CurrentThread.ManagedThreadId, symbol); 
     }); 
    } 

    Console.Read(); 
} 


public class LimitedExecutionRateTaskScheduler : TaskScheduler 
{ 
    private ConcurrentQueue<Task> _pendingTasks = new ConcurrentQueue<Task>();    
    private readonly object _taskLocker = new object(); 
    private List<Task> _executingTasks = new List<Task>(); 
    private readonly int _maximumConcurrencyLevel = 5; 
    private Timer _doWork = null; 

    public LimitedExecutionRateTaskScheduler(double requestsPerSecond, int maximumDegreeOfParallelism) 
    { 
     _maximumConcurrencyLevel = maximumDegreeOfParallelism; 
     long frequency = (long)(1000.0/requestsPerSecond); 
     _doWork = new Timer(ExecuteRequests, null, frequency, frequency); 
    } 

    public override int MaximumConcurrencyLevel 
    { 
     get 
     { 
      return _maximumConcurrencyLevel; 
     } 
    } 

    protected override bool TryDequeue(Task task) 
    { 
     return base.TryDequeue(task); 
    } 
    protected override void QueueTask(Task task) 
    { 
     _pendingTasks.Enqueue(task); 
    } 

    private void ExecuteRequests(object state) 
    { 
     Task queuedTask = null; 
     int currentlyExecutingTasks = 0; 

     lock (_taskLocker) 
     { 
      for (int i = 0; i < _executingTasks.Count; i++) 
       if (_executingTasks[i].IsCompleted) 
        _executingTasks.RemoveAt(i--); 

      currentlyExecutingTasks = _executingTasks.Count; 
     } 

     if (currentlyExecutingTasks == MaximumConcurrencyLevel) 
      return; 

     if (_pendingTasks.TryDequeue(out queuedTask) == false) 
      return; // no work to do 

     lock (_taskLocker) 
      _executingTasks.Add(queuedTask); 

     base.TryExecuteTask(queuedTask); 
    } 

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    { 
     return false; // not properly implemented just to complete the class 
    } 

    protected override IEnumerable<Task> GetScheduledTasks() 
    { 
     return new List<Task>(); // not properly implemented just to complete the class 
    } 
} 
+0

嗨,史蒂夫,谢谢你的代码。我也会测试这个。 –

+0

Steve的解决方案似乎设置了每秒请求的限制。有用!由于我需要在市场时间内更新所有符号,我认为仍然需要控制运行线程的数量。 –

+0

@Shuang Liang,我已经扩展了这个例子,允许你限制线程的执行。 – steve16351

1

不要担心线程的数量;只要确保你没有超过每秒的请求数量即可。每200毫秒使用一个计时器发出ManualResetEvent的信号,并让这些任务在循环内等待该ManualResetEvent。

要创建一个定时器,使其信号ManualResetEvent的每200毫秒:

resetEvent = new ManualResetEvent(false); 
timer = new Timer((state)=>resetEvent.Set(), 200, 0); 

确保您清理定时器(调用Dispose),当你不需要它了。

让线程数由运行时确定。

如果您创建每个股票的单个任务,这将是一个糟糕的实施,因为您不知道何时更新股票。

所以你可以把所有的股票放在一个列表中,并让一个任务一个接一个地更新每个股票。

通过给另一个任务提供另一个股票列表,您可以通过将其计时器设置为每250毫秒并将低优先级设置为每1000毫秒来给该任务一个更高的优先级。这将每秒增加5次,高优先级列表比低优先级更新4次。

+0

感谢您的意见。实际上,我打算在一个查询或任务中绑定100个符号(这似乎是雅虎设置的上限),以便请求总数将会更低,因此请求数/秒也是如此。我没有比其他人更喜欢的股票名单。我的唯一目的是更新我的清单中的全部(约1000),但并非所有市场中的所有股票都是。什么是语法“每200毫秒发出一次ManualResetEvent”?谢谢! –

+0

@ShuangLiang我加入到我的回答 –

0

您可以使用具有任务延迟的while循环来控制何时发出请求。使用异步无效方法发出请求意味着您不会被失败的请求阻止。

异步无效是火灾并忘记哪些开发者不接受,但我认为这将作为一种可能的解决方案在这种情况下。

我也认为erno de weerd围绕优先调用更重要的股票提出了一个很好的建议。

2

如果你想拥有URL请求的流量,同时限制在不超过5个并发操作,你应该使用TPL数据流的ActionBlock

var block = new ActionBlock<string>(
    url => requestFunc(url), 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

foreach (var url in urls) 
{ 
    block.Post(url); 
} 

block.Complete(); 
await block.Completion; 

Post给它的URL,并为他们每个人它会在确保一次不超过MaxDegreeOfParallelism请求的同时执行请求。

完成后,您可以拨打Complete来发信号通知该块完成,awaitCompletion任务将异步等待,直到该块实际完成。

+0

嗨,很高兴知道。我会测试这个想法。谢谢! –

+0

请注意,这不会执行每秒5个请求要求 –

+0

@ErnodeWeerd OP在注释中确认这不是实际的要求。 – i3arnon

0

谢谢@ steve16351!它的工作原理是这样的:

static void Main(string[] args) 
    { 
     LimitedExecutionRateTaskScheduler scheduler = new LimitedExecutionRateTaskScheduler(5); 
     TaskFactory factory = new TaskFactory(scheduler); 
     List<string> symbolsToCheck = new List<string>() { "GOOG", "AAPL", "MSFT", "AGIO", "MNK", "SPY", "EBAY", "INTC" }; 


     while (true) 
     { 
      List<Task> tasks = new List<Task>(); 
      foreach (string symbol in symbolsToCheck) 
      { 
       Task t = factory.StartNew(() => 
       { 
        write(symbol); 
       }, CancellationToken.None, 
        TaskCreationOptions.None, scheduler); 
       tasks.Add(t); 
      } 
     } 
    } 

    public static void write (string symbol) 
    { 
     DateTime dateValue = DateTime.Now; 
     Console.WriteLine("Date and Time with Milliseconds: {0} doing {1}..", 
       dateValue.ToString("MM/dd/yyyy hh:mm:ss.fff tt"), symbol); 
    }