要回答你的问题,只要你锁定了访问权限,就可以让多个线程访问一个常规队列。
对于我来说,虽然,我没有使用,想使用队列,锁,让他们线程安全的。我一直在c#中为我的一个程序做这个。我只是使用一个普通的队列,然后放置一个更衣室(enqueue,dequeue,count)。如果您只是锁定访问权限,则它是完全线程安全的。
我的设置来自教程/例子在这里:http://www.albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle
我的情况比你有点不同,但很相似。对我而言,我的数据可能会非常快速地出现,如果我不排队,那么如果多个数据同时进入,我将丢失数据。然后我有一个线程正在运行,缓慢地将项目从队列中取出并处理它们。该切换使用AutoResetEvent来保存我的工作线程,直到数据准备好处理。在你的情况下,你会使用计时器或定期发生的事情。
我复制/粘贴我的代码,并试图更改名称。希望我没有完全打破它,因为错过了一些名称变更,但你应该能够获得要点。
public class MyClass : IDisposable
{
private Thread sensorProcessingThread = null;
private Queue<SensorData> sensorQueue = new Queue<SensorData>();
private readonly object _sensorQueueLocker = new object();
private EventWaitHandle _whSensorEvent = new AutoResetEvent(false);
public MyClass() {
sensorProcessingThread = new Thread(sensorProcessingThread_DoWork);
sensorProcessingThread.Start();
}
public void Dispose()
{
// Signal the end by sending 'null'
EnqueueSensorEvent(null);
sensorProcessingThread.Join();
_whSensorEvent.Close();
}
// The fast sensor data comes in, locks queue, and then
// enqueues the data, and releases the EventWaitHandle
private void EnqueueSensorEvent(SensorData wd)
{
lock (_sensorQueueLocker)
{
sensorQueue.Enqueue(wd);
_whSensorEvent.Set();
}
}
// When asynchronous events come in, I just throw them into queue
private void OnSensorEvent(object sender, MySensorArgs e)
{
EnqueueSensorEvent(new SensorData(sender, e));
}
// I have several types of events that can come in,
// they just get packaged up into the same "SensorData"
// struct, and I worry about the contents later
private void FileSystem_Changed(object sender, System.IO.FileSystemEventArgs e)
{
EnqueueSensorEvent(new SensorData(sender, e));
}
// This is the slower process that waits for new SensorData,
// and processes it. Note, if it sees 'null' as data,
// then it knows it should quit the while(true) loop.
private void sensorProcessingThread_DoWork(object obj)
{
while (true)
{
SensorData wd = null;
lock (_sensorQueueLocker)
{
if (sensorQueue.Count > 0)
{
wd = sensorQueue.Dequeue();
if (wd == null)
{
// Quit the loop, thread finishes
return;
}
}
}
if (wd != null)
{
try
{
// Call specific handlers for the type of SensorData that was received
if (wd.isSensorDataType1)
{
SensorDataType1_handler(wd.sender, wd.SensorDataType1Content);
}
else
{
FileSystemChanged_handler(wd.sender, wd.FileSystemChangedContent);
}
}
catch (Exception exc)
{
// My sensor processing also has a chance of failing to process completely, so I have a retry
// methodology that gives up after 5 attempts
if (wd.NumFailedUpdateAttempts < 5)
{
wd.NumFailedUpdateAttempts++;
lock (_sensorQueueLocker)
{
sensorQueue.Enqueue(wd);
}
}
else
{
log.Fatal("Can no longer try processing data", exc);
}
}
}
else
_whWatchEvent.WaitOne(); // No more tasks, wait for a signal
}
}
你可能会看到的东西是来自Microsoft的.net的Reactive(Rx)。退房:https://msdn.microsoft.com/en-us/data/gg577611.aspx,页面底部是一个pdf教程“固化异步蓝调”:http://go.microsoft.com/fwlink/?LinkId=208528这是非常不同的东西,但也许你会看到你喜欢的东西。
你可能会想在[线程安全的集合(https://msdn.microsoft.com/en-us/library/dd997305(V = vs.110)的.aspx),特别是ConcurrentQueue读了。 –
Micke
几年前,我使用[ZeroMQ](http://zeromq.org/)构建了一个类似的解决方案,它负责处理产品问题和并发问题。它的工作,并仍然正常工作! – Micke
当我添加我的解决方案后,我读了Micke的评论,我认为你应该首先看看那些:) – mdiehl13