我有一个WCF服务托管在Windows服务中。这个服务公开2点的方法:关于同时处理单个和批量请求的体系结构
bool ProcessClaim(string options, ref string xml);
需要一些数据作为输入,执行一些处理(包括IO绑定操作,像DB查询),并返回结果回来。void RunJob(string ticket);
立即返回。根据ticket
,从存储器(例如DB或文件系统)读取输入数据,对每个数据元素进行相同的处理,并将结果保存回存储器。批量通常由许多索赔组成。
用户可以拨打ProcessClaim
来处理单个请求,RunJob
运行批处理。几批可以同时运行。每个处理请求包装为Task
,因此所有请求都并行执行。 问题不在于通过安排大量请求来允许批量塞满处理队列。换句话说,如果用户执行大量批处理,它将会阻止小批量和单个处理请求大量时间。 于是我想出了下面的模式,由Albahari(很简单)很好地描述:
public sealed class ProcessingQueue : IDisposable
{
private class WorkItem
{
public readonly TaskCompletionSource<string> TaskSource;
public readonly string Options;
public readonly string Claim;
public readonly CancellationToken? CancelToken;
public WorkItem(
TaskCompletionSource<string> taskSource,
string options,
string claim,
CancellationToken? cancelToken)
{
TaskSource = taskSource;
Options = options;
Claim = claim;
CancelToken = cancelToken;
}
}
public ProcessingQueue()
: this(Environment.ProcessorCount)
{
}
public ProcessingQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
for (var i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
private readonly BlockingCollection<WorkItem> _taskQ;
public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
{
var tcs = new TaskCompletionSource<string>();
_taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
return tcs.Task;
}
public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
{
return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
}
private void Consume()
{
foreach (var workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
{
try
{
workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
private static string ProcessItem(string options, string claim)
{
// do some actual work here
Thread.Sleep(2000); // simulate work;
return options + claim; // return final result
}
}
静态方法ProcessRequest
可以用来处理单个请求,而实例方法EnqueueTask
- 批量处理。当然,所有批次都必须使用单个共享实例ProcessingQueue
。尽管这种方式非常好,可以控制同时运行多个批次的步伐,有一些看上去是错的对我说:
- 要保持工作的线程池手动
- 难以猜测的最佳工作线程(I使用由默认处理器核的数量)螺纹
- 一束保持当没有批次运行阻塞,浪费系统资源
- IO结合的处理块的工作线程减少CPU的使用 的效率的部分的数
我想知道,是否有更好的方式来处理这种情况?
更新: 要求之一是提供全功率的批次,当用户执行一个批次的意思,并没有其他传入的请求,所有资源必须努力处理这批专用。
这听起来像你想要以更一致的方式分配负载。我会看看服务巴士来做到这一点。 http://nservicebus.com/ – 2012-02-06 15:20:44