2011-09-09 37 views
6

我的程序设置如下:
有一个线程安全的队列类,一个线程将数据放在它上面而坐在一个无限循环中,另一个线程弹出数据它坐在无限循环中。我试图想出一种方法来使用Windows事件或其他机制来制作thread_1(下图),等待无限while循环,并且只在队列深度大于或等于1时迭代。windows C++线程正在等待队列数据推送

class thread-safe_Queue 
{ 
public: 
    push(); 
    pop(); 
}; 

DWORD thread_1() 
{ 
while(1) 
{ 
    // wait for thread-safe queue to have data on it 
    // pop data off 
    // process data 
} 
} 

DWORD thread_2() 
{ 
while(1) 
{ 
    // when data becomes available, push data onto thread-safe queue 
} 
} 

回答

0

这个怎么样(我假设你熟悉事件机制)。

1.

thread_safe_Queue::push(something) 
{ 
// lock the queue 
... 
// push object 
// Signal the event 
SetEvent(notification); 

// unlock the queue 
} 

2.

thread_safe_Queue::pop(something) 
{ 
WaitForSingleObject(notification); 
// lock the queue 
... 
// get object 
// reset the event 
if (queue is empty) 
    ResetEvent(notification); 

// unlock the queue 
} 

3. thread_1只是试图弹出对象和处理它。由于推送事件,事件已启用,因此可以成功调用pop。否则它会在pop内等待。实际上,您可以使用其他同步对象,如互斥体或临界区域,而不是此例中的事件。

UPDATE。外部事件: 线程1:

void thread_1() 
    { 
    while(1) 
    { 
    WaitForSingleObject(notification); 
    if (!pop(object)) // pop should return if there are any objects left in queue 
     SetEvent(notification);  
    } 
    } 

thread_2

void thread_2() 
    { 
    while(1) 
    { 
    // push the object and than signal event 
    ResetEvent(notification) 
    } 
    } 
+0

我宁愿将事件放在类之外,并且在线程入口函数内部。原因是线程还等待着第二个事件。那就是当用户想要结束程序,并因此结束无限循环时。当发生这种情况时,用户将发送一个关闭程序的命令,推送线程将停止监听数据并关闭,并且推送线程将停止等待线程在其中存储数据,并且也将关闭。 – rossb83

+0

你可以对外部事件做同样的事情。我更新了上面的答案。 – Werolik

+0

你是不是指thread1调用reset并且线程2在外部版本中调用set?另外,你在这种情况下如何避免死锁:1. thread1无法弹出。 2.线程2调用集。 3. thread1调用重置。 – Nir

0

您可以使用命名的事件。每个线程都会调用以相同名称传递的CreateEvent。然后使用WaitForMultipleObjects等待队列相关事件或结束程序事件。弹出线程将等待queue_has_data和end_program事件。推送线程将等待data_available和end_program事件,并在将某些事物放入队列时设置queue_has_data事件。

2

我认为这可能会诀窍。派生类Event并重载Process()函数。

#include <process.h> // Along with all the normal windows includes 

//********************************************* 
using namespace os; 

Mutex globalQueueMutex; 

class QueueReader : public Event 
{ 
public: 
    virtual void Process() 
    { 
     // Lock the queue 
     Locker l(globalQueueMutex); 
     // pop data off 
     // process data 
     return; // queue will automatically unlock 
    } 
}; 

QueueReader myQueueReader; 

//********************************************* 
// The queue writer would have functions like : 
void StartQueueReader() 
{ 
    Thread(QueueReader::StartEventHandler, &myQueueReader); 
} 
void WriteToQueue() 
{ 
    Locker l(globalQueueMutex); 
    // write to the queue 
    myQueueReader.SignalProcess(); // tell reader to wake up 
} 
// When want to shutdown 
void Shutdown() 
{ 
    myQueueReader.SignalShutdown(); 
} 

下面是执行魔法类。

