2017-09-04 145 views

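Worker thread pause/resume implementation

While trying to add pause/resume functionality to my Worker [thread] class, I've run into an issue I can't explain. (C++1y / VS2015)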

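The issue looks like a deadlock, but I can't reproduce it when a debugger is attached and a breakpoint is set before a certain point (see #1), so it looks like a timing issue.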

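The fix I found (#2) doesn't make much sense to me, because it means holding the mutex for longer, and client code may try to acquire other mutexes, which I know increases the chance of a deadlock.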

But it does fix the problem.

The worker loop:

Job* job;
while (true)
{
    {
        std::unique_lock<std::mutex> lock(m_jobsMutex);
        m_workSemaphore.Wait(lock);

        if (m_jobs.empty() && m_finishing)
        {
            break;
        }

        // Take the next job
        ASSERT(!m_jobs.empty());
        job = m_jobs.front();
        m_jobs.pop_front();
    }

    bool done = false;
    bool wasSuspended = false;
    do
    {
        // #2
        { // Removing this extra scoping seemingly fixes the issue BUT
          // incurs us holding on to m_suspendMutex while the job is Process()ing,
          // which might 1, be lengthy, 2, acquire other locks.
            std::unique_lock<std::mutex> lock(m_suspendMutex);
            if (m_isSuspended && !wasSuspended)
            {
                job->Suspend();
            }
            wasSuspended = m_isSuspended;

            m_suspendCv.wait(lock, [this] {
                return !m_isSuspended;
            });

            if (wasSuspended && !m_isSuspended)
            {
                job->Resume();
            }
            wasSuspended = m_isSuspended;
        }

        done = job->Process();
    }
    while (!done);
}

Suspend/Resume are simply:

void Worker::Suspend() 
{ 
    std::unique_lock<std::mutex> lock(m_suspendMutex); 
    ASSERT(!m_isSuspended); 
    m_isSuspended = true; 
} 

void Worker::Resume()
{
    {
        std::unique_lock<std::mutex> lock(m_suspendMutex);
        ASSERT(m_isSuspended);
        m_isSuspended = false;
    }
    m_suspendCv.notify_one(); // notify_all() doesn't work either.
}
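The intended pattern (check a suspend flag between chunks, wait on a condition variable while suspended, and call the chunk with no worker mutex held) can be sketched in isolation. This is a minimal, hypothetical example, not the asker's Worker: the class name `PausableWorker`, its `Start`/`Join` methods and the `std::function<bool()>` chunk callback are assumptions for illustration. The callback returns true once the job is finished, matching `done = job->Process()` in the loop above.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical worker that runs one chunked job and can be paused between
// chunks. Not the Worker from the question; names are illustrative only.
class PausableWorker
{
public:
    // `process` runs one chunk and returns true once the job is done.
    void Start(std::function<bool()> process)
    {
        m_thread = std::thread([this, process] { Run(process); });
    }

    void Suspend()
    {
        std::lock_guard<std::mutex> lock(m_suspendMutex);
        m_isSuspended = true;
    }

    void Resume()
    {
        {
            std::lock_guard<std::mutex> lock(m_suspendMutex);
            m_isSuspended = false;
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        m_suspendCv.notify_one();
    }

    // Caller must Resume() first if suspended, or this blocks forever.
    void Join()
    {
        if (m_thread.joinable())
            m_thread.join();
    }

    ~PausableWorker() { Join(); }

private:
    void Run(const std::function<bool()>& process)
    {
        bool done = false;
        while (!done)
        {
            {
                // Checkpoint: the lock is held only for the check and the wait,
                // never across the chunk itself.
                std::unique_lock<std::mutex> lock(m_suspendMutex);
                m_suspendCv.wait(lock, [this] { return !m_isSuspended; });
            }
            done = process(); // runs with no worker mutex held
        }
    }

    std::mutex m_suspendMutex;
    std::condition_variable m_suspendCv;
    bool m_isSuspended = false;
    std::thread m_thread;
};
```

In this sketch `Suspend()` only requests a pause: a chunk already running when it is called still completes, so progress stops at the next checkpoint rather than immediately. Any state the chunk shares with other threads (such as a progress counter) still needs its own synchronization, e.g. a `std::atomic<int>`.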

The test (Visual Studio):

struct Job : Worker::Job
{
    int durationMs = 25;
    int chunks = 40;
    int executed = 0;

    bool Process()
    {
        auto now = std::chrono::system_clock::now();
        auto until = now + std::chrono::milliseconds(durationMs);
        while (std::chrono::system_clock::now() < until)
        { /* busy, busy */
        }

        ++executed;
        return executed >= chunks; // true once the final chunk has run (the worker loop treats it as "done")
    }

    void Suspend() { /* nothing here */ }
    void Resume() { /* nothing here */ }
};

auto worker = std::make_unique<Worker>();

Job j;
worker->Enqueue(j);

std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs)); // Wait at least one chunk.

worker->Suspend();

Assert::IsTrue(j.executed < j.chunks); // We've suspended before we finished.
const int testExec = j.executed;

std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs * 4));

Assert::IsTrue(j.executed == testExec); // We haven't moved on.

// #1
worker->Resume(); // Breaking before this call means that I won't see the issue.
worker->Finalize();

Assert::IsTrue(j.executed == j.chunks); // Now we've finished.

What am I missing or doing wrong? Why should the job's Process() have to be protected by the suspend mutex?

EDIT: Resume() shouldn't have been holding on to the mutex while notifying; that's fixed, but the problem persists.

Answer


Of course, the job's Process() doesn't need to be protected by the suspend mutex.

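Access to j.executed, both in the asserts and in the increment, does however need to be synchronized (either by making it a std::atomic<int> or by guarding it with a mutex).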
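A minimal sketch of both options, using hypothetical standalone structs (`AtomicCountingJob`, `LockedCountingJob`) rather than the test's `Job : Worker::Job`. `Process()` returns true once the last chunk has run, to match the worker loop's `done` flag. With the atomic version, reads in the test such as `j.executed < j.chunks` or `const int testExec = j.executed;` keep compiling, because `std::atomic<int>` converts implicitly to `int` through an atomic load.

```cpp
#include <atomic>
#include <mutex>

// Option 1: make the shared counter atomic.
struct AtomicCountingJob
{
    int chunks = 40;              // written only at construction, so plain int is fine
    std::atomic<int> executed{0}; // read by the test thread, written by the worker

    bool Process()
    {
        // ... one chunk of work would go here ...
        return ++executed >= chunks; // atomic increment, then compare
    }
};

// Option 2: guard the counter with a mutex and read it through an accessor.
struct LockedCountingJob
{
    int chunks = 40;

    bool Process()
    {
        // ... one chunk of work would go here (outside the lock) ...
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_executed;
        return m_executed >= chunks;
    }

    int Executed() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_executed;
    }

private:
    mutable std::mutex m_mutex;
    int m_executed = 0;
};
```

The atomic version is the smaller change to the existing test; the mutex version is the one to reach for if several fields must be read together consistently.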

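It's still not clear why the problem manifests the way it does (since I'm not writing to the variable on the main thread); it may be a case of undefined behaviour propagating backwards in time.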