2012-02-06 50 views
4

我有一个WCF服务托管在Windows服务中。这个服务公开2点的方法:关于同时处理单个和批量请求的体系结构

  1. bool ProcessClaim(string options, ref string xml);需要一些数据作为输入,执行一些处理(包括IO绑定操作,像DB查询),并返回结果回来。
  2. void RunJob(string ticket);立即返回。根据ticket,从存储器(例如DB或文件系统)读取输入数据,对每个数据元素进行相同的处理,并将结果保存回存储器。批量通常由许多索赔组成。

用户可以拨打ProcessClaim来处理单个请求,RunJob运行批处理。几批可以同时运行。每个处理请求包装为Task,因此所有请求都并行执行。 问题不在于通过安排大量请求来允许批量塞满处理队列。换句话说,如果用户执行大量批处理,它将会阻止小批量和单个处理请求大量时间。 于是我想出了下面的模式,由Albahari(很简单)很好地描述:

public sealed class ProcessingQueue : IDisposable 
{ 
    private class WorkItem 
    { 
     public readonly TaskCompletionSource<string> TaskSource; 
     public readonly string Options; 
     public readonly string Claim; 
     public readonly CancellationToken? CancelToken; 

     public WorkItem(
      TaskCompletionSource<string> taskSource, 
      string options, 
      string claim, 
      CancellationToken? cancelToken) 
     { 
      TaskSource = taskSource; 
      Options = options; 
      Claim = claim; 
      CancelToken = cancelToken; 
     } 
    } 

    public ProcessingQueue() 
     : this(Environment.ProcessorCount) 
    { 
    } 

    public ProcessingQueue(int workerCount) 
    { 
     _taskQ = new BlockingCollection<WorkItem>(workerCount * 2); 

     for (var i = 0; i < workerCount; i++) 
      Task.Factory.StartNew(Consume); 
    } 

    public void Dispose() 
    { 
     _taskQ.CompleteAdding(); 
    } 

    private readonly BlockingCollection<WorkItem> _taskQ; 

    public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null) 
    { 
     var tcs = new TaskCompletionSource<string>(); 
     _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken)); 
     return tcs.Task; 
    } 

    public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null) 
    { 
     return Task<string>.Factory.StartNew(() => ProcessItem(options, claim)); 
    } 

    private void Consume() 
    { 
     foreach (var workItem in _taskQ.GetConsumingEnumerable()) 
     { 
      if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) 
       workItem.TaskSource.SetCanceled(); 
      else 
      { 
       try 
       { 
        workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim)); 
       } 
       catch (Exception ex) 
       { 
        workItem.TaskSource.SetException(ex); 
       } 
      } 
     } 
    } 

    private static string ProcessItem(string options, string claim) 
    { 
     // do some actual work here 
     Thread.Sleep(2000); // simulate work; 
     return options + claim; // return final result 
    } 
} 

静态方法ProcessRequest可以用来处理单个请求,而实例方法EnqueueTask - 批量处理。当然,所有批次都必须使用单个共享实例ProcessingQueue。尽管这种方式非常好,可以控制同时运行多个批次的步伐,有一些看上去是错的对我说:

  • 要保持工作的线程池手动
  • 难以猜测的最佳工作线程(I使用由默认处理器核的数量)螺纹
  • 一束保持当没有批次运行阻塞,浪费系统资源
  • IO结合的处理块的工作线程减少CPU的使用
  • 的效率的部分的数

我想知道,是否有更好的方式来处理这种情况?

更新: 要求之一是提供全功率的批次,当用户执行一个批次的意思,并没有其他传入的请求,所有资源必须努力处理这批专用。

+0

这听起来像你想要以更一致的方式分配负载。我会看看服务巴士来做到这一点。 http://nservicebus.com/ – 2012-02-06 15:20:44

回答

4

我想说,让一个服务接口和一个主机容器来处理这两种非常不同类型的需求可能是错误的。

您应该将服务分解为两个 - 一个返回对个别请求的响应,另一个将批量查询排队并在单个线程上处理它们。

通过这种方式,您可以为您的实时消费者提供高可用性渠道,并为您的消费者提供离线渠道。这些可以作为单独的问题进行部署和管理,使您可以在每个服务界面上提供不同的服务级别。

只是我对提出的架构的想法。

UPDATE

事实是,你的容量处理渠道是线下渠道。这种方式意味着消费者将不得不排队等待并确定其要求返回的时间。

那么工作队列怎么样?每个作业都会在处理时获取所有可用资源。处理作业后,调用者会收到通知,说明作业已完成。

+0

这是一个很好的观点,我绝对应该将这些功能分开。但是,其中一个要求是为批次提供全部电力,这意味着当用户执行一个批次时,并且没有其他传入请求时,所有资源必须专用于处理该批次。您在单线程中处理批次的建议不符合此要求。另一方面,批量线程数量的增加会导致相同的初始问题。 – yuramag 2012-02-06 17:11:05

+0

我同意你的意见。我不知道如何最好地解决您的问题。您的挑战是以某种方式将更多线程专用于处理批处理,同时最大化线程可用性。至少通过解耦您的通道,可以避免影响实时端点。 – 2012-02-10 11:58:01

+0

查看我的答案更新 – 2012-02-10 12:00:21

相关问题