我一直在测试将IO完成端口与线程池中的工作线程相结合,并偶然发现我无法解释的行为。特别是,虽然下面的代码:从线程池工作线程使用GetQueuedCompletionStatus的奇怪行为
int data;
for (int i = 0; i < NUM; ++i)
PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));
{
std::thread t([&]()
{
LPOVERLAPPED aux;
DWORD cmd;
ULONG_PTR key;
for (int i = 0; i < NUM; ++i)
{
if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0))
break;
++count;
}
});
t.join();
}
作品完美的罚款,并接收NUM状态通知(与NUM是大量的,10万以上),使用线程池工作对象,上面写着一个状态通知了类似的代码每个工作项目并在阅读后重新发布工作项目,在阅读几百个状态通知后失败。具有以下全局变量(请不要介意名称):
HANDLE cport;
PTP_POOL pool;
TP_CALLBACK_ENVIRON env;
PTP_WORK work;
std::size_t num_calls;
std::mutex mutex;
std::condition_variable cv;
bool job_done;
和回调函数:
static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_)
{
LPOVERLAPPED aux;
DWORD cmd;
ULONG_PTR key;
if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0))
{
++num_calls;
SubmitThreadpoolWork(work);
}
else
{
std::unique_lock<std::mutex> l(mutex);
std::cout << "No work after " << num_calls << " calls.\n";
job_done = true;
cv.notify_one();
}
}
下面的代码:
{
job_done = false;
std::unique_lock<std::mutex> l(mutex);
num_calls = 0;
cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
pool = CreateThreadpool(nullptr);
InitializeThreadpoolEnvironment(&env);
SetThreadpoolCallbackPool(&env, pool);
work = CreateThreadpoolWork(callback, nullptr, &env);
for (int i = 0; i < NUM; ++i)
PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));
SubmitThreadpoolWork(work);
cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; });
}
将报告“没有更多的工作之后......“250个左右之后调用GetQueuedCompletionStatus,尽管NUM被设置为1000000.更加奇怪的是,将等待从0设置为10毫秒会增加succ的数量精心打电话给几十万人,偶尔会读取所有1000000个通知。由于所有状态通知都是在首次提交工作对象之前发布的,所以我不太了解。
是否有可能确实存在将完成端口和线程池组合在一起的问题,或者在我的代码中出现错误?请不要进入为什么我想要这样做 - 我正在调查的可能性,并在这方面偶然发现。在我看来,它应该工作,并不能指出什么是错的。谢谢。
您应检查由'PostQueuedCompletionStatus'(和其他WINAPI函数)返回的值,并检查'GetLastError'如果失败。 – VTT
完整的代码做到了这一点,为了简单起见,我删除了检查。没有错误报告。 –
你应该添加它们。 – VTT