2017-08-08 83 views
1

我一直在测试将IO完成端口与线程池中的工作线程相结合,并偶然发现我无法解释的行为。特别是,虽然下面的代码:从线程池工作线程使用GetQueuedCompletionStatus的奇怪行为

int data; 
    for (int i = 0; i < NUM; ++i) 
     PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data)); 

    { 
     std::thread t([&]() 
     { 
      LPOVERLAPPED aux; 
      DWORD  cmd; 
      ULONG_PTR key; 

      for (int i = 0; i < NUM; ++i) 
      { 
       if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0)) 
       break; 
       ++count; 
      } 
     }); 

     t.join(); 
    } 

作品完美的罚款,并接收NUM状态通知(与NUM是大量的,10万以上),使用线程池工作对象,上面写着一个状态通知了类似的代码每个工作项目并在阅读后重新发布工作项目,在阅读几百个状态通知后失败。具有以下全局变量(请不要介意名称):

HANDLE cport; 
PTP_POOL pool; 
TP_CALLBACK_ENVIRON env; 
PTP_WORK work; 
std::size_t num_calls; 
std::mutex mutex; 
std::condition_variable cv; 
bool job_done; 

和回调函数:

static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_) 
{ 
    LPOVERLAPPED aux; 
    DWORD  cmd; 
    ULONG_PTR key; 

    if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0)) 
    { 
    ++num_calls; 
    SubmitThreadpoolWork(work); 
    } 
    else 
    { 
    std::unique_lock<std::mutex> l(mutex); 
    std::cout << "No work after " << num_calls << " calls.\n"; 
    job_done = true; 
    cv.notify_one(); 
    } 
} 

下面的代码:

{ 
    job_done = false; 
    std::unique_lock<std::mutex> l(mutex); 

    num_calls = 0; 
    cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); 

    pool = CreateThreadpool(nullptr); 
    InitializeThreadpoolEnvironment(&env); 
    SetThreadpoolCallbackPool(&env, pool); 

    work = CreateThreadpoolWork(callback, nullptr, &env); 

    for (int i = 0; i < NUM; ++i) 
     PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data)); 

    SubmitThreadpoolWork(work); 
    cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; }); 
} 

将报告“没有更多的工作之后......“250个左右之后调用GetQueuedCompletionStatus,尽管NUM被设置为1000000.更加奇怪的是,将等待从0设置为10毫秒会增加succ的数量精心打电话给几十万人,偶尔会读取所有1000000个通知。由于所有状态通知都是在首次提交工作对象之前发布的,所以我不太了解。

是否有可能确实存在将完成端口和线程池组合在一起的问题,或者在我的代码中出现错误?请不要进入为什么我想要这样做 - 我正在调查的可能性,并在这方面偶然发现。在我看来,它应该工作,并不能指出什么是错的。谢谢。

+0

您应检查由'PostQueuedCompletionStatus'(和其他WINAPI函数)返回的值,并检查'GetLastError'如果失败。 – VTT

+0

完整的代码做到了这一点,为了简单起见,我删除了检查。没有错误报告。 –

+0

你应该添加它们。 – VTT

回答

0

我试过运行这段代码,问题似乎是提供给CreateIoCompletionPortNumberOfConcurrentThreads参数。传递1意味着执行callback的第一个池线程与io完成端口关联,但由于线程池可能执行callback使用不同的线程GetQueuedCompletionStatus会在发生这种情况时失败。 From documentation

要仔细考虑的I/O完成端口最重要的属性是并发值。通过参数NumberOfConcurrentThreads通过CreateIoCompletionPort创建完成端口的并发值。此值限制与完成端口关联的可运行线程的数量。当与完成端口相关联的可运行线程的总数达到并发值时,系统将阻止与该完成端口相关联的任何后续线程的执行,直到可运行线程的数量下降到低于并发值。

尽管任意数量的线程可以调用GetQueuedCompletionStatus为指定的I/O完成端口,当指定的线程调用GetQueuedCompletionStatus在第一时间,就成了具有指定I/O完成端口,直到三件事情之一相关情况:该线程退出,指定不同的I/O完成端口,或者关闭I/O完成端口。换句话说,一个线程最多可以与一个I/O完成端口相关联。

所以使用IO使用线程池完成你需要的并发线程数设置为线程池的大小(你可以使用SetThreadpoolThreadMaximum设置)。

::DWORD const threads_count{1}; 

cport = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threads_count); 
... 
pool = ::CreateThreadpool(nullptr); 
::SetThreadpoolThreadMaximum(pool, threads_count); 
+0

谢谢,先生!这确实解决了这个问题。 –