2012-03-20 97 views
5

我有一个要求将Web服务请求发送到在线api,我认为并行扩展将非常适合我的需求。任务并行库 - 自定义任务计划程序

有问题的Web服务被设计为重复调用,但如果您每秒接收到一定数量的呼叫,则会有一个机制向您收费。我显然希望尽量减少我收取的费用等不知道是否有人已经看到了的TaskScheduler可以用以下要求处理:

  1. 限制计划每时间跨度的任务数。我想如果请求的数量超过了这个限制,那么它将需要丢弃任务或可能阻止? (停止任务的后台日志)
  2. 检测相同的请求是否已经在要执行但尚未执行的调度程序中,如果没有,则不排队第二个任务,而是返回第一个任务。

人们是否觉得这些是任务调度程序应该处理的责任或者是我在吠叫错误的树?如果你有其他选择,我愿意接受建议。

+0

我认为至少#2是不可能做到与TaskScheduler,因为它处理“任务”,并且没有办法从中获取这些信息。 – svick 2012-03-20 22:25:43

+0

您是否对使用C#5/.Net 4.5的解决方案感兴趣? – svick 2012-03-20 22:26:10

+0

@svick绝对,我有幸被允许使用c#5 – Fen 2012-03-20 22:32:36

回答

7

我同意其他人TPL数据流听起来这是很好的解决方案。

要限制处理,你可以创建一个TransformBlock实际上并没有以任何方式转换数据,它只是延缓它,如果它以前的数据后立即赶到:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay) 
{ 
    DateTime lastItem = DateTime.MinValue; 
    return new TransformBlock<T, T>(
     async x => 
       { 
        var waitTime = lastItem + delay - DateTime.UtcNow; 
        if (waitTime > TimeSpan.Zero) 
         await Task.Delay(waitTime); 

        lastItem = DateTime.UtcNow; 

        return x; 
       }, 
     new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 
} 

然后创建一个方法产生的数据(从0开始的整数例子):

static async Task Producer(ITargetBlock<int> target) 
{ 
    int i = 0; 
    while (await target.SendAsync(i)) 
     i++; 
} 

它的异步写入,因此,如果目标块是不能够马上处理的项目,它会等待。

然后写一个消费者方法:

static void Consumer(int i) 
{ 
    Console.WriteLine(i); 
} 

最后,连在一起这一切,并启动它:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500)); 

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer, 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); 

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion); 

这里,delayBlock将接受最多一个每500毫秒项和方法可以并行运行多次。要完成处理,请拨打delayBlock.Complete()

如果你想为你的#2添加一些缓存,你可以创建另一个TransformBlock做那里的工作,并将其链接到其他块。

+0

宾果,这正是我想到的。在你提供实际的实现方面很好,我只是无法找到时间。 – 2012-03-21 17:58:25

+1

整洁。有一件事要注意,如果你运行的是异步CTP而不是.NET 4。5,您需要将Task.Delay更改为TaskEx.Delay。 – 2012-03-21 19:21:23

+0

@ DPeden,我认为你的意思是.Net 4.5(目前处于测试阶段)。 – svick 2012-03-21 19:23:13

0

如果你需要时间节流,你应该检查出Quartz.net。它可以促进一致的投票。如果你关心所有的请求,你应该考虑使用某种排队机制。 MSMQ可能是正确的解决方案,但是如果您想要扩大规模并使用像NServiceBusRabbitMQ这样的ESB,则有许多特定的实施方案。

更新:

在这种情况下,TPL数据流是首选的解决方案,如果你能充分利用CTP。受限制的BufferBlock是解决方案。

这个例子来自documentation provided by Microsoft

// Hand-off through a bounded BufferBlock<T> 
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 }); 

// Producer 
private static async void Producer() 
{ 
    while(true) 
    { 
     await m_buffer.SendAsync(Produce()); 
    } 
} 

// Consumer 
private static async Task Consumer() 
{ 
    while(true) 
    { 
     Process(await m_buffer.ReceiveAsync()); 
    } 
} 

// Start the Producer and Consumer 
private static async Task Run() 
{ 
    await Task.WhenAll(Producer(), Consumer()); 
} 

更新:

退房RX的Observable.Throttle

+0

我想保持一切正在进行中,所以排除了排队作为解决方案。我不认为Quartz是正确的解决方案,调度不是问题,它限制了请求的数量,并且对我所做的调用有点聪明。感谢您的建议,虽然 – Fen 2012-03-20 22:12:17

+0

@DarenFox我根据您的反馈更新了我的答案。 – 2012-03-20 22:17:21

+0

@DarenFox这究竟如何防止每秒超过_N_个请求?看起来你只能限制10个并发未处理的调用,但很明显,如果调用速度超过每秒10毫秒,可能会超过每秒限制。 FWIW,我回答说TPL数据流可能是最好的方式,但是您在技术上需要一个时间性的BufferBlock实现。 – 2012-03-20 22:24:44

3

老实说,我会在更高层次的抽象工作,并使用TPL Dataflow API。唯一的问题是您需要编写一个自定义块,以您需要的速度调整请求,因为默认情况下,块是“贪婪”的,并且会尽可能快地进行处理。实现将如下所示:

  1. BufferBlock<T>开始,这是您要发布到的逻辑块。
  2. BufferBlock<T>链接到具有请求/秒和限制逻辑知识的自定义块。
  3. 将自定义块从2链接到您的ActionBlock<T>

我没有时间在第二秒写上#2的自定义块,但是如果您还没有弄明白,我会稍后再回来检查并尝试填写实施。

+0

由于我没有查看TPL Dataflow,我真的很感激你的帮助 – Fen 2012-03-20 22:46:08