2015-09-28 28 views
0
#include <boost/asio/io_service.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread/thread.hpp> 
#include <iostream> 
using namespace std; 

#define MAX_THREAD_COUNT 50 

class A 
{ 
public: 
    static void ExecDecrementX(void* thisObj, int *x) 
    { 
     cout << "DecrementX thread..." << endl; 
     A *obj = (A*)thisObj; 
     obj->DecrementX(x); 
    } 

    void DecrementX(int* x) 
    { 
     cout << "Thread Instantiated........" << endl; 
     for (; *x <= 1200; (*x)++) 
     { 
      cout << "DecrementX thread, X = " << *x << endl; 
      if (*x == 1100) 
      { 
       HANDLE hEvent = OpenEvent(EVENT_ALL_ACCESS, false, L"MyEvent1"); 
       if (!hEvent) { return; } 
       ResetEvent(hEvent); 
       if (SetEvent(hEvent)) 
       { 
        cout << "Got The signal - MyEvent 1......." << endl; 
        CloseHandle(hEvent); 
       } 
      } 
     } 
     cout << "End of the Thread ......" << endl; 
    } 
}; 

int main() 
{ 
    int x = 1000; 
    A* obj1 = new A(); 
    DWORD dwRet; 
    HANDLE Events_Handle[1]; 
    HANDLE hEvent1 = CreateEvent(NULL, true, false, L"MyEvent1"); 
    if (!hEvent1) return -1; 
    Events_Handle[0] = hEvent1; 
    void(*fPtr)(void*, int*) = A::ExecDecrementX; 
    boost::asio::io_service ioService; 
    boost::thread_group ThreadPool; 
    boost::asio::io_service::work work(ioService); 
    for (int threadcount = 0; threadcount < MAX_THREAD_COUNT; threadcount++) 
    { 
     ThreadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); 
    } 
    cout << "Main thread\n"; 
    ioService.post(boost::bind(fPtr, obj1, &x)); 
    while (1) 
    { 
     dwRet = WaitForMultipleObjects(1, Events_Handle, false, INFINITE); 
     cout << "dwRet = " << dwRet << endl; 
     if (dwRet == WAIT_OBJECT_0) 
     { 
      cout << "All events were signaled..." << endl; 
      ioService.stop(); 
      ThreadPool.interrupt_all(); 
      cout << "Remaining threads interrupted... X = " << x << endl; 
      CloseHandle(hEvent1); 
      break; 
     } 
    } 
    cout << "Main Thread: At the End X = " << x << endl; 
    return 0; 
} 

我是需求量的一次x达到1100,打断了所有的线程,但即使线程池的中断之后,我看到线程池的推移和保持印刷,直到结束。 你能帮我理解发生了什么,我该如何解决它?想一旦变量达到一定值时,终止线程池

感谢, JK

回答

0

它看起来像你很难跟你把周围的所有线程的原语更加困惑。

首先你混的线程抽象的不少于3个不同层次:

  • 短耳异步任务排队
  • 升压线程便携式线程库(thread_group和断点)
  • Windows本机API(WaitForMultipleObjects,活动)

