我需要实现可以从多个线程填充请求的队列。当这个队列变得大于1000个完成的请求时,这个请求应该被存储到数据库中。这是我的实现:如何从ConcurrentQueue消耗chuncks正确
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest);
}
private static void EnqueueSaveRequest(VerificationRequest request)
{
_storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request));
if (_storageQueue.Count > 1000 && !isLoading)
{
lock (_lock)
{
if (_storageQueue.Count > 1000 && !isLoading)
{
isLoading = true;
var requestChunck = new List<VerificationRequest>();
VerificationRequest req;
for (var i = 0; i < 1000; i++)
{
if(_storageQueue.TryDequeue(out req))
requestChunck.Add(req);
}
new VerificationRequestRepository().InsertRange(requestChunck);
isLoading = false;
}
}
}
}
}
有没有什么办法来实现这个没有锁和isLoading?
为什么你不希望使用锁?我的意思是在这种情况下似乎不会影响性能。 – Evk
我同意,但也许有更好的办法。另外我不确定我是否实现了正确加载isLoading的锁定 – xalz
为什么你甚至需要'isLoading'?如果您只是将其删除,会发生什么变化? – zerkms