namespace os { 

// ********************************************************************** 
/// Windows implementation to spawn a thread. 
static uintptr_t Thread (void (*StartAddress)(void *), void *ArgList) 
{ 
    return _beginthread(StartAddress, 0, ArgList); 
} 

// ********************************************************************** 
/// Windows implementation of a critical section. 
class Mutex 
{ 
public: 
    // Initialize section on construction 
    Mutex() { InitializeCriticalSection(&cs_); } 
    // Delete section on destruction 
    ~Mutex() { DeleteCriticalSection(&cs_); } 
    // Lock it 
    void lock() { EnterCriticalSection(&cs_); } 
    // Unlock it 
    void unlock() { LeaveCriticalSection(&cs_); } 

private: 
    CRITICAL_SECTION cs_; 
}; // class Mutex 

/// Locks/Unlocks a mutex 
class Locker 
{ 
public: 
    // Lock the mutex on construction 
    Locker(Mutex& mutex): mutex_(mutex) { mutex_.lock(); } 
    // Unlock on destruction 
    ~Locker() { mutex_.unlock(); } 
private: 
    Mutex& mutex_; 
}; // class Locker 

// ********************************************************************** 
// Windows implementation of event handler 
#define ProcessEvent hEvents[0] 
#define SetTimerEvent hEvents[1] 
#define ShutdownEvent hEvents[2] 

/// Windows implementation of events 
class Event 
{ 
    /// Flag set when shutdown is complete 
    bool Shutdown; 
    /// Max time to wait for events 
    DWORD Timer; 
    /// The three events - process, reset timer, and shutdown 
    HANDLE hEvents[3]; 

public: 
    /// Timeout is disabled by default and Events assigned 
    Event(DWORD timer = INFINITE) : Timer(timer) 
    { 
    Shutdown = false; 
    ProcessEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    SetTimerEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    ShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    } 

    /// Close the event handles 
    virtual ~Event() 
    { 
    CloseHandle(ProcessEvent); 
    CloseHandle(SetTimerEvent); 
    CloseHandle(ShutdownEvent); 
    } 

    /// os::Thread calls this to start the Event handler 
    static void StartEventHandler(void *pMyInstance) 
    { ((Event *)pMyInstance)->EventHandler(); } 
    /// Call here to Change/Reset the timeout timer 
    void ResetTimer(DWORD timer) { Timer = timer; SetEvent(SetTimerEvent); } 
    /// Set the signal to shutdown the worker thread processing events 
    void SignalShutdown() { SetEvent(ShutdownEvent); while (!Shutdown) Sleep(30);} 
    /// Set the signal to run the process 
    void SignalProcess() { SetEvent(ProcessEvent); } 

protected: 
    /// Overload in derived class to process events with worker thread 
    virtual void Process(){} 
    /// Override to process timeout- return true to terminate thread 
    virtual bool Timeout(){ return true;} 

    /// Monitor thread events 
    void EventHandler() 
    { 
    DWORD WaitEvents; 
    while (!Shutdown) 
    { 
     // Wait here, looking to be signaled what to do next 
     WaitEvents = WaitForMultipleObjects(3, hEvents, FALSE, Timer); 

     switch (WaitEvents) 
     { 
     // Process event - process event then reset for the next one 
     case WAIT_OBJECT_0 + 0: 
      Process(); 
      ResetEvent(ProcessEvent); 
      break; 

     // Change timer event - see ResetTimer(DWORD timer) 
     case WAIT_OBJECT_0 + 1: 
      ResetEvent(SetTimerEvent); 
      continue; 

     // Shutdown requested so exit this thread 
     case WAIT_OBJECT_0 + 2: 
      Shutdown = true; 
      break; 

     // Timed out waiting for an event 
     case WAIT_TIMEOUT: 
      Shutdown = Timeout(); 
      break; 

     // Failed - should never happen 
     case WAIT_FAILED: 
      break; 

     default: 
      break; 
     } 
    } 
    } 


}; 

} // namespace os