即便如此,你得到它的大部分......混合起来:

  • 该事件是一个命名对象。也就是说,它旨在用于进程间同步(IPC),并且所有实例都是针对同一个同步对象的不同内核句柄...
  • 您使用WaitForMultipleObjects来等待... 1个固定同步对象。看来你可以做的更好:)

  • 采用interrupt_all()意味着你使用断点,你从来没有处理thread_interrupted例外呢?请参阅文档:http://www.boost.org/doc/libs/1_59_0/doc/html/thread/thread_management.html#thread.thread_management.tutorial.interruption

  • 您似乎使用Boost Asio来服务线程池上的任务。 还有您只能向服务发布1个任务,这意味着不会有超过1个线程参与运行任务。

    混乱的主要来源在这里似乎是一个任务一个线程

    泳池管理线程,服务玩杂耍任务。你的'任务'似乎有雄心壮志占据一条完整的线程。这永远不适合一个线程池(整个想法是重复使用相同的线程来执行多个短暂的任务),特别是异步处理(长时间运行的任务会阻塞队列,并且不会有异步发生在相同的线程)。

  • 使用work是错误的。它的生命周期不会在连接之前结束,这意味着操作将永不返回。请参见docs

    工作类用于通知io_service工作何时开始和结束。这确保io_service对象的run()函数在工作正在进行时不会退出,并且在没有剩余未完成的工作时它将退出。

  • 最后,为什么在地球上,你需要在短耳任务队列50个线程。这只有在你有很长的运行任务时才有意义,这些任务是而不是 CPU绑定。这与异步任务处理完全相反。

  • 具有讽刺意味的是,DecrementX似乎没有别的,但增量 x。在互斥体下应该访问哪个应用程序。我做了一个atomic_size_t下面

这里是在清理一个不起眼的尝试:

Live On Coliru

#include <boost/asio/io_service.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class A { 
    boost::atomic_size_t x { 0 }; 
    boost::asio::io_service& _svc; 

public: 
    A(boost::asio::io_service& svc) : _svc(svc) {} 
    void IncrementX() { 
     std::cout << "IncrementX X = " << x << "\n"; 
     if (x++ == 1100) 
      _svc.stop(); 
    } 

    size_t getX() const { return x; } 
}; 

int main() 
{ 
    using namespace boost; 

    asio::io_service ioService; 
    A obj(ioService); 

    thread_group pool; 
    optional<asio::io_service::work> work = asio::io_service::work(ioService); 

    for (unsigned i = 0; i < thread::hardware_concurrency(); i++) { 
     pool.create_thread(bind(&asio::io_service::run, &ioService)); 
    } 

    std::cout << "Main thread\n"; 

    while (!ioService.stopped()) 
    { 
     ioService.post(bind(&A::IncrementX, &obj)); 
    } 

    work.reset(); 
    pool.join_all(); 

    std::cout << "Main Thread: At the End X = " << obj.getX() << std::endl; 
} 

打印(删节):

Main thread 
IncrementX X = IncrementX X = 00 

IncrementX X = IncrementX X = 2 
2 
IncrementX X = IncrementX X = 4 
4 
IncrementX X = 5 
IncrementX X = 7IncrementX X = 
7 
IncrementX X = IncrementX X = 99 

IncrementX X = IncrementX X = 1111 

IncrementX X = 13 
IncrementX X = 14 
IncrementX X = 15 
IncrementX X = 16 
IncrementX X = 17 
IncrementX X = 18 
IncrementX X = 19 
IncrementX X = 20 
IncrementX X = 21 
... 
... 
IncrementX X = 1078IncrementX X = 
1074 
IncrementX X = 1079 
IncrementX X = 
IncrementX X = IncrementX X = IncrementX X = 1082 
10751081 

IncrementX X = IncrementX X = 10851085 

IncrementX X = 10811085IncrementX X = 
1087IncrementX X = IncrementX X = 1088 
IncrementX X = 1089 
IncrementX X = 1090 
IncrementX X = 1091 

IncrementX X = 1092 
IncrementX X = 1093 

1087IncrementX X = 
1095IncrementX X = IncrementX X = 1096 

1097 
IncrementX X = IncrementX X = 1097 
1100 
IncrementX X = IncrementX X = 11021102 



IncrementX X = 1075 

Main Thread: At the End X = 1108 

real 0m0.003s 
user 0m0.000s 
sys 0m0.006s 
+0

如果你想让你查看我录制的直播,重构这个答案[围绕18'45''标记](https://www.livecoding.tv/video/qihold-butchering-a-thread-fest-question/) 。特别是你可以看到我先用'boost :: condition_variable' /'mutex' /'unique_lock'组合代替了并处理了'thread_interrupted'异常(尽管我的最终代码并不包含任何这些代码),你仍然可能仍然想看?) – sehe

相关问题