2014-04-03 66 views
2

我有一个服务器类型的应用程序,并且在确保线程在完成之前不会被删除。下面的代码几乎代表我的服务器;需要进行清理以防止在列表中建立死线。处理多线程清理的最佳方式

using namespace std; 

class A { 
public: 
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) { 
     somethingThread = thread([cleanupFunction, getStopFlag, this]() { 
      doSomething(getStopFlag); 
      cleanupFunction(); 
     }); 

    } 
private: 
    void doSomething(function<bool()> getStopFlag); 
    thread somethingThread; 
    ... 
} 

class B { 
public: 
    void runServer(); 

    void stop() { 
     stopFlag = true; 
     waitForListToBeEmpty(); 
    } 
private: 
    void waitForListToBeEmpty() { ... }; 
    void handleAccept(...) { 
     shared_ptr<A> newClient(new A()); 
     { 
      unique_lock<mutex> lock(listMutex); 
      clientData.push_back(newClient); 
     } 
     newClient.doSomethingThreaded(bind(&B::cleanup, this, newClient), [this]() { 
      return stopFlag; 
     }); 
    } 

    void cleanup(shared_ptr<A> data) { 
     unique_lock<mutex> lock(listMutex); 
     clientData.remove(data); 
    } 

    list<shared_ptr<A>> clientData; 
    mutex listMutex; 
    atomc<bool> stopFlag; 
} 

这个问题似乎是,析构函数在错误的顺序运行 - 即shared_ptr的是线程的函数结束时在破坏,这意味着“A”对象的线程完成之前被删除,造成了Havok当线程的析构函数被调用。

即 通话中移除清理功能 这一切的引用(即一个的目的),那么再次调用析构函数(包括该线程的析构函数) 调用该线程的析构函数 - 糟糕!

我看了一些替代品,比如维护一个'被删除'列表,这个列表定期用来清理另一个线程的主列表,或者为共享指针使用一个延时的deletor函数,但是这两个这些看起来矮胖,可能有竞争条件。

任何人都知道一个很好的方法来做到这一点?我看不到一个简单的重构方法,它可以正常工作。

回答

3

线程可以连接还是分离?我没有看到任何detach, 这意味着破坏线程对象而没有 加入它是一个致命的错误。你可以尝试简单地将它分离, 虽然这可以使一个干净的关机有点复杂。 (当然,对于很多服务器来说,反正不应该有关闭 )。否则:我过去所做的是创建 收割线程;一个线程除了加入任何其他的线程外,什么也不做,随后清理。

我可以补充说,这是一个很好的例子,其中 shared_ptr是而不是合适。发生删除时,您想完全控制 ;如果你分离,你可以在 清理功能中做到这一点(不过很坦白地说,只是使用delete this; 在lambda的末尾A::doSomethingThreaded好像更多 可读);否则,你在加入后在 收割者线程中完成。

编辑:

对于回收线程,类似下面应该工作:

class ReaperQueue 
{ 
    std::deque<A*> myQueue; 
    std::mutex myMutex; 
    std::conditional_variable myCond; 
    A* getOne() 
    { 
     std::lock<std::mutex> lock(myMutex); 
     myCond.wait(lock, [&](!myQueue.empty())); 
     A* results = myQueue.front(); 
     myQueue.pop_front(); 
     return results; 
    } 
public: 
    void readyToReap(A* finished_thread) 
    { 
     std::unique_lock<std::mutex> lock(myMutex); 
     myQueue.push_back(finished_thread); 
     myCond.notify_all(); 
    } 

    void reaperThread() 
    { 
     for (; ;) 
     { 
      A* mine = getOne(); 
      mine->somethingThread.join(); 
      delete mine; 
     } 
    } 
}; 

(警告:我没有测试这一点,我试图用C++ 11 功能。我一直在使用并行线程实际上只实现了它,在过去, ,所以可能会有一些错误。但是其基本原则 应持有。)

要使用,创建一个实例,然后ST艺术线程调用 reaperThread就可以了。在清理每个线程时,请致电 readyToReap

为了支持正常关机,你可能需要使用两个队列:你 插入每个线程进入第一,因为它是创建,然后 它从第一位置移动到第二(这将符合 myQueue ,以上)在readyToReap。要关机,您需要等待 ,直到两个队列都为空(当然,这段时间内没有启动 的任何新线程)。

+0

你能举一些例子吗? – 4pie0

+0

@lizusek一个什么样的例子?我提出了几种不同的解 –

+0

如何创建收割者线程的示例;一个什么也不做,只能加入任何未完成的线程的线程,在它们之后进行清理。 – 4pie0

1

问题是,由于您通过共享指针管理A,线程lambda捕获的this指针确实需要是共享指针而不是原始指针以防止它变成悬挂。问题是,当没有实际的shared_ptr时,没有简单的方法可以从原始指针创建shared_ptr。

一个办法来解决这个问题是使用shared_from_this

class A : public enable_shared_from_this<A> { 
public: 
    void doSomethingThreaded(function<void()> cleanupFunction, function<bool()> getStopFlag) { 
     somethingThread = thread([cleanupFunction, getStopFlag, this]() { 
      shared_ptr<A> temp = shared_from_this(); 
      doSomething(getStopFlag); 
      cleanupFunction(); 
     }); 

这将创建一个额外的shared_ptr的A对象保持它活着,直到线程结束。

注意,您仍然有join/detach问题,詹姆斯甘孜标识 - 线程必须要么joindetach呼吁它究竟是一旦被破坏之前。如果您不关心线程退出值,则可以通过向线程lambda添加detach调用来满足该要求。

也有潜在的问题,如果doSomethingThreaded被称为单A对象上多次...

0

对于那些谁感兴趣的话,我把给定的(即詹姆斯的分离建议两个答案的升技,和克里斯 - '关于shared_ptr的建议)。

我由此得到的代码看起来是这样的,似乎更整洁,并不会导致停机或客户端断开连接崩溃:

使用命名空间std;

class A { 
public: 
    void doSomething(function<bool()> getStopFlag) { 
     ... 
    } 
private: 
    ... 
} 

class B { 
public: 
    void runServer(); 

    void stop() { 
     stopFlag = true; 
     waitForListToBeEmpty(); 
    } 
private: 
    void waitForListToBeEmpty() { ... }; 
    void handleAccept(...) { 
     shared_ptr<A> newClient(new A()); 
     { 
      unique_lock<mutex> lock(listMutex); 
      clientData.push_back(newClient); 
     } 
     thread clientThread([this, newClient]() { 
      // Capture the shared_ptr until thread over and done with. 

      newClient->doSomething([this]() { 
       return stopFlag; 
      }); 
      cleanup(newClient); 
     }); 
     // Detach to remove the need to store these threads until their completion. 
     clientThread.detach(); 
    } 

    void cleanup(shared_ptr<A> data) { 
     unique_lock<mutex> lock(listMutex); 
     clientData.remove(data); 
    } 

    list<shared_ptr<A>> clientData; // Can remove this if you don't 
            // need to connect with your clients. 
            // However, you'd need to make sure this 
            // didn't get deallocated before all clients 
            // finished as they reference the boolean stopFlag 
            // OR make it a shared_ptr to an atomic boolean 
    mutex listMutex; 
    atomc<bool> stopFlag; 